Skip to content

[python] Add consumer management for streaming progress#7349

Open
tub wants to merge 11 commits intoapache:masterfrom
tub:python-streaming-1c-consumer
Open

[python] Add consumer management for streaming progress#7349
tub wants to merge 11 commits intoapache:masterfrom
tub:python-streaming-1c-consumer

Conversation

@tub
Copy link
Contributor

@tub tub commented Mar 5, 2026

Summary

  • Add Consumer dataclass for tracking consumption progress (snapshot ID)
  • Add ConsumerManager for persisting, loading, and expiring consumers via file IO
  • Consumer state stored as JSON files under table's consumer/ directory

Stacked PR series

This is PR 1c/5 in the Python streaming read series:

  • PR 1a: Caching infrastructure + utilities
  • PR 1b: Scanners, sharding, row kind
  • PR 1c (this): Consumer management (~406 lines)
  • PR 2: Core streaming (AsyncStreamingTableScan)
  • PR 3: CLI (paimon tail)

Incremental diff (vs 1b): tub/paimon@python-streaming-1b-scanners...tub:paimon:python-streaming-1c-consumer (or wait until 1a & 1b are merged to compare)

Test plan

  • flake8 passes on all changed files
  • python -m pytest passes
  • New tests: consumer_test.py (11 tests)

tub and others added 3 commits March 5, 2026 14:43
- Add backtick quoting to Identifier for SQL-safe formatting
- Add ChangelogProducer enum to core_options
- Add exists_batch() for bulk file existence checks
- Add LRU caching to ManifestFileManager and ManifestListManager
- Add snapshot caching and traversal helpers to SnapshotManager
- Add cachetools dependency

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add FollowUpScanner hierarchy (base, delta, changelog)
- Add IncrementalDiffScanner for diff-based streaming reads
- Add sharding support to FileScanner
- Add row kind support to TableRead for changelog streams

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add Consumer dataclass for tracking consumption progress
- Add ConsumerManager for persisting/loading/expiring consumers
- Add unit tests for consumer operations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
tub and others added 8 commits March 5, 2026 18:08
…tEquals

Extract shared base class for ManifestFileCacheTest and ManifestListCacheTest,
add _make_snapshot() helper, and fix deprecated assertEquals (removed in 3.12).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rim docs, remove ChangelogProducer

- Upgrade cachetools to >=7,<8 for cachedmethod(info=True) support
- Remove ChangelogProducer enum (belongs in apache#7348 scanners branch)
- Replace manual cache hit/miss counters with @cachedmethod(info=True)
  decorator on ManifestFileManager, ManifestListManager, SnapshotManager
- Trim verbose docstrings across identifier, file_io, pyarrow_file_io,
  manifest_list_manager, and snapshot_manager
- Update cache tests to use cache_info() instead of manual counters

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…efault-size tests

- Move shared cache-behaviour tests (second_read, disabled_when_zero)
  into _CacheBehaviourMixin so they run for both manager types without
  duplication
- Extract _EMPTY_ROW / _EMPTY_STATS module constants to reduce
  DataFileMeta boilerplate
- Remove test_default_cache_size tests (just assert constructor defaults)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cachetools 7.x requires Python >=3.10 but the project supports 3.6+.
Drop info=True and explicit key= from @cachedmethod (both 7.x-only
features) while keeping the decorator itself (available since 4.x).

Replace cache_info()-based test assertions with unittest.mock spies on
file_io.new_input_stream, testing the actual caching effect without any
production code counters.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…olidate tests, fix parallelism

- Collapse repetitive module/class/method docstrings to one-liners in all
  scanner files (follow_up_scanner, delta, changelog, incremental_diff)
- Remove TDD process commentary from test docstrings
- Consolidate DeltaFollowUpScanner false-case tests into one parameterized test
- Remove misleading commit_kind from ChangelogFollowUpScanner test mocks
- Extract duplicated mock helpers to module-level functions
- Fix max(8, ...) parallelism bug: respect user-configured parallelism
- Remove obvious/redundant inline comments
- Standardize license headers to comment style, merge double docstrings
- Add clarifying docstring to ManifestListManager.read_all

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sistency

Align consumer module with the one-liner docstring style used across the rest of the streaming PR stack. Replace os.path.join with f-string path construction for consistency with paimon-python conventions. Add tests for _validate_consumer_id rejection cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant