🧹 Python Code Quality Check: ✅ No issues found in Python files. This comment is auto-updated with every commit.
Pull Request Overview
This PR adds a new S3 Glacier-aware connector example that demonstrates how to sync S3 object metadata using boto3, with special handling for Glacier storage classes and their restoration status.
- Implements incremental sync using LastModified timestamps with state checkpointing
- Handles Glacier storage classes by inspecting restore headers via head_object calls
- Uses pagination for memory-efficient processing of large S3 buckets
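As a rough illustration of the pagination point above, here is a minimal sketch of page-by-page listing with boto3's `list_objects_v2` paginator. The function name and the `key`, `size`, and `storage_class` output fields are assumptions made for illustration; the connector's actual row shape is not shown in this thread.

```python
from typing import Any, Dict, Iterator


def iterate_object_metadata(
    s3: Any, bucket: str, prefix: str = "", page_size: int = 1000
) -> Iterator[Dict[str, Any]]:
    """Yield one metadata dict per object, holding only one listing page in memory."""
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(
        Bucket=bucket,
        Prefix=prefix,
        PaginationConfig={"PageSize": page_size},
    )
    for page in pages:
        # "Contents" is absent when a page (or the whole prefix) has no objects.
        for obj in page.get("Contents", []):
            yield {
                "key": obj["Key"],  # hypothetical column name
                "size": obj.get("Size"),  # hypothetical column name
                "storage_class": obj.get("StorageClass", "STANDARD"),
                "last_modified": obj["LastModified"],
            }
```

Because the function is a generator, the caller can upsert each row as it arrives instead of collecting the whole bucket listing first.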
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 26 comments.
| File | Description |
|---|---|
| connectors/s3_glacier/connector.py | Main connector implementation with S3 listing, Glacier restore detection, and incremental sync logic |
| connectors/s3_glacier/configuration.json | Configuration template defining AWS credentials and bucket parameters |
| connectors/s3_glacier/requirements.txt | Declares boto3 and botocore dependencies |
| connectors/s3_glacier/README.md | Comprehensive documentation covering setup, authentication, features, and table schema |
Comments suppressed due to low confidence (1)
connectors/s3_glacier/connector.py:11
- Import of 'Optional' is not used.
from typing import Any, Dict, List, Optional
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
| @@ -0,0 +1,211 @@ | |||
| # Amazon S3 (Glacier-aware) Connector Example | |||
|
|
|||
| ## Connector overview | |||
@fivetran-surabhisingh You need to list and link to the connector on the main README - https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md
connectors/s3_glacier/README.md
Outdated
| - Windows: 10 or later (64-bit only) | ||
| - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) | ||
| - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) | ||
| - Active **AWS account** with S3 access. |
| - Active **AWS account** with S3 access. | |
| - Active AWS account with S3 access. |
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 13 comments.
fivetran-sahilkhirwal
left a comment
Please address these along with the Copilot and tech writer review comments.
connectors/s3_glacier/connector.py
Outdated
| log.info(f"Starting S3 sync bucket={bucket} prefix={prefix} watermark={watermark}") | ||
|
|
||
| new_wm = watermark | ||
| for row in _iterate_objects(s3, bucket, prefix, page_size): |
Function name mismatch: calling _iterate_objects but the function is defined as iterate_objects (without leading underscore) on line 94. This will cause a NameError at runtime.
Change to: for row in iterate_objects(s3, bucket, prefix, page_size):
| for row in _iterate_objects(s3, bucket, prefix, page_size): | |
| for row in iterate_objects(s3, bucket, prefix, page_size): |
| "aws_secret_access_key": "<YOUR_AWS_SECRET_ACCESS_KEY>", | ||
| "aws_region": "<YOUR_AWS_REGION>", | ||
| "bucket": "<YOUR_S3_BUCKET_NAME>", | ||
| "prefix": "<OPTIONAL_S3_PREFIX>" |
Configuration parameter mismatch with configuration.json. The README lists "prefix": "<OPTIONAL_S3_PREFIX>" as optional, but configuration.json shows "prefix": "<YOUR_S3_FOLDER_PATH>".
The placeholder descriptions should match. Use consistent wording:
- Either `<OPTIONAL_S3_PREFIX>` in both files
- Or `<YOUR_S3_FOLDER_PATH_OPTIONAL>` in both files
Additionally, since prefix is optional (has a default value of "" in code), the configuration.json could either omit this field or clearly mark it as optional in the placeholder.
connectors/s3_glacier/connector.py
Outdated
| "GLACIER_IR", "GLACIER_FLEXIBLE_RETRIEVAL" | ||
| } | ||
|
|
||
| def iso(dt): |
Missing type hint for parameter and return type. Function signature should be:
`def iso(dt) -> str | None:`
Or, if using older Python typing:
`from typing import Optional`
`def iso(dt) -> Optional[str]:`
This improves code clarity and enables better IDE support.
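For reference, a minimal sketch of what a fully annotated `iso` helper could look like, with both the parameter and the return type hinted. The body is an assumption, since the original implementation is not shown in this thread; in particular, treating naive datetimes as UTC is a choice made for this sketch only.

```python
from datetime import datetime, timezone
from typing import Optional


def iso(dt: Optional[datetime]) -> Optional[str]:
    """Return dt as an ISO 8601 string in UTC, or None when dt is None."""
    if dt is None:
        return None
    if dt.tzinfo is None:
        # Assumption: a naive datetime is already expressed in UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat()
```

Hinting the parameter as `Optional[datetime]` makes the `None` pass-through explicit to callers and type checkers.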
connectors/s3_glacier/connector.py
Outdated
| aws_session_token=configuration.get("aws_session_token", ""), | ||
| region_name=configuration.get("aws_region"), | ||
| ) | ||
| return session.client("s3", config=BotoConfig(retries={"max_attempts": 10, "mode": "standard"})) |
Magic number without constant definition. The retry count 10 should be defined as a constant at the module level:
`__MAX_RETRY_ATTEMPTS = 10`
Then use it in the config:
`config=BotoConfig(retries={"max_attempts": __MAX_RETRY_ATTEMPTS, "mode": "standard"})`
This makes the code more maintainable and the retry policy explicit.
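A small sketch of how the retry policy could be kept in one place. The `__RETRY_MODE` constant and the `build_retry_settings` helper are hypothetical names introduced here; the returned dict is the value that would be passed as `retries=` to `botocore.config.Config`, which is deliberately not imported so the snippet stays dependency-free.

```python
# Maximum number of attempts botocore makes for a single AWS API call.
__MAX_RETRY_ATTEMPTS = 10
# botocore retry mode; valid values are "legacy", "standard" and "adaptive".
__RETRY_MODE = "standard"  # hypothetical constant, not in the PR


def build_retry_settings() -> dict:
    """Return the dict intended for Config(retries=...)."""
    return {"max_attempts": __MAX_RETRY_ATTEMPTS, "mode": __RETRY_MODE}
```

One caveat with the double-underscore prefix: Python name-mangles such identifiers inside class bodies, so these constants are safest when referenced only from module-level functions like the one above.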
| """ | ||
| Define the update function, which is a required function, and is called by Fivetran during each sync. | ||
| See the technical reference documentation for more details on the update function | ||
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update | ||
| Args: | ||
| configuration: A dictionary containing connection details | ||
| state: A dictionary containing state information from previous runs | ||
| The state dictionary is empty for the first sync or for any full re-sync | ||
| """ |
Incorrect docstring for update function. The docstring has extra text that doesn't match the required template. Lines 150-152 should be:
"""
Define the update function which lets you configure how your connector fetches data.
See the technical reference documentation for more details on the update function:
https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
Args:
configuration: a dictionary that holds the configuration settings for the connector.
state: a dictionary that holds the state of the connector.
"""
Remove "which is a required function, and is called by Fivetran during each sync" and "The state dictionary is empty for the first sync or for any full re-sync" as these deviate from the required template.
fivetran-sahilkhirwal
left a comment
Please rebase the PR and address the copilot comments and re-request the review :)
| "aws_secret_access_key": "<YOUR_AWS_SECRET_ACCESS_KEY>", | ||
| "aws_region": "<YOUR_AWS_REGION>", | ||
| "bucket": "<YOUR_S3_BUCKET_NAME>", | ||
| "prefix": "<OPTIONAL_S3_PREFIX>" |
The prefix placeholder in the configuration example is inconsistent with the table below. Change <OPTIONAL_S3_PREFIX> to <YOUR_S3_FOLDER_PATH> to match the configuration.json file, or update both to use a more descriptive placeholder like <YOUR_S3_OBJECT_PREFIX_OPTIONAL>.
| ## Additional considerations | ||
| This example is provided to help teams integrate AWS S3 metadata and storage class history into their data pipelines. Fivetran makes no guarantees regarding support or maintenance. For assistance, contact Support or submit improvements via pull request. |
The "Additional considerations" section does not match the required disclaimer format. It must use this exact text:
## Additional considerations
The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
| | bucket | The target S3 bucket | Yes | | ||
| | prefix | Object prefix (folder) filter | No | | ||
|
|
||
| Note: Do not commit this file to source control. |
The "Configuration file" section does not include the required warning about not versioning sensitive data. Add this note after the table:
Note: Ensure that the `configuration.json` file is not checked into version control to protect sensitive information.
| session = boto3.session.Session( | ||
| aws_access_key_id=configuration.get("aws_access_key_id"), | ||
| aws_secret_access_key=configuration.get("aws_secret_access_key"), | ||
| aws_session_token=configuration.get("aws_session_token", ""), |
Configuration field aws_session_token is used in the code but not declared in configuration.json. Either remove this field from the code if it's not needed, or add it to configuration.json with a placeholder value like "aws_session_token": "<YOUR_AWS_SESSION_TOKEN_OPTIONAL>".
Please check this. This key is missing in the configuration.json. Please add it there as well as in the readme :)
| botocore==1.40.59 | ||
| ``` | ||
|
|
||
| Note: `fivetran_connector_sdk` and `requests` are pre-installed in the Fivetran runtime and should not be listed. |
The "Requirements file" section does not include the required note about pre-installed packages. Replace line 58 with:
Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`.
| # Required for SDK loader | ||
| connector = Connector(update=update, schema=schema) | ||
|
|
||
| # Entry point for local testing | ||
| if __name__ == "__main__": | ||
| with open("configuration.json", "r") as f: | ||
| configuration = json.load(f) | ||
| connector.debug(configuration=configuration) |
The comment for the main block must use the exact required format. Replace with:
# Create the connector object using the schema and update functions
connector = Connector(update=update, schema=schema)

# Check if the script is being run as the main module.
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
if __name__ == "__main__":
    # Open the configuration.json file and load its contents
    with open("configuration.json", "r") as f:
        configuration = json.load(f)
    # Test the connector locally
    connector.debug(configuration=configuration)
Please follow this. This is the standard for defining the main module in the connector SDK examples. The comments make it more readable for the users. please use this exact template :)
| | last_modified | UTC_DATETIME | Last modified timestamp | | ||
| | restore_status | STRING | Glacier restore status | | ||
| | restore_expiry | UTC_DATETIME | Glacier restore expiration time (if applicable) | | ||
| | _fivetran_deleted| BOOLEAN | Soft delete flag | |
The _fivetran_deleted column is listed in the table schema, but it's not defined in the schema() function in connector.py. This is a system column automatically added by Fivetran and should not be included in connector documentation unless explicitly managed by the connector code. Remove this row from the table.
| | _fivetran_deleted| BOOLEAN | Soft delete flag | |
| new_wm = lm | ||
|
|
||
| if new_wm: | ||
| op.checkpoint({"s3_objects": {"last_modified": new_wm}}) |
Missing required comment before op.checkpoint(). According to the coding guidelines, EVERY op.checkpoint() call must be preceded by this exact comment:
# Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
# from the correct position in case of next sync or interruptions.
# Learn more about how and where to checkpoint by reading our best practices documentation
# (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
op.checkpoint({"s3_objects": {"last_modified": new_wm}})
Please add this comment before checkpoint()
| import json # For reading configuration from JSON file | ||
| from datetime import datetime, timezone # For working with UTC timestamps | ||
| from typing import Any, Dict, List, Optional # Type hints | ||
|
|
||
| import boto3 # AWS SDK for Python to interact with S3 | ||
| from botocore.config import Config as BotoConfig # For setting retry and timeout configs | ||
| from botocore.exceptions import ClientError # Exception handling for AWS responses | ||
|
|
||
| from fivetran_connector_sdk import Connector # Core SDK functionality | ||
| from fivetran_connector_sdk import Logging as log # For logging | ||
| from fivetran_connector_sdk import Operations as op # For data operations (upsert, checkpoint) |
| import json # For reading configuration from JSON file | |
| from datetime import datetime, timezone # For working with UTC timestamps | |
| from typing import Any, Dict, List, Optional # Type hints | |
| import boto3 # AWS SDK for Python to interact with S3 | |
| from botocore.config import Config as BotoConfig # For setting retry and timeout configs | |
| from botocore.exceptions import ClientError # Exception handling for AWS responses | |
| from fivetran_connector_sdk import Connector # Core SDK functionality | |
| from fivetran_connector_sdk import Logging as log # For logging | |
| from fivetran_connector_sdk import Operations as op # For data operations (upsert, checkpoint) | |
| # For reading configuration from JSON file | |
| import json | |
| # For working with UTC timestamps | |
| from datetime import datetime, timezone | |
| # Type hints | |
| from typing import Any, Dict, List, Optional | |
| # AWS SDK for Python to interact with S3 | |
| import boto3 | |
| # For setting retry and timeout configs | |
| from botocore.config import Config as BotoConfig | |
| # Exception handling for AWS responses | |
| from botocore.exceptions import ClientError | |
| # Import required classes from fivetran_connector_sdk | |
| from fivetran_connector_sdk import Connector | |
| # For enabling Logs in your connector code | |
| from fivetran_connector_sdk import Logging as log | |
| # For supporting Data operations like Upsert(), Update(), Delete() and checkpoint() | |
| from fivetran_connector_sdk import Operations as op |
fivetran-sahilkhirwal
left a comment
Please check these review comments and re-request the review once resolved :)
| _GLACIER_CLASSES = {"GLACIER", "DEEP_ARCHIVE", "GLACIER_IR", "GLACIER_FLEXIBLE_RETRIEVAL"} | ||
|
|
||
| _MAX_RETRY_ATTEMPTS = 10 # Max retry attempts for AWS API | ||
| _CHECKPOINT_EVERY = 1000 # Frequency of checkpointing rows |
| _GLACIER_CLASSES = {"GLACIER", "DEEP_ARCHIVE", "GLACIER_IR", "GLACIER_FLEXIBLE_RETRIEVAL"} | |
| _MAX_RETRY_ATTEMPTS = 10 # Max retry attempts for AWS API | |
| _CHECKPOINT_EVERY = 1000 # Frequency of checkpointing rows | |
| __GLACIER_CLASSES = {"GLACIER", "DEEP_ARCHIVE", "GLACIER_IR", "GLACIER_FLEXIBLE_RETRIEVAL"} | |
| __MAX_RETRY_ATTEMPTS = 10 # Max retry attempts for AWS API | |
| __CHECKPOINT_EVERY = 1000 # Frequency of checkpointing rows |
| session = boto3.session.Session( | ||
| aws_access_key_id=configuration.get("aws_access_key_id"), | ||
| aws_secret_access_key=configuration.get("aws_secret_access_key"), | ||
| aws_session_token=configuration.get("aws_session_token", ""), |
Please remove the default to correctly send None for unavailable key values.
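A minimal sketch of what this could look like, assuming the session arguments are built in a small helper first. `build_session_kwargs` is a hypothetical name, and the `or None` normalisation of blank values is an extra assumption that goes slightly beyond simply dropping the default.

```python
from typing import Any, Dict, Optional


def build_session_kwargs(configuration: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Map connector configuration onto boto3.session.Session keyword arguments."""
    return {
        "aws_access_key_id": configuration.get("aws_access_key_id"),
        "aws_secret_access_key": configuration.get("aws_secret_access_key"),
        # No "" default: a missing key yields None.
        # `or None` also treats a blank string as missing (assumption of this sketch).
        "aws_session_token": configuration.get("aws_session_token") or None,
        "region_name": configuration.get("aws_region"),
    }
```

The result would then be passed along as `boto3.session.Session(**build_session_kwargs(configuration))`, so long-lived credentials without a session token reach boto3 as `None` rather than as an empty string.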
| def schema(_: dict) -> List[Dict[str, Any]]: | ||
| """ | ||
| Define the output schema for the connector. | ||
|
|
||
| Args: | ||
| _ (dict): Unused config. | ||
|
|
||
| Returns: | ||
| List[Dict[str, Any]]: Schema definition for s3_objects table. | ||
| """ |
Please add this docstring instead of the one added. This is the standard docstring we use for schema() method.
| ] | ||
|
|
||
|
|
||
| def iterate_objects(s3, bucket: str, prefix: str, page_size: int): |
This method has high cognitive complexity (22; the required value is less than 15). Please break it into smaller methods for better readability and maintainability :)
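One possible way to reduce the complexity is to pull the restore-header parsing into its own helper. A minimal sketch, assuming the `Restore` value returned by `head_object` follows the `x-amz-restore` shape (`ongoing-request="false", expiry-date="..."`); the function name and the status labels are hypothetical, not taken from the PR.

```python
import re
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Optional, Tuple

_ONGOING_RE = re.compile(r'ongoing-request="(true|false)"')
_EXPIRY_RE = re.compile(r'expiry-date="([^"]+)"')


def parse_restore_header(restore: Optional[str]) -> Tuple[str, Optional[datetime]]:
    """Translate a head_object "Restore" value into (status, expiry)."""
    if not restore:
        # No header at all: no restore has been requested for this object.
        return "NOT_REQUESTED", None
    ongoing = _ONGOING_RE.search(restore)
    if ongoing and ongoing.group(1) == "true":
        return "IN_PROGRESS", None
    expiry_match = _EXPIRY_RE.search(restore)
    # The expiry date uses the HTTP date format, which email.utils can parse.
    expiry = parsedate_to_datetime(expiry_match.group(1)) if expiry_match else None
    return "RESTORED", expiry
```

With this in place, `iterate_objects` would only decide whether an object's storage class needs a `head_object` call and delegate the string handling to the helper.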
|
|
||
| new_wm = watermark | ||
| for row in iterate_objects(s3, bucket, prefix, page_size): | ||
| lm = row.get("last_modified") |
Please use descriptive variable names for better readability :)
| if watermark and lm and lm < watermark: | ||
| continue | ||
|
|
||
| op.upsert("s3_objects", row) |
| op.upsert("s3_objects", row) | |
| # The 'upsert' operation is used to insert or update data in the destination table. | |
| # The op.upsert method is called with two arguments: | |
| # - The first argument is the name of the table to upsert the data into. | |
| # - The second argument is a dictionary containing the data to be upserted, | |
| op.upsert("s3_objects", row) |
| new_wm = watermark | ||
| for row in iterate_objects(s3, bucket, prefix, page_size): | ||
| lm = row.get("last_modified") | ||
| if watermark and lm and lm < watermark: |
Also, one question: we fetch all the data from the source and then skip every record before the watermark. Can we fetch only the required data from the source instead of fetching everything?
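For context on this question: as far as I know, S3's `ListObjectsV2` accepts `Prefix` and `StartAfter` (a key, not a timestamp) but has no parameter for filtering by modification time, so the watermark comparison has to happen client-side. A minimal sketch of that filter with descriptive variable names; `select_changed` is a hypothetical helper, and `last_modified` is assumed to hold comparable values such as timezone-aware datetimes.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple


def select_changed(
    rows: Iterable[Dict[str, Any]], watermark: Optional[datetime]
) -> Tuple[List[Dict[str, Any]], Optional[datetime]]:
    """Return the rows modified at or after the watermark, plus the new watermark."""
    changed_rows: List[Dict[str, Any]] = []
    new_watermark = watermark
    for row in rows:
        last_modified = row.get("last_modified")
        if watermark and last_modified and last_modified < watermark:
            continue  # unchanged since the previous sync
        changed_rows.append(row)
        if last_modified and (new_watermark is None or last_modified > new_watermark):
            new_watermark = last_modified
    return changed_rows, new_watermark
```

The list return keeps the sketch easy to test; a real connector would more likely upsert inside the loop to stay memory-efficient. Narrowing `prefix` in the configuration is the main way to reduce how much is listed in the first place.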
Surabhi Singh seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.