
Streaming sync serialization #287

Open

bjester wants to merge 10 commits into learningequality:release-v0.9.x from bjester:streaming-sync

Conversation


@bjester bjester commented Feb 13, 2026

Summary

  • This is step one in a complete revitalization of the sync pipeline
  • Adds new stream utilities for managing the processing of sync data in a streaming fashion
    • I looked at several external libraries. Finding a combination that was simple but still supported Python 3.6 was a real challenge. The closest I found was streamz, which I unfortunately opted against because it depends on tornado
  • Refactors _serialize_into_store logic into individual classes built upon foundational stream utilities-- so much better for unit testing!
  • Reorganizes some dependent code into locations for shared access and no circular references
  • Adds typing-extensions for backported future typing features
  • Updates MorangoProfileController to use a sync_filter kwarg instead of filter-- it always bothered me that it shadowed the built-in
  • Adds unit tests for new stream utilities and converted serialization code-- the serialization process as a whole has pretty good coverage
  • Replaces usage of _serialize_into_store with new serialize_into_store streaming replacement
  • The new approach does not use bulk_update as Django was observed to spend excessive time with it
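To make the "stream utilities" idea above concrete, here is a minimal sketch, not the PR's actual implementation: small composable modules that consume an iterable and lazily yield output, chained with pipe(). The StreamModule name echoes the classes discussed in the review; Batch and Double are hypothetical illustrations.

```python
import abc
from typing import Any, Iterable, Iterator


class StreamModule(abc.ABC):
    """One processing stage: consume an iterable, yield output items."""

    @abc.abstractmethod
    def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
        """Process the incoming iterable and yield output items."""

    def pipe(self, nxt: "StreamModule") -> "StreamModule":
        """Chain this module into the next one; nothing runs until iterated."""
        outer = self

        class _Piped(StreamModule):
            def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
                return nxt(outer(items))

        return _Piped()


class Batch(StreamModule):
    """Group items into fixed-size chunks so downstream stages can bulk-process."""

    def __init__(self, size: int):
        self.size = size

    def __call__(self, items: Iterable[Any]) -> Iterator[list]:
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) >= self.size:
                yield batch
                batch = []
        if batch:
            yield batch


class Double(StreamModule):
    """Trivial transform for illustration."""

    def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
        for item in items:
            yield item * 2


pipeline = Double().pipe(Batch(2))
print(list(pipeline([1, 2, 3])))  # [[2, 4], [6]]
```

Because each stage is a plain callable over an iterable, the whole chain stays lazy and each stage can be unit-tested in isolation -- the testability win the summary describes.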

Improvements

The changes were evaluated by installing the local version into Kolibri. A dedicated command was created within Kolibri to run solely the serialization step, and then the performance of that command was benchmarked.

Further investigation is required to determine how to reduce the increased duration observed in Cases 1 and 2.

Case 1: existing large dataset

Kolibri was launched with a pre-existing database containing data for about 18,000 users.

| Version | # users | Memory graph | Peak mem | Duration  |
| ------- | ------- | ------------ | -------- | --------- |
| Before  | 18k     | (screenshot) | 325.7 MB | 12.49 sec |
| After   | 18k     | (screenshot) | 93.5 MB  | 39.50 sec |

Case 2: artificial 500 users

Kolibri's generateuserdata command was used to generate data for 500 users, which is the maximum the command currently supports.

| Version | # users | Memory graph | Peak mem | Duration  |
| ------- | ------- | ------------ | -------- | --------- |
| Before  | 500     | (screenshot) | 217.9 MB | 5.99 sec  |
| After   | 500     | (screenshot) | 87.5 MB  | 12.21 sec |

Case 3: large dataset reduced -- 1000 users

Since the generateuserdata command can currently only generate up to 500 users, the existing large dataset was trimmed down to 1000 users. After manually deleting the other users, kolibri manage was executed (a no-op) to trigger Kolibri's FK integrity check, which deletes the broken records. Note that this run probably takes longer due to those deletions, which provides additional insight into the process, even though the deletion handling has not really changed.

| Version | # users | Memory graph | Peak mem | Duration  |
| ------- | ------- | ------------ | -------- | --------- |
| Before  | 1000    | (screenshot) | 308.1 MB | 44.54 sec |
| After   | 1000    | (screenshot) | 87.9 MB  | 23.63 sec |

Case 4: large dataset reduced -- 5000 users

Again, the existing large dataset was trimmed down, this time to 5000 users. The deletion behavior is the same as in Case 3.

| Version | # users | Memory graph | Peak mem | Duration  |
| ------- | ------- | ------------ | -------- | --------- |
| Before  | 5000    | (screenshot) | 339.7 MB | 55.88 sec |
| After   | 5000    | (screenshot) | 92.3 MB  | 28.92 sec |

How AI was used

  • To look for stream libraries
  • Multiple models/providers were used to prototype the stream utilities
  • To verify and correct type hinting
  • To add comments, edited afterwards
  • To create tests for streaming utilities (simplistic)
  • To bootstrap tests for the serialization stream utils, heavily refactored by me
  • To generate documentation

TODO

  • Have tests been written for the new code?
  • Has documentation been written/updated?
  • New dependencies (if any) added to requirements file

Reviewer guidance

  • Install the branch locally into Kolibri and perform some syncs against another local Kolibri instance

Issues addressed

Closes #192

@rtibbles rtibbles self-assigned this Feb 24, 2026
@bjester bjester marked this pull request as ready for review February 25, 2026 22:14

@rtibbles rtibbles left a comment


Implementation makes sense to me, and I can follow the mapping from existing operation code to the new stream architecture. The minimal changes to the existing operations tests give confidence against regressions.

The only thing I got hung up on was the names of the abstract base classes!

@abc.abstractmethod
def __call__(self, items: Iterable[Any]) -> Iterator:
"""Process the incoming iterable and yield output items."""
raise NotImplementedError

Small inconsistency here and below where raise NotImplementedError is used? Fairly sure it's not needed in addition to the abstractmethod decorator?
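The reviewer is right that the decorator alone is sufficient. A small self-contained check (SinkModule is a hypothetical name, not from the PR): ABCMeta refuses to instantiate any class that still has an unimplemented abstract method, so a body-level raise NotImplementedError can never run through normal instantiation.

```python
import abc


class SinkModule(abc.ABC):
    @abc.abstractmethod
    def consume(self, items):
        """Subclasses must override; no explicit raise is needed here --
        ABCMeta already blocks instantiation while this is unimplemented."""


try:
    SinkModule()
except TypeError as exc:
    # e.g. "Can't instantiate abstract class SinkModule ..."
    print("instantiation blocked:", exc)
```

The one case where the explicit raise still matters is a subclass deliberately calling super().consume(...), which would otherwise silently return None.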


.. code-block:: python

source.pipe(transform1).pipe(transform2).end(sink)

Not at all necessary, but the thought of being able to construct the pipeline with pipe operators amused me!

source | transform1 | transform2 | sink
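The operator syntax the reviewer suggests is easy to support. A hedged sketch (Module, evens, and squared are hypothetical names, not from the PR): give the base class an __or__ that simply composes the two callables left to right.

```python
from typing import Any, Callable, Iterable, Iterator


class Module:
    """Wraps a function from iterable to iterator; `a | b` composes them."""

    def __init__(self, fn: Callable[[Iterable[Any]], Iterator[Any]]):
        self.fn = fn

    def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
        return self.fn(items)

    def __or__(self, other: "Module") -> "Module":
        # (self | other)(items) is other(self(items)): left stage feeds right
        return Module(lambda items: other(self(items)))


evens = Module(lambda items: (i for i in items if i % 2 == 0))
squared = Module(lambda items: (i * i for i in items))

pipeline = evens | squared
print(list(pipeline(range(6))))  # [0, 4, 16]
```

In a real implementation __or__ would likely just delegate to the existing pipe() method, keeping both spellings equivalent.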

pass


class ReaderModule(abc.ABC):

Is there a reason this isn't a subclass of StreamModule?

pass


class PipelineModule(StreamModule):

Maybe TransformModule? Or seeing that you use that more specifically below OperatorModule?

pass


class Pipeline(ReaderModule):

I think the use of Pipeline here and PipelineModule above does make it feel more confusing to me.

stores_to_update.append(created_store)

if stores_to_update:
# TODO: bulk_update performs poorly-- is there a better way?

This library claims 8x speed up over bulk_update - but also doesn't seem to be hugely well maintained, so might be useful to look at for inspiration rather than usage! https://github.com/netzkolchose/django-fast-update
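Short of swapping libraries, one common mitigation is to bound the size of each UPDATE. A sketch under assumptions: chunked is a hypothetical helper, and the Store model / field names in the commented usage are illustrative only. Note that Django's bulk_update also accepts a batch_size argument that chunks internally.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items, so a large set of
    dirty records can be flushed in bounded batches."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Hypothetical Django usage (model and field names are illustrative only):
# for batch in chunked(stores_to_update, 500):
#     Store.objects.bulk_update(batch, ["serialized", "deleted"], batch_size=500)
```

Bounding the batch keeps each generated SQL statement small, which tends to help on SQLite in particular, though it doesn't address bulk_update's per-row CASE/WHEN construction cost that django-fast-update targets.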

"djangorestframework>3.10",
"django-ipware==4.0.2",
"requests",
"typing-extensions==4.1.1",

I assume this was purposeful, but flagging that this is precisely the same version of typing-extensions that Kolibri bundles (although it's still not quite clear to me what requires it, as it's not a direct dependency).


Development

Successfully merging this pull request may close these issues.

Add chunking to serializing models into Store
