Allows Tulip Tables to be used as sources in Fivetran and enables connections to other databases.

eddya101/tulip-fivetran-connector

Tulip Interfaces Connector Example

This connector syncs data from Tulip Tables to Fivetran destinations. Tulip is a frontline operations platform that enables manufacturers to build apps without code, connect machines and devices, and analyze data to improve operations. This connector allows you to replicate Tulip Table data into your data warehouse for analysis and reporting.

Connector overview

This connector provides incremental data replication from Tulip Tables using the Tulip Table API with a two-phase synchronization strategy. It supports:

  • Two-phase sync strategy: Efficient historical load (BOOTSTRAP) followed by incremental updates (INCREMENTAL)
  • Cursor-based pagination: Uses _sequenceNumber as primary cursor to avoid offset-based pagination overhead
  • Dynamic schema discovery: Automatically maps Tulip field types to warehouse column types
  • Late commit handling: 60-second lookback window on _updatedAt prevents data loss from concurrent updates
  • Custom filtering: Supports Tulip API filter syntax to sync only relevant records
  • Automatic field optimization: Excludes tableLink fields to reduce database load on Tulip API
  • Workspace support: Can sync tables from workspace-scoped or instance-level endpoints
  • Automatic checkpointing: Saves state every 500 records for resumable syncs
  • Robust error handling: Exponential backoff retry logic for rate limiting and transient errors

The connector is designed for manufacturing operations and supply chain teams who need to analyze production data, track quality metrics, and monitor operational performance across their data warehouse ecosystem.

Requirements

  • Tulip Instance
  • Tulip API key with tables read scope
  • A Python version supported by the Fivetran Connector SDK
  • Operating system:
    • Windows: 10 or later (64-bit only)
    • macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64])
    • Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64)

Getting started

Refer to the Connector SDK Setup Guide to get started.

Features

  • Two-phase sync: BOOTSTRAP mode for historical data using _sequenceNumber, INCREMENTAL mode for ongoing updates
  • Cursor-based pagination: Primary cursor on _sequenceNumber avoids database load from large offsets
  • Schema discovery: Automatically detects table schema and maps Tulip data types to Fivetran types
  • Custom filtering: Supports Tulip API filter syntax for conditional data replication
  • Automatic field optimization: Excludes Linked Table Fields (tableLink type) to reduce database load on Tulip API
  • Workspace support: Can sync tables from workspace-scoped or instance-level endpoints
  • Human-readable columns: Generates descriptive column names from field labels and IDs
  • Checkpointing: Saves state every 500 records for resumable syncs
  • Error handling: Implements exponential backoff retry logic for rate limiting and transient failures
  • Late commit handling: 60-second lookback window on _updatedAt catches records committed after cursor update

Configuration file

The connector requires the following configuration keys in configuration.json:

{
  "subdomain": "<YOUR_SUBDOMAIN>",
  "api_key": "<YOUR_API_KEY>",
  "api_secret": "<YOUR_API_SECRET>",
  "table_id": "<YOUR_TABLE_ID>",
  "workspace_id": "<YOUR_WORKSPACE_ID>",
  "sync_from_date": "<YYYY-MM-DDTHH:MM:SSZ>",
  "custom_filter_json": "[]"
}

Configuration parameters:

  • subdomain (required) - Your Tulip instance subdomain (e.g., "acme" for acme.tulip.co)
  • api_key (required) - Tulip API key created in your Tulip instance's API settings; the key must have the table read scope
  • api_secret (required) - Tulip API secret corresponding to the API key
  • table_id (required) - Unique identifier of the Tulip table to sync (e.g., "T65jBaGMgiexWy5yS")
  • workspace_id (optional) - Workspace ID for workspace-scoped tables (omit for instance-level API keys or instances without workspaces)
  • sync_from_date (optional) - ISO 8601 timestamp at which the initial sync starts (if omitted, all records are synced from the beginning)
  • custom_filter_json (optional) - JSON array of Tulip API filter objects; refer to the Tulip API documentation for the filter JSON definition (defaults to an empty array if omitted)

Note on field selection: The connector automatically excludes Linked Table Fields (tableLink type) to reduce database load on the Tulip API. System fields (id, _createdAt, _updatedAt, _sequenceNumber) and all non-tableLink custom fields are always included.

Note: Ensure that the configuration.json file is not checked into version control to protect sensitive information.
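A minimal sketch of loading and validating this configuration with the defaults described above. The helper name and error messages are illustrative, not taken from the connector source; in a deployed connector, Fivetran passes the configuration dictionary directly to your functions.

```python
import json

REQUIRED_KEYS = ("subdomain", "api_key", "api_secret", "table_id")

def load_configuration(path="configuration.json"):
    """Load configuration.json, enforce required keys, apply defaults."""
    with open(path) as f:
        config = json.load(f)
    for key in REQUIRED_KEYS:
        if not config.get(key):
            raise ValueError(f"Missing required configuration key: {key}")
    # custom_filter_json is a JSON-encoded string holding an array of filters
    config.setdefault("custom_filter_json", "[]")
    if not isinstance(json.loads(config["custom_filter_json"]), list):
        raise ValueError("custom_filter_json must be a JSON array")
    return config
```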

Requirements file

This connector does not require any additional Python packages beyond those pre-installed in the Fivetran environment. The requirements.txt file should be left empty.

Note: The fivetran_connector_sdk:latest and requests:latest packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your requirements.txt.

Authentication

Authentication - Refer to def schema() and def update()

This connector uses HTTP Basic Authentication with Tulip API credentials:

  1. Log into your Tulip instance.
  2. Navigate to Account Settings > API Tokens.
  3. Create a new API Token.
  4. Copy the API Key and API Secret from the token configuration.
  5. Ensure the API key has table:read permissions to access the target table.

The connector passes the API key and secret as HTTP Basic Auth credentials to all Tulip API endpoints.
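HTTP Basic Auth encodes the key and secret as a base64 `key:secret` pair. A small sketch of what that header looks like (the helper name is illustrative; with the `requests` library the same thing is usually expressed as `requests.get(url, auth=(api_key, api_secret))`):

```python
import base64

def build_basic_auth_header(api_key: str, api_secret: str) -> str:
    """Encode the API key and secret as an HTTP Basic Auth header value."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return f"Basic {token}"
```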

Pagination and sync strategy

Pagination - Refer to def update()

The connector implements a two-phase cursor-based synchronization strategy to handle large datasets efficiently:

Phase 1: BOOTSTRAP mode (Historical data load)

  • Uses _sequenceNumber as primary cursor to avoid offset pagination overhead
  • Filters: _sequenceNumber > last_sequence (and optionally _updatedAt > sync_from_date)
  • Sorts by _sequenceNumber in ascending order
  • Fetches records in batches of 100 until API returns fewer than 100 records
  • Transitions to INCREMENTAL mode when bootstrap completes

Phase 2: INCREMENTAL mode (Ongoing updates)

  • Primary cursor: _sequenceNumber > last_sequence
  • Secondary filter: _updatedAt > (last_updated_at - 60s) (60-second lookback)
  • Sorts by _sequenceNumber in ascending order
  • The lookback window catches records that were committed after the previous sync's cursor update
  • Prevents infinite loops when multiple records share the same _updatedAt timestamp
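The INCREMENTAL-mode filters above can be sketched as follows. The system field names (`_sequenceNumber`, `_updatedAt`) come from the Tulip Table API; the exact shape of the filter objects here is an assumption for illustration, so check the Tulip API filter documentation for the real payload format.

```python
from datetime import datetime, timedelta

LOOKBACK_SECONDS = 60  # catches records committed after the cursor update

def build_incremental_filters(last_sequence: int, last_updated_at: str) -> list:
    """Build the two INCREMENTAL-mode filters: the _sequenceNumber cursor
    plus a 60-second lookback on _updatedAt."""
    cursor_ts = datetime.fromisoformat(last_updated_at.replace("Z", "+00:00"))
    lookback_ts = cursor_ts - timedelta(seconds=LOOKBACK_SECONDS)
    return [
        {"field": "_sequenceNumber", "functionType": "greaterThan",
         "arg": last_sequence},
        {"field": "_updatedAt", "functionType": "greaterThan",
         "arg": lookback_ts.strftime("%Y-%m-%dT%H:%M:%SZ")},
    ]
```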

Checkpointing

  • State is checkpointed every 500 records with: cursor_mode, last_sequence, last_updated_at
  • Enables resumable syncs if connector fails mid-batch
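The checkpoint cadence can be sketched like this. The `checkpoint` callable stands in for the SDK's `op.checkpoint(state)`; the loop body that upserts each record is omitted.

```python
CHECKPOINT_EVERY = 500

def process_records(records, state, checkpoint):
    """Advance the cursors per record and checkpoint every 500 records,
    plus a final checkpoint after the batch completes."""
    for count, record in enumerate(records, start=1):
        state["last_sequence"] = record["_sequenceNumber"]
        state["last_updated_at"] = record["_updatedAt"]
        if count % CHECKPOINT_EVERY == 0:
            checkpoint(dict(state))  # snapshot so later mutation is safe
    checkpoint(dict(state))
```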

Data handling

Data handling - Refer to def schema(), def _map_tulip_type_to_fivetran(), def generate_column_name(), and def _transform_record()

The connector transforms Tulip Table data through several stages:

  1. Schema discovery:

    • Fetches table metadata from Tulip API
    • Maps Tulip data types to Fivetran types using _map_tulip_type_to_fivetran()
    • Generates human-readable column names from field labels and IDs
    • Includes system fields: id (primary key), _createdAt, _updatedAt, _sequenceNumber
    • Automatically excludes Linked Table Fields (tableLink type) to reduce database load
  2. Type mapping:

    • integer maps to INT
    • float maps to DOUBLE
    • boolean maps to BOOLEAN
    • timestamp and datetime map to UTC_DATETIME
    • interval maps to INT (stored as seconds)
    • user maps to STRING
    • All other types default to STRING
  3. Record transformation:

    • Transforms field IDs to human-readable column names using generate_column_name()
    • Column naming format: label__id (e.g., customer_name__rqoqm)
    • Normalizes labels: lowercase, replaces spaces with underscores, removes special characters
    • Preserves system fields in their original format
  4. Two-phase sync:

    • BOOTSTRAP: Filters by _sequenceNumber > last_sequence for efficient historical load
    • INCREMENTAL: Filters by _sequenceNumber > last_sequence AND _updatedAt > (last_updated_at - 60s) to catch late commits
    • Updates cursors (last_sequence, last_updated_at) after each batch is processed
    • Automatically transitions from BOOTSTRAP to INCREMENTAL when historical load completes
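The two mapping helpers described above can be reconstructed from the rules in this section; the real implementations live in the connector source, so treat this as a sketch.

```python
import re

TULIP_TO_FIVETRAN = {
    "integer": "INT",
    "float": "DOUBLE",
    "boolean": "BOOLEAN",
    "timestamp": "UTC_DATETIME",
    "datetime": "UTC_DATETIME",
    "interval": "INT",  # stored as seconds
    "user": "STRING",
}

def map_tulip_type_to_fivetran(tulip_type: str) -> str:
    """All unlisted Tulip types default to STRING."""
    return TULIP_TO_FIVETRAN.get(tulip_type, "STRING")

def generate_column_name(label: str, field_id: str) -> str:
    """Normalize the label (lowercase, spaces to underscores, strip
    special characters) and append the field ID: label__id."""
    normalized = label.lower().replace(" ", "_")
    normalized = re.sub(r"[^a-z0-9_]", "", normalized)
    return f"{normalized}__{field_id.lower()}"
```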

Error handling

Error handling - Refer to def _fetch_with_retry()

The connector implements robust error handling:

  • Rate limiting: Detects HTTP 429 responses and retries with exponential backoff (5s, 10s, 20s)
  • Request failures: Automatically retries transient errors up to 3 attempts
  • Specific exception handling: Catches and logs KeyError, json.JSONDecodeError, requests.exceptions.HTTPError
  • Structured logging: Uses Python logging module with appropriate levels (INFO, WARNING, ERROR, CRITICAL)
  • State preservation: Checkpoints every 500 records to minimize data loss on failure
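The retry behavior (up to 3 retries with 5s, 10s, 20s exponential backoff) can be sketched as below. The fetch and sleep callables are injected so the cadence is easy to test; the real connector additionally distinguishes HTTP 429 from other failures rather than catching every exception.

```python
import time

def fetch_with_retry(fetch, max_retries=3, base_delay=5, sleep=time.sleep):
    """Call fetch(); on failure, retry up to max_retries times with
    exponential backoff delays of 5s, 10s, 20s before re-raising."""
    for attempt in range(max_retries + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_retries:
                raise
            sleep(base_delay * (2 ** attempt))  # 5, 10, 20 seconds
```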

Tables created

The connector creates a single table per Tulip Table synced. The table name is generated from the Tulip table label and ID in the format label__id.

Schema Structure

Each synced table follows this schema structure:

Property     Value
Table Name   <label>__<table_id> (e.g., kitsdummy__t65jbagmgiexwy5ys)
Primary Key  ["id"]

Columns

Column Name                Data Type     Description                                                        Source
id                         STRING        Unique record identifier                                           System (primary key)
_createdAt                 UTC_DATETIME  Record creation timestamp                                          System
_updatedAt                 UTC_DATETIME  Record last update timestamp                                       System
_sequenceNumber            INT           Monotonically increasing sequence number used as the sync cursor   System
<field_label>__<field_id>  Varies        Custom field columns following the pattern field_label__field_id   Custom fields

Example Schema

For a Tulip table named "KitsDummy" with ID T65jBaGMgiexWy5yS:

Table: kitsdummy__t65jbagmgiexwy5ys Primary Key: ["id"]

Column Name                                                Data Type     Description
id                                                         STRING        Unique record identifier (primary key)
_createdAt                                                 UTC_DATETIME  Record creation timestamp
_updatedAt                                                 UTC_DATETIME  Record last update timestamp
_sequenceNumber                                            INT           Monotonically increasing sequence number
customer_name__rqoqm                                       STRING        Customer name field
kit_number__pxwol                                          INT           Kit number field
kit_start_datetime__rzkek_kit_start_date_time              UTC_DATETIME  Kit start date/time field
spectrapath_data_success__pybts_spectra_path_data_success  BOOLEAN       Spectrapath data success indicator

Notes:

  • Schema is automatically discovered on the first sync
  • Schema updates when new fields are added to the Tulip table
  • Linked Table Fields (tableLink type) are automatically excluded to reduce database load
  • Custom column names follow the pattern: <normalized_field_label>__<field_id>
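In the Connector SDK, schema() returns a list of table definitions. A sketch of what the discovered schema for the example table above might look like (the column set is taken from this section; in the real connector the columns are discovered dynamically from the Tulip table metadata rather than hard-coded):

```python
def schema(configuration: dict) -> list:
    """Illustrative schema() return value for the example table."""
    return [
        {
            "table": "kitsdummy__t65jbagmgiexwy5ys",
            "primary_key": ["id"],
            "columns": {
                "id": "STRING",
                "_createdAt": "UTC_DATETIME",
                "_updatedAt": "UTC_DATETIME",
                "_sequenceNumber": "INT",
                "customer_name__rqoqm": "STRING",
                "kit_number__pxwol": "INT",
            },
        }
    ]
```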

Additional files

  • test_connector.py - Comprehensive test suite with 15 tests covering schema discovery, incremental sync, pagination, error handling, and custom filters
  • DEVELOPMENT_GUIDE.md - Development environment setup instructions and debugging workflow
  • TULIP_INTEGRATION_GUIDE.md - End-to-end integration guide for setting up Tulip with Fivetran and Snowflake

Additional considerations

The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
