Skip to content

Commit ee41397

Browse files
Rbroughan/productionalize direct loader (#61528)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent 84405d4 commit ee41397

File tree

18 files changed

+788
-89
lines changed

18 files changed

+788
-89
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ data class Meta(
120120
const val CHECKPOINT_ID_NAME: String = "partition_id"
121121
const val CHECKPOINT_INDEX_NAME: String = "id"
122122

123+
const val AIRBYTE_META_SYNC_ID = "sync_id"
124+
const val AIRBYTE_META_CHANGES = "changes"
125+
123126
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
124127
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
125128
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
@@ -289,8 +292,8 @@ data class EnrichedDestinationRecordAirbyteValue(
289292
EnrichedAirbyteValue(
290293
ObjectValue(
291294
linkedMapOf(
292-
"sync_id" to IntegerValue(stream.syncId),
293-
"changes" to
295+
Meta.AIRBYTE_META_SYNC_ID to IntegerValue(stream.syncId),
296+
Meta.AIRBYTE_META_CHANGES to
294297
ArrayValue(
295298
(sourceMeta.changes.toAirbyteValues()) +
296299
declaredFields

airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/orchestration/db/legacy_typing_deduping/TableCatalogFactory.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,17 @@ const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal"
2323
data class TableNameInfo(val tableNames: TableNames, val columnNameMapping: ColumnNameMapping)
2424

2525
data class TableCatalog(private val catalog: Map<DestinationStream, TableNameInfo>) :
26-
Map<DestinationStream, TableNameInfo> by catalog
26+
Map<DestinationStream, TableNameInfo> by catalog {
27+
fun getMappedColumnName(stream: DestinationStream, colName: String): String? =
28+
this[stream]?.columnNameMapping?.get(colName)
29+
}
2730

2831
data class TableCatalogByDescriptor(
2932
private val catalog: Map<DestinationStream.Descriptor, TableNameInfo>
30-
) : Map<DestinationStream.Descriptor, TableNameInfo> by catalog
33+
) : Map<DestinationStream.Descriptor, TableNameInfo> by catalog {
34+
fun getFinalTableName(desc: DestinationStream.Descriptor): TableName? =
35+
this[desc]?.tableNames?.finalTableName
36+
}
3137

3238
@Factory
3339
class TableCatalogFactory {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class BigQueryRecordFormatter {
3636
// includes changes in-connector type coercion
3737
// and for raw tables, we only want changes that originated from the source
3838
val protocolMeta = enrichedRecord.sourceMeta.asProtocolObject()
39-
protocolMeta.additionalProperties["sync_id"] = record.stream.syncId
39+
protocolMeta.additionalProperties[Meta.AIRBYTE_META_SYNC_ID] =
40+
record.stream.syncId
4041
outputRecord[key] = protocolMeta.serializeToString()
4142
// TODO we should do this for direct-load tables
4243
// val serializedAirbyteMeta = (value.abValue as

airbyte-integrations/connectors/destination-clickhouse-v2/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ application {
3434
}
3535

3636
dependencies {
37-
3837
implementation 'com.clickhouse:client-v2:0.8.6'
3938

4039
testImplementation("io.mockk:mockk:1.14.2")

airbyte-integrations/connectors/destination-clickhouse-v2/src/main/kotlin/io/airbyte/integrations/destination/clickhouse_v2/client/ClickhouseSqlGenerator.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class ClickhouseSqlGenerator {
6363
$COLUMN_NAME_AB_RAW_ID String NOT NULL,
6464
$COLUMN_NAME_AB_EXTRACTED_AT DateTime64(3) NOT NULL,
6565
$COLUMN_NAME_AB_META String NOT NULL,
66-
$COLUMN_NAME_AB_GENERATION_ID UInt32,
66+
$COLUMN_NAME_AB_GENERATION_ID UInt32 NOT NULL,
6767
$columnDeclarations
6868
)
6969
ENGINE = ${engine}
@@ -286,7 +286,7 @@ class ClickhouseSqlGenerator {
286286
.map { (fieldName, type) ->
287287
val columnName = columnNameMapping[fieldName]!!
288288
val typeName = type.type.toDialectType()
289-
"`$columnName` $typeName"
289+
"`$columnName` Nullable($typeName)"
290290
}
291291
.joinToString(",\n")
292292

airbyte-integrations/connectors/destination-clickhouse-v2/src/main/kotlin/io/airbyte/integrations/destination/clickhouse_v2/config/ClickhouseBeanFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class ClickhouseBeanFactory {
2828
.setUsername(config.username)
2929
.setPassword(config.password)
3030
.setDefaultDatabase(config.resolvedDatabase)
31+
.compressClientRequest(true)
3132
.build()
3233
}
3334

airbyte-integrations/connectors/destination-clickhouse-v2/src/main/kotlin/io/airbyte/integrations/destination/clickhouse_v2/write/direct/ClickhouseDirectLoadDatabaseInitialStatusGatherer.kt renamed to airbyte-integrations/connectors/destination-clickhouse-v2/src/main/kotlin/io/airbyte/integrations/destination/clickhouse_v2/config/ClickhouseDirectLoadDatabaseInitialStatusGatherer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
*/
44

5-
package io.airbyte.integrations.destination.clickhouse_v2.write.direct
5+
package io.airbyte.integrations.destination.clickhouse_v2.config
66

77
import io.airbyte.cdk.load.client.AirbyteClient
88
import io.airbyte.cdk.load.orchestration.db.BaseDirectLoadInitialStatusGatherer

airbyte-integrations/connectors/destination-clickhouse-v2/src/main/kotlin/io/airbyte/integrations/destination/clickhouse_v2/config/ClickhouseNameGenerators.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,11 @@ package io.airbyte.integrations.destination.clickhouse_v2.config
77
import io.airbyte.cdk.load.command.DestinationStream
88
import io.airbyte.cdk.load.orchestration.db.ColumnNameGenerator
99
import io.airbyte.cdk.load.orchestration.db.FinalTableNameGenerator
10-
import io.airbyte.cdk.load.orchestration.db.RawTableNameGenerator
1110
import io.airbyte.cdk.load.orchestration.db.TableName
1211
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
1312
import jakarta.inject.Singleton
1413
import java.util.Locale
1514

16-
// Unused but needed by another bean
17-
@Singleton
18-
class ClickhouseRawTableNameGenerators(val config: ClickhouseConfiguration) :
19-
RawTableNameGenerator {
20-
override fun getTableName(streamDescriptor: DestinationStream.Descriptor): TableName =
21-
TableName(
22-
config.resolvedDatabase,
23-
"test_${streamDescriptor.name}",
24-
)
25-
}
26-
2715
@Singleton
2816
class ClickhouseFinalTableNameGenerator(private val config: ClickhouseConfiguration) :
2917
FinalTableNameGenerator {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.clickhouse_v2.write
6+
7+
import io.airbyte.cdk.load.data.AirbyteValue
8+
import io.airbyte.cdk.load.message.DestinationRecordRaw
9+
import io.airbyte.cdk.load.orchestration.db.legacy_typing_deduping.TableCatalog
10+
import jakarta.inject.Singleton
11+
12+
/*
13+
* Munges values and keys into a simple map form. Encapsulates
14+
* EnrichedAirybteValue logic from other classes, so it can be replaced if
15+
* necessary, as we know it's slow.
16+
*
17+
* The HashMap construction is deliberate for speed considerations.
18+
*/
19+
@Singleton
20+
class RecordMunger(
21+
private val catalogInfo: TableCatalog,
22+
) {
23+
fun transformForDest(record: DestinationRecordRaw): Map<String, AirbyteValue> {
24+
// this actually munges and coerces data
25+
val enriched =
26+
record.asEnrichedDestinationRecordAirbyteValue(
27+
extractedAtAsTimestampWithTimezone = true
28+
)
29+
30+
val munged = HashMap<String, AirbyteValue>()
31+
enriched.declaredFields.forEach {
32+
val mappedKey = catalogInfo.getMappedColumnName(record.stream, it.key)!!
33+
munged[mappedKey] = it.value.abValue
34+
}
35+
enriched.airbyteMetaFields.forEach { munged[it.key] = it.value.abValue }
36+
37+
return munged
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.clickhouse_v2.write
6+
7+
/*
8+
* Encapsulates basic sized windowing logic. As we implement other windowing,
9+
* we should look to break out a shared interface.
10+
*/
11+
class SizedWindow(private val size: Long) {
12+
private var accumulated = 0L
13+
14+
fun increment(quantity: Long): SizedWindow = this.apply { accumulated += quantity }
15+
16+
fun isComplete(): Boolean = accumulated >= size
17+
}

0 commit comments

Comments
 (0)