A production-oriented Telegram ingestion platform with:
- Telethon-based collectors
- Redis atomic deduplication
- Redis Streams for decoupled ingestion
- PostgreSQL for durable storage
- Phase 2 quality pipeline (raw -> normalized -> entities)
- Scheduler/session assignment scaffolding
- Prometheus metrics endpoints
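The atomic deduplication mentioned above can be sketched with Redis's `SET` + `NX` option, which claims a key in a single atomic round trip. The key scheme and TTL below are illustrative assumptions, not the project's actual names:

```python
def claim_message(r, channel_id: int, message_id: int, ttl_s: int = 604_800) -> bool:
    """Return True exactly once per (channel, message) pair.

    SET with nx=True is a single atomic operation, so two collectors
    racing on the same message cannot both win the claim. The TTL lets
    old dedup keys expire instead of growing unbounded.
    """
    key = f"dedup:{channel_id}:{message_id}"  # hypothetical key scheme
    return bool(r.set(key, 1, nx=True, ex=ttl_s))
```

Any redis-py-compatible client works here; the first caller gets `True`, every later caller for the same pair gets `False`.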
- `app/collector_v2.py`: Telethon collector that publishes deduplicated records to Redis Streams
- `app/stream_worker_v2.py`: stream worker consuming from Redis and writing through the Phase 2 pipeline
- `app/scheduler.py`: scheduler loop for session/channel assignment reconciliation
- `app/session_worker.py`: session heartbeat worker
- `app/quality_processor.py`: normalization, noise filtering, extraction, scoring
- `app/entity_resolver.py`: entity upsert and linking
- `sql/001_schema.sql`: PostgreSQL schema
- `infra/docker-compose.yml`: PostgreSQL, Redis, Prometheus
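As a rough sketch of the collector-to-stream handoff, a deduplicated record can be appended with `XADD` and a capped stream length. The stream name, field layout, and cap below are assumptions, not the repo's actual identifiers:

```python
def publish_record(r, record: dict, stream: str = "tg:raw", maxlen: int = 100_000) -> str:
    """Append one flat dict of string fields to the stream.

    approximate=True lets Redis trim to roughly maxlen cheaply, which
    bounds memory use if workers fall behind the collectors.
    """
    # stream name "tg:raw" and maxlen are hypothetical defaults
    return r.xadd(stream, record, maxlen=maxlen, approximate=True)
```

The returned stream entry ID (e.g. `1700000000000-0`) can be logged to correlate collector output with worker consumption.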
Copy `.env.example` to `.env` and fill in the required values.

- Start infrastructure:

  ```bash
  cd infra && docker compose --env-file ../.env up -d
  ```

- Apply the schema to PostgreSQL.

- Install app dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Start the worker:

  ```bash
  python -m app.main_worker
  ```

- Start the collector:

  ```bash
  python -m app.main_collector
  ```

- The system is designed for at-least-once delivery with idempotent database writes.
- Messages are ACKed only after successful pipeline handling.
- `XAUTOCLAIM` is used to recover stale pending messages.
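A minimal sketch of that recover-then-consume loop, assuming a redis-py-style client and hypothetical stream, group, and consumer names:

```python
def consume_once(r, handle, stream="tg:raw", group="pipeline", consumer="worker-1"):
    """One pass: reclaim stale pending entries, then read and ACK new ones.

    Names are illustrative; the real stream/group names live in the app config.
    """
    # Take over entries left pending for > 60 s by a crashed consumer.
    r.xautoclaim(stream, group, consumer, min_idle_time=60_000)
    # ">" requests entries never delivered to this group; block briefly for new data.
    for _name, entries in r.xreadgroup(group, consumer, {stream: ">"},
                                       count=100, block=1000) or []:
        for entry_id, fields in entries:
            handle(fields)                   # must be idempotent (at-least-once)
            r.xack(stream, group, entry_id)  # ACK only after the pipeline succeeds
```

Because `handle` runs before `XACK`, a crash mid-batch leaves unacknowledged entries in the pending list, where the next `XAUTOCLAIM` pass picks them up; idempotent writes make the resulting redelivery harmless.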