Skip to content

Add consumer to send msgs to datalake#536

Draft
lucaslinhares wants to merge 10 commits intomainfrom
feature/add-weni-datalake-sdk-lib
Draft

Add consumer to send msgs to datalake#536
lucaslinhares wants to merge 10 commits intomainfrom
feature/add-weni-datalake-sdk-lib

Conversation

@lucaslinhares
Copy link
Copy Markdown

No description provided.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @lucaslinhares, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

This pull request adds a consumer to send messages to a data lake. It includes changes to the poetry.lock and pyproject.toml files to update dependencies, and adds new files temba/msgs/consumers/__init__.py, temba/msgs/consumers/msg_consumer.py, and temba/msgs/handle.py to implement the consumer logic.

Highlights

  • Dependency Updates: The poetry.lock and pyproject.toml files are updated to include new dependencies such as exceptiongroup, grpcio, grpcio-health-checking, grpcio-tools, iniconfig, jinja2, markupsafe, moto, pluggy, protobuf, pytest, responses, tomli, weni-datalake-sdk, werkzeug, and xmltodict. Versions of existing dependencies are also updated.
  • New Consumer Implementation: The temba/msgs/consumers/msg_consumer.py file implements a new consumer, MsgConsumer, which inherits from EDAConsumer. This consumer parses JSON messages and sends the data to a data lake using the weni_datalake_sdk.
  • Consumer Registration: The temba/msgs/handle.py file registers the new MsgConsumer to consume messages from the flows.msgs queue.

Changelog

Click here to see the changelog
  • poetry.lock
    • Adds new dependencies: exceptiongroup, grpcio, grpcio-health-checking, grpcio-tools, iniconfig, jinja2, markupsafe, moto, pluggy, protobuf, pytest, responses, tomli, weni-datalake-sdk, werkzeug, and xmltodict.
    • Updates versions of existing dependencies.
    • Updates content hash.
  • pyproject.toml
    • Updates version of weni-rp-apps from 2.9.0 to 2.9.1a1.
    • Adds moto and weni-datalake-sdk as dependencies.
    • Updates version of responses to be >=0.13.0
  • temba/msgs/consumers/init.py
    • Adds import statement for MsgConsumer.
  • temba/msgs/consumers/msg_consumer.py
    • Implements MsgConsumer class that inherits from EDAConsumer.
    • Defines consume method to parse JSON messages and send data to a data lake.
  • temba/msgs/handle.py
    • Registers MsgConsumer to consume messages from the flows.msgs queue.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.


A consumer awakes, with purpose defined,
To ingest the data, of every kind.
From queues it shall listen, with diligent ear,
To send to the lake, banishing fear.
With SDK in hand, and paths to explore,
The data will land, forevermore.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a consumer to send messages to a data lake. The changes involve adding new dependencies, updating existing ones, and creating new files for the consumer logic. Overall, the changes seem reasonable, but there are a few points that need attention.

Summary of Findings

  • Dependency Updates: Several dependencies have been updated, including grpcio and protobuf. It's crucial to ensure that these updates are compatible with the existing codebase and that any breaking changes are addressed.
  • New Dependencies: New dependencies like exceptiongroup, grpcio-health-checking, moto, and weni-datalake-sdk have been added. It's important to verify the necessity and security of these dependencies.
  • Error Handling: The MsgConsumer includes basic error handling using sentry_sdk. Consider adding more robust error handling and logging to ensure message delivery and data integrity.

Merge Readiness

The pull request introduces significant changes, including dependency updates and new consumer logic. While the changes seem generally well-structured, it's crucial to address the identified issues before merging. Specifically, the dependency updates need thorough verification, and the error handling in MsgConsumer should be enhanced. I am unable to approve this pull request, and recommend that it not be merged until these issues are addressed (at a minimum), and that others review and approve this code before merging.

Comment on lines +20 to +23
except Exception as exception:
capture_exception(exception)
message.channel.basic_reject(message.delivery_tag, requeue=False)
print(f"[MsgConsumer] - Message rejected by: {exception}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The except block captures all exceptions. While this prevents the consumer from crashing, it might mask specific errors that should be handled differently. Consider catching more specific exception types and handling them accordingly. For example, you might want to retry sending the message if it fails due to a temporary network issue, but reject it immediately if it fails due to invalid data.
Also, consider adding more context to the exception message, such as the message ID or other relevant identifiers.

except JSONDecodeError as e:
    capture_exception(e)
    message.channel.basic_reject(message.delivery_tag, requeue=False)
    logger.error(f"[MsgConsumer] - JSONDecodeError: {e} for message with delivery_tag: {message.delivery_tag}")
except Exception as exception:
    capture_exception(exception)
    message.channel.basic_reject(message.delivery_tag, requeue=False)
    logger.exception(f"[MsgConsumer] - Message rejected due to an unexpected error: {exception} for message with delivery_tag: {message.delivery_tag}")

Comment on lines +11 to +13
def consume(self, message: amqp.Message): # pragma: no cover
print(f"[MsgConsumer] - Consuming a message. Body: {message.body}")
try:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding more context to the print statement, such as the message ID or other relevant identifiers, to aid in debugging and monitoring.
Also, it might be beneficial to log the message body using a structured logging approach (e.g., using logger.info with extra context) instead of a simple print statement.

logger.info(f"[MsgConsumer] - Consuming message with delivery_tag: {message.delivery_tag}", extra={"message_body": message.body})

Comment thread temba/msgs/consumers/msg_consumer.py Outdated
Comment on lines +15 to +16
data = dict(body)
send_data(MsgPath, data)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's generally a good practice to log the data being sent to the data lake, especially in production environments, to facilitate debugging and auditing. Consider adding a log statement before calling send_data.

logger.debug(f"[MsgConsumer] - Sending data to datalake: {data}")
send_data(MsgPath, data)

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 26, 2025

Codecov Report

❌ Patch coverage is 63.63636% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 99.04%. Comparing base (29bd682) to head (c75d120).
⚠️ Report is 733 commits behind head on main.

Files with missing lines Patch % Lines
temba/msgs/handle.py 0.00% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #536      +/-   ##
==========================================
- Coverage   99.05%   99.04%   -0.02%     
==========================================
  Files         574      576       +2     
  Lines       29616    29627      +11     
==========================================
+ Hits        29336    29343       +7     
- Misses        280      284       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@sonarqubecloud
Copy link
Copy Markdown

@sonarqubecloud
Copy link
Copy Markdown

@lucaslinhares lucaslinhares marked this pull request as draft March 4, 2026 20:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant