Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.7.2-rc.1
dockerImageTag: 3.7.3-rc.1
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
private int stateEmissionFrequency;
private final FeatureFlags featureFlags;
private DataSource currentDataSource;

public static Source sshWrappedSource(PostgresSource source) {
return new SshWrappedSource(source, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY, "security");
Expand Down Expand Up @@ -396,6 +397,7 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept
getConnectionTimeout(connectionProperties, driverClassName));
// Record the data source so that it can be closed.
dataSources.add(dataSource);
this.currentDataSource = dataSource;

final JdbcDatabase database = new StreamingJdbcDatabase(
dataSource,
Expand Down Expand Up @@ -558,7 +560,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("Using ctid + CDC");
return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
return cdcCtidIteratorsCombined(database, currentDataSource, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(),
(CtidGlobalStateManager) ctidStateManager, savedOffsetAfterReplicationSlotLSN);
}

Expand Down Expand Up @@ -586,7 +588,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final

ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(xminStatus));
final PostgresCtidHandler ctidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager,
createInitialLoader(database, currentDataSource, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager,
Optional.empty());

if (!xminStreamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) {
Expand Down Expand Up @@ -653,7 +655,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final

ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair)));
final PostgresCtidHandler cursorBasedCtidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty());
createInitialLoader(database, currentDataSource, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager,
Optional.empty());

final List<AutoCloseableIterator<AirbyteMessage>> initialSyncCtidIterators = new ArrayList<>(
cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid),
Expand Down Expand Up @@ -956,7 +959,7 @@ public InitialLoadHandler<PostgresType> getInitialLoadHandler(final JdbcDatabase
List.of(stream),
getQuoteString());

return createInitialLoader(database, List.of(stream), fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty());
return createInitialLoader(database, currentDataSource, List.of(stream), fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty());
}

protected String toSslJdbcParam(final SslMode sslMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -135,6 +136,7 @@ private static CdcState getDefaultCdcState(final PostgresDebeziumStateUtil postg
}

public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombined(final JdbcDatabase database,
final DataSource dataSource,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
final StateManager stateManager,
Expand Down Expand Up @@ -231,7 +233,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

try {
ctidHandler =
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager,
createInitialLoader(database, dataSource, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager,
Optional.of(
new PostgresCdcConnectorMetadataInjector(emittedAt.toString(), io.airbyte.cdk.db.PostgresUtils.getLsn(database).asLong())));
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,6 +81,7 @@ public static boolean isTidRangeScanCapableDBServer(final JdbcDatabase database)
}

public static PostgresCtidHandler createInitialLoader(final JdbcDatabase database,
final DataSource dataSource,
final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid,
final FileNodeHandler fileNodeHandler,
final String quoteString,
Expand All @@ -99,6 +101,7 @@ public static PostgresCtidHandler createInitialLoader(final JdbcDatabase databas

return new PostgresCtidHandler(sourceConfig,
database,
dataSource,
new CtidPostgresSourceOperations(optionalMetadataInjector),
quoteString,
fileNodeHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -35,8 +36,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.CheckForNull;
import javax.sql.DataSource;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,12 +50,25 @@
* This class is responsible to divide the data of the stream into chunks based on the ctid and
* dynamically create iterator and keep processing them one after another. The class also makes sure
* to check for VACUUM in between processing chunks and if VACUUM happens then re-start syncing the
* data
* data.
*
* <p>
* All CTID batch queries within a single scan are executed on a single connection using
* {@code REPEATABLE READ} isolation. This guarantees that every batch sees the same MVCC snapshot,
* preventing rows updated by concurrent transactions from being silently dropped.
*/
public class InitialSyncCtidIterator extends AbstractIterator<RowDataWithCtid> implements AutoCloseableIterator<RowDataWithCtid> {

private static final Logger LOGGER = LoggerFactory.getLogger(InitialSyncCtidIterator.class);
public static final int MAX_TUPLES_IN_QUERY = 5_000_000;

/**
* Default fetch size for streaming results from the snapshot connection. Because the snapshot
* connection runs with autocommit disabled, the PostgreSQL JDBC driver honors this fetch size and
* streams rows in cursor-based batches instead of materializing the full result set in memory.
*/
@VisibleForTesting
static final int SNAPSHOT_FETCH_SIZE = 10_000;

private final AirbyteStreamNameNamespacePair airbyteStream;
private final long blockSize;
private final List<String> columnNames;
Expand All @@ -65,6 +83,7 @@ public class InitialSyncCtidIterator extends AbstractIterator<RowDataWithCtid> i
private final long tableSize;
private final int maxTuple;
private final boolean useTestPageSize;
private final DataSource dataSource;

private AutoCloseableIterator<RowDataWithCtid> currentIterator;
private Long lastKnownFileNode;
Expand All @@ -76,8 +95,15 @@ public class InitialSyncCtidIterator extends AbstractIterator<RowDataWithCtid> i
private Optional<Duration> cdcInitialLoadTimeout;
private boolean isCdcSync;

/**
* A dedicated connection held open for the duration of the CTID scan. Opened with REPEATABLE READ
* isolation so that all batch queries see the same consistent MVCC snapshot.
*/
private Connection snapshotConnection;

public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
final JdbcDatabase database,
final DataSource dataSource,
final CtidPostgresSourceOperations sourceOperations,
final String quoteString,
final List<String> columnNames,
Expand All @@ -97,6 +123,7 @@ public InitialSyncCtidIterator(final CtidStateManager ctidStateManager,
this.columnNames = columnNames;
this.ctidStateManager = ctidStateManager;
this.database = database;
this.dataSource = dataSource;
this.fileNodeHandler = fileNodeHandler;
this.quoteString = quoteString;
this.schemaName = schemaName;
Expand Down Expand Up @@ -169,13 +196,81 @@ protected RowDataWithCtid computeNext() {
}
}

/**
 * Executes a CTID batch query on the shared snapshot connection. Unlike the previous implementation
 * that called {@code database.unsafeQuery()} (which opens a new connection per batch), this method
 * reuses the single {@link #snapshotConnection} so that all batches see the same MVCC snapshot.
 *
 * <p>
 * The returned stream owns the {@link PreparedStatement} and {@link ResultSet}: both are released
 * when the stream is closed (or fully consumed and then closed). The snapshot connection itself is
 * intentionally left open for subsequent batches.
 *
 * @param p inclusive lower / upper ctid bounds of the batch
 * @return a lazy stream of rows in the ctid range
 * @throws SQLException if preparing or executing the batch query fails
 */
private Stream<RowDataWithCtid> getStream(final Pair<Ctid, Ctid> p) throws SQLException {
  final PreparedStatement statement =
      getCtidStatement(snapshotConnection, p.getLeft(), p.getRight());
  try {
    statement.setFetchSize(SNAPSHOT_FETCH_SIZE);
    final ResultSet resultSet = statement.executeQuery();
    // Release JDBC resources even if the caller abandons the stream before exhausting it
    // (e.g. on sync interruption). Closing the statement also closes its result set, and
    // JDBC close() is idempotent, so the double close is harmless.
    return toStream(resultSet, sourceOperations::recordWithCtid)
        .onClose(() -> {
          try {
            resultSet.close();
            statement.close();
          } catch (final SQLException e) {
            LOGGER.warn("Failed to close CTID batch statement for stream {}", airbyteStream, e);
          }
        });
  } catch (final SQLException e) {
    // Don't leak the statement when query execution fails.
    try {
      statement.close();
    } catch (final SQLException closeException) {
      e.addSuppressed(closeException);
    }
    throw e;
  }
}

/**
 * Converts a {@link ResultSet} into a {@link Stream} of mapped records. The stream does NOT close
 * the underlying connection on close — the snapshot connection is managed by this iterator's
 * lifecycle. The result set, however, IS closed both on normal exhaustion and via
 * {@link Stream#onClose}, so an early {@code close()} of a partially-consumed stream does not leak
 * it (JDBC {@code close()} is idempotent, so the double close on the exhaustion path is safe).
 *
 * @param resultSet the result set to iterate; ownership passes to the returned stream
 * @param mapper maps the result set's current row to a record
 * @param <T> record type produced per row
 * @return an ordered, sequential, lazy stream of mapped rows
 */
private static <T> Stream<T> toStream(
                                      final ResultSet resultSet,
                                      final ResultSetMapper<T> mapper) {
  final Stream<T> stream = StreamSupport.stream(
      new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, java.util.Spliterator.ORDERED) {

        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
          try {
            if (!resultSet.next()) {
              resultSet.close();
              return false;
            }
            action.accept(mapper.apply(resultSet));
            return true;
          } catch (final SQLException e) {
            // SQLException is checked and cannot cross the Spliterator contract; wrap it.
            throw new RuntimeException(e);
          }
        }

      },
      false);
  // Previously the result set was only closed on exhaustion; an abandoned or failed stream
  // leaked it until the connection died. Tie cleanup to the stream's own lifecycle as well.
  return stream.onClose(() -> {
    try {
      resultSet.close();
    } catch (final SQLException e) {
      throw new RuntimeException(e);
    }
  });
}

private void initSubQueries() {
/**
 * Maps the current row of a {@link ResultSet} to a value of type {@code T}. Behaves like
 * {@link java.util.function.Function} but is permitted to throw {@link SQLException}, so JDBC row
 * readers can be passed as lambdas or method references without wrapper try/catch blocks.
 *
 * @param <T> the record type produced for each row
 */
@FunctionalInterface
interface ResultSetMapper<T> {

  /**
   * Reads the row the given result set is currently positioned on.
   *
   * @param resultSet a result set positioned on a valid row
   * @return the mapped record
   * @throws SQLException if reading any column fails
   */
  T apply(ResultSet resultSet) throws SQLException;

}

/**
 * Opens a fresh snapshot connection with REPEATABLE READ isolation. If a previous snapshot
 * connection exists (e.g. after a VACUUM reset), it is closed first.
 *
 * <p>
 * The new connection is assigned to {@link #snapshotConnection} only after it has been fully
 * configured; if configuration fails, the pooled connection is closed rather than leaked.
 *
 * @throws SQLException if obtaining or configuring the connection fails
 */
private void openSnapshotConnection() throws SQLException {
  closeSnapshotConnection();

  final Connection connection = dataSource.getConnection();
  try {
    // Autocommit must be off both to keep a single long-lived transaction (one MVCC snapshot
    // for all batches) and so the PostgreSQL driver honors fetchSize for streaming.
    connection.setAutoCommit(false);
    connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
  } catch (final SQLException e) {
    // Don't leak the connection back to the caller's pool accounting on a setup failure.
    try {
      connection.close();
    } catch (final SQLException closeException) {
      e.addSuppressed(closeException);
    }
    throw e;
  }
  snapshotConnection = connection;
  LOGGER.info(
      "Opened REPEATABLE READ snapshot connection for CTID scan of stream {}", airbyteStream);
}

/**
 * Closes the current snapshot connection, if any, and clears the field. A close failure is logged
 * and swallowed so that teardown paths never fail on cleanup.
 */
private void closeSnapshotConnection() {
  final Connection connection = snapshotConnection;
  if (connection == null) {
    return;
  }
  // Clear the field first so the iterator never retains a reference to a dead connection.
  snapshotConnection = null;
  try {
    connection.close();
  } catch (final SQLException e) {
    LOGGER.warn("Failed to close snapshot connection for stream {}", airbyteStream, e);
  }
}

private void initSubQueries() throws SQLException {
if (useTestPageSize) {
LOGGER.warn("Using test page size");
}
Expand All @@ -184,6 +279,9 @@ private void initSubQueries() {

subQueriesPlan.addAll(getQueryPlan(currentCtidStatus));
lastKnownFileNode = currentCtidStatus != null ? currentCtidStatus.getRelationFilenode() : null;

// Open the snapshot connection that will be shared across all batch queries.
openSnapshotConnection();
}

private PreparedStatement getCtidStatement(final Connection connection,
Expand All @@ -203,7 +301,7 @@ private List<Pair<Ctid, Ctid>> getQueryPlan(final CtidStatus currentCtidStatus)
return queryPlan;
}

private void resetSubQueries(final Long latestFileNode) {
private void resetSubQueries(final Long latestFileNode) throws SQLException {
LOGGER.warn(
"The latest file node {} for stream {} is not equal to the last file node {} known to Airbyte. Airbyte will sync this table from scratch again",
latestFileNode,
Expand All @@ -216,6 +314,10 @@ private void resetSubQueries(final Long latestFileNode) {
subQueriesPlan.clear();
subQueriesPlan.addAll(getQueryPlan(null));
numberOfTimesReSynced++;

// VACUUM invalidates the physical layout. Re-open a fresh snapshot connection
// so that the new scan operates on a consistent view of the new file layout.
openSnapshotConnection();
}

/**
Expand Down Expand Up @@ -356,6 +458,7 @@ public void close() throws Exception {
if (currentIterator != null) {
currentIterator.close();
}
closeSnapshotConnection();
}

private boolean isCdcSync(CtidStateManager initialLoadStateManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -60,9 +61,11 @@ public class PostgresCtidHandler implements InitialLoadHandler<PostgresType> {
final Map<AirbyteStreamNameNamespacePair, TableBlockSize> tableBlockSizes;
final Optional<Map<AirbyteStreamNameNamespacePair, Integer>> tablesMaxTuple;
private final boolean tidRangeScanCapableDBServer;
private final DataSource dataSource;

public PostgresCtidHandler(final JsonNode config,
final JdbcDatabase database,
final DataSource dataSource,
final CtidPostgresSourceOperations sourceOperations,
final String quoteString,
final FileNodeHandler fileNodeHandler,
Expand All @@ -71,6 +74,7 @@ public PostgresCtidHandler(final JsonNode config,
final CtidStateManager ctidStateManager) {
this.config = config;
this.database = database;
this.dataSource = dataSource;
this.sourceOperations = sourceOperations;
this.quoteString = quoteString;
this.fileNodeHandler = fileNodeHandler;
Expand Down Expand Up @@ -160,7 +164,8 @@ private AutoCloseableIterator<RowDataWithCtid> queryTableCtid(
@NotNull final Optional<Duration> cdcInitialLoadTimeout) {

LOGGER.info("Queueing query for table: {}", tableName);
return new InitialSyncCtidIterator(ctidStateManager, database, sourceOperations, quoteString, columnNames, schemaName, tableName, tableSize,
return new InitialSyncCtidIterator(ctidStateManager, database, dataSource, sourceOperations, quoteString, columnNames, schemaName, tableName,
tableSize,
blockSize, maxTuple, fileNodeHandler, tidRangeScanCapableDBServer,
config.has(USE_TEST_CHUNK_SIZE) && config.get(USE_TEST_CHUNK_SIZE).asBoolean(), emittedAt,
cdcInitialLoadTimeout);
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.7.3-rc.1 | 2026-02-26 | [74062](https://github.com/airbytehq/airbyte/pull/74062) | Fix CTID scanning silently dropping rows updated during multi-batch full refresh scan |
| 3.7.1 | 2026-01-27 | [72396](https://github.com/airbytehq/airbyte/pull/72396) | Promoting release candidate 3.7.1-rc.1 to a main version. |
| 3.7.0 | 2025-08-12 | [57511](https://github.com/airbytehq/airbyte/pull/57511) | Add configurations for Azure authentication to Azure Postgres servers. |
| 3.6.35 | 2025-06-12 | [61527](https://github.com/airbytehq/airbyte/pull/61527) | Add error handling for connection issues and adopt the latest CDK version.
Expand Down
Loading