
[source-marketo] connector is failing on null primary key id for the leads stream due to CJK characters in the field values #74087

@engtchris

Description

Connector Name

source-marketo

Connector Version

1.4.40

What step the error happened?

During the sync

Summary

The source-marketo connector (v1.4.40) fails to sync the leads stream when the Marketo Bulk Export API returns records containing CJK (Chinese, Japanese, Korean) characters. The connector's internal CSV parser misaligns columns on these records, so the source-defined primary key field id is parsed as null. In some affected records, other columns still hold values that let us trace which lead it was despite the null id. That is not always the case, and a single such record is enough to break the sync.

The BasicAirbyteMessageValidator then aborts the entire sync with:
SourceException: All the defined primary keys are null, the primary keys are: id

We strongly believe the cause is the presence of CJK characters. When Airbyte fails to parse such a record correctly, it shifts values across columns, including the id field, so that id becomes null. Our connector syncs data from 2024-01-01 onward. Some leads were created before that date, but with the Incremental | Dedup setting, any update to them brings them into the new sync. There is no easy way to identify all leads with CJK characters, which makes the problem difficult to diagnose and fix in Marketo.
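As a possible aid for locating affected leads, here is a minimal sketch that scans a raw Bulk Export CSV for fields containing CJK text. It is not part of the connector. The function name `find_cjk_rows`, the `id_column` parameter, and the chosen Unicode ranges are assumptions made for illustration, and the ranges are not exhaustive.

```python
import csv
import io
import re

# Assumption: these ranges cover the most common CJK blocks; they are not exhaustive.
CJK_PATTERN = re.compile(
    "[\u3040-\u30ff"   # Hiragana and Katakana
    "\u3400-\u4dbf"    # CJK Unified Ideographs Extension A
    "\u4e00-\u9fff"    # CJK Unified Ideographs
    "\uac00-\ud7af"    # Hangul Syllables
    "\uf900-\ufaff]"   # CJK Compatibility Ideographs
)


def find_cjk_rows(csv_text, id_column="id"):
    """Return (id, column_name) pairs for every field that contains CJK text.

    Hypothetical helper: csv_text is the decoded content of a raw export file.
    """
    hits = []
    # newline="" lets the csv module handle line breaks inside quoted fields.
    reader = csv.DictReader(io.StringIO(csv_text, newline=""))
    for row in reader:
        for column, value in row.items():
            # Surplus fields land under the key None as a list; skip non-strings.
            if isinstance(value, str) and CJK_PATTERN.search(value):
                hits.append((row.get(id_column), column))
    return hits
```

To run it against a downloaded file, read the file with `open(path, encoding="utf-8", newline="")` and pass the resulting text to `find_cjk_rows`.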

Attached is a sample file with 3 rows of data, anonymized and randomized except for the CJK characters. The values closely resemble what appears in production, so the test stays representative.

marketo_leads_corrupted_anonymized.csv

Environment

  • Airbyte Platform: v1.8.5 (self-hosted, Helm chart on EKS)
  • Source Connector: airbyte/source-marketo:1.4.40
  • Destination Connector: airbyte/destination-snowflake:3.15.2

Evidence

  • The Marketo API is not at fault. We downloaded the raw Bulk Export CSVs for the affected date range (Feb 1-5, 2026) directly via the Marketo REST API. All 100+ leads with CJK characters have properly formatted CSV: correct quoting, correct encoding, correct column alignment.
  • The connector's CSV parser is at fault. The raw CSV is valid UTF-8 with properly quoted fields. The column misalignment only occurs after the connector processes the CSV.
  • Fixing individual leads confirms the pattern. We removed the CJK characters from one blocking lead (it had Chinese characters in billingCity/state). The sync then advanced from failing at record ~5,000 to failing at record ~15,000, where it hit the next lead with CJK characters.
  • Pre-v1.4.0 connectors handle this correctly. Historical connections running v1.2.x/v1.3.x (which include the v0.1.12 encoding fix) successfully synced leads with CJK characters for date ranges going back to 2015.
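The alignment check on the raw export can be reproduced with a short script. The following is a minimal sketch using Python's standard csv module, not the exact procedure we ran. The function name `check_alignment` is hypothetical, and treating the literal string "null" as an empty id is an assumption about how the export renders null values.

```python
import csv
import io


def check_alignment(csv_text, id_column="id"):
    """Return (line_number, problem) pairs for rows that look misaligned.

    Hypothetical helper: csv_text is the decoded content of a raw export file.
    """
    # newline="" lets the csv module handle line breaks inside quoted fields.
    reader = csv.reader(io.StringIO(csv_text, newline=""))
    header = next(reader)
    id_index = header.index(id_column)
    problems = []
    for row in reader:
        if len(row) != len(header):
            problems.append(
                (reader.line_num, f"expected {len(header)} fields, got {len(row)}")
            )
        # Assumption: a missing id shows up as an empty string or the text "null".
        elif row[id_index].strip() in ("", "null"):
            problems.append((reader.line_num, "empty id"))
    return problems
```

An empty result for a file means every row has the same number of fields as the header and a non-empty id, which is what we observed for the raw exports.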

Steps to Reproduce

  1. Have a Marketo instance with leads containing CJK characters in any text field (name, city, company, etc.)
  2. Configure a source-marketo connection with the leads stream using Incremental | Dedup sync mode
  3. Run a sync where the incremental date window includes leads with CJK text
  4. The sync will fail when BasicAirbyteMessageValidator encounters a record where the CSV column misalignment produces a null id

Relevant log output

### Below is a redacted version of the logs. The error is always consistent with what's below:

## Sync Log Excerpt (redacted) — source-marketo v1.4.17, Airbyte Platform v1.8.5

>> ATTEMPT 1/20

[... check phase completes successfully ...]

2026-02-25 00:12:14 info Launching replication pod with containers:
  - source: airbyte/source-marketo:1.4.17
  - destination: airbyte/destination-snowflake:3.15.2

[... replication starts, processes records ...]

2026-02-25 00:14:26 info Failures: [ {
  "failureOrigin" : "destination",
  "failureType" : "transient_error",
  "internalMessage" : "Some streams either received an INCOMPLETE stream status, or did not receive a stream status at all: marketo.activities_unsubscribe_email, marketo.campaigns, marketo.activities_send_email, marketo.activities_email_bounced, marketo.activities_click_email, marketo.activities_open_email, marketo.leads, marketo.activities_email_delivered, marketo.activity_types\nio.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.",
  "externalMessage" : "Some streams were unsuccessful due to a source error. See logs for details.",
  "metadata" : {
    "attemptNumber" : 0,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error. See logs for details.\n\tat io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:215)\n\tat kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:48)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)\n\tat io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.kt:68)\n"
}, {
  "failureOrigin" : "source",
  "internalMessage" : "All the defined primary keys are null, the primary keys are: id",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 0,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: All the defined primary keys are null, the primary keys are: id\n\tat io.airbyte.workers.internal.BasicAirbyteMessageValidator.validate(BasicAirbyteMessageValidator.kt:106)\n\tat io.airbyte.workers.internal.VersionedAirbyteStreamFactory.toAirbyteMessage(VersionedAirbyteStreamFactory.kt:272)\n\tat io.airbyte.workers.internal.VersionedAirbyteStreamFactory.addLineReadLogic(VersionedAirbyteStreamFactory.kt:151)\n\tat java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)\n\tat java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1950)\n\tat io.airbyte.container.orchestrator.worker.io.LocalContainerAirbyteSource.isFinished(LocalContainerAirbyteSource.kt:79)\n\tat io.airbyte.container.orchestrator.worker.SourceReader.isSourceFinished(ReplicationTask.kt:221)\n\tat io.airbyte.container.orchestrator.worker.SourceReader.run(ReplicationTask.kt:184)\n"
} ]

2026-02-25 00:14:26 info ----- END REPLICATION -----

2026-02-25 00:15:27 info Retry State: RetryManager(successiveCompleteFailures=0, successivePartialFailures=1, totalCompleteFailures=0, totalPartialFailures=1)
 Backoff before next attempt: 0 seconds

>> ATTEMPT 2/20

[... identical failure pattern repeats for all 20 attempts ...]

Contribute

  • Yes, I want to contribute

Internal Tracking: https://github.com/airbytehq/oncall/issues/11468
