Skip to content

Commit e666f19

Browse files
[Destination MSSQL] v2 rc8 (#54186)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent 7abac4a commit e666f19

File tree

12 files changed

+244
-381
lines changed

12 files changed

+244
-381
lines changed

airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,6 +1597,148 @@ abstract class BasicFunctionalityIntegrationTest(
15971597
)
15981598
}
15991599

1600+
@Test
open fun testDedupWithStringKey() {
    assumeTrue(supportsDedup)

    // The dedup key is composite: a string column (id1) plus an integer column (id2).
    // The same key/cursor definitions are used for the stream and for diffing the output.
    val dedupKey = listOf(listOf("id1"), listOf("id2"))
    val dedupCursor = listOf("updated_at")

    /** Builds the `test_stream` dedup stream for the given sync id (generation 42). */
    fun streamForSync(syncId: Long): DestinationStream {
        val descriptor = DestinationStream.Descriptor(randomizedNamespace, "test_stream")
        return DestinationStream(
            descriptor,
            importType = Dedupe(primaryKey = dedupKey, cursor = dedupCursor),
            schema =
                ObjectType(
                    properties =
                        linkedMapOf(
                            "id1" to stringType,
                            "id2" to intType,
                            "updated_at" to timestamptzType,
                            "name" to stringType,
                            "_ab_cdc_deleted_at" to timestamptzType,
                        ),
                ),
            generationId = 42,
            minimumGenerationId = 0,
            syncId = syncId,
        )
    }

    /** Wraps a raw JSON payload as an input record on `test_stream`. */
    fun recordOf(json: String, extractedAt: Long) =
        InputRecord(randomizedNamespace, "test_stream", json, emittedAtMs = extractedAt)

    // ---- Sync 1: insert Alice twice (same key, different cursor) and Bob once ----
    val firstSync = streamForSync(syncId = 42)
    runSync(
        updatedConfig,
        firstSync,
        listOf(
            // extractedAt = 1000 corresponds to 1970-01-01 00:00:01Z. That is nowhere near
            // the year-2000 updated_at values, which is fine: from the destination's point
            // of view updated_at and extractedAt are unrelated.
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice1", "_ab_cdc_deleted_at": null}""",
                extractedAt = 1000,
            ),
            // Second record for the same key (same id1, id2 = 200) with a later updated_at.
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice2", "_ab_cdc_deleted_at": null}""",
                extractedAt = 1000,
            ),
            // This record omits _ab_cdc_deleted_at entirely. CDC sources usually send an
            // explicit null, but both shapes have to be handled.
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob1"}""",
                extractedAt = 1000,
            ),
        ),
    )

    // Only the newer Alice record survives; Bob is present as well.
    val aliceAfterFirstSync =
        OutputRecord(
            extractedAt = 1000,
            generationId = 42,
            data =
                mapOf(
                    "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
                    "id2" to 200,
                    "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:01:00Z"),
                    "name" to "Alice2",
                    "_ab_cdc_deleted_at" to null
                ),
            airbyteMeta = OutputRecord.Meta(syncId = 42),
        )
    val bobAfterFirstSync =
        OutputRecord(
            extractedAt = 1000,
            generationId = 42,
            data =
                mapOf(
                    "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
                    "id2" to 201,
                    "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:02:00Z"),
                    "name" to "Bob1"
                ),
            airbyteMeta = OutputRecord.Meta(syncId = 42),
        )
    dumpAndDiffRecords(
        parsedConfig,
        listOf(aliceAfterFirstSync, bobAfterFirstSync),
        firstSync,
        primaryKey = dedupKey,
        cursor = dedupCursor,
    )

    // ---- Sync 2: update Alice and Bob, then delete Bob ----
    val secondSync = streamForSync(syncId = 43)
    runSync(
        updatedConfig,
        secondSync,
        listOf(
            // Newer versions of both Alice and Bob.
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice3", "_ab_cdc_deleted_at": null}""",
                extractedAt = 2000,
            ),
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob2"}""",
                extractedAt = 2000,
            ),
            // Deletion marker for Bob. T+D never inspects the actual _value_ of deleted_at
            // (so a timestamp in the past is irrelevant); it only cares that the field is
            // non-null. The destination is therefore expected to remove Bob.
            recordOf(
                """{"id1": "9cf974de-52cf-4194-9f3d-7efa76ba4d84", "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}""",
                extractedAt = 2000,
            ),
        ),
    )

    // Alice remains, updated to her latest version; Bob has been deleted.
    val aliceAfterSecondSync =
        OutputRecord(
            extractedAt = 2000,
            generationId = 42,
            data =
                mapOf(
                    "id1" to "9cf974de-52cf-4194-9f3d-7efa76ba4d84",
                    "id2" to 200,
                    "updated_at" to TimestampWithTimezoneValue("2000-01-02T00:00:00Z"),
                    "name" to "Alice3",
                    "_ab_cdc_deleted_at" to null
                ),
            airbyteMeta = OutputRecord.Meta(syncId = 43),
        )
    dumpAndDiffRecords(
        parsedConfig,
        listOf(aliceAfterSecondSync),
        secondSync,
        primaryKey = dedupKey,
        cursor = dedupCursor,
    )
}
1741+
16001742
/**
16011743
* Change the cursor column in the second sync to a column that doesn't exist in the first sync.
16021744
* Verify that we overwrite everything correctly.

airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ data:
1616
type: GSM
1717
connectorType: destination
1818
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
19-
dockerImageTag: 0.1.9
19+
dockerImageTag: 0.1.10
2020
dockerRepository: airbyte/destination-mssql-v2
2121
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2
2222
githubIssueLabel: destination-mssql-v2

airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
2323
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
2424
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
2525
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration
26-
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToSqlType
26+
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToMssqlType
2727
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setAsNullValue
2828
import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue
2929
import io.airbyte.integrations.destination.mssql.v2.convert.MssqlType
3030
import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.getAirbyteNamedValue
31-
import io.airbyte.integrations.destination.mssql.v2.convert.SqlTypeToMssqlType
3231
import io.airbyte.protocol.models.Jsons
3332
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
3433
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
@@ -230,9 +229,9 @@ class MSSQLQueryBuilder(
230229
Append -> emptyList()
231230
Overwrite -> emptyList()
232231
}
232+
private val indexedColumns: Set<String> = uniquenessKey.toSet()
233233

234-
private val toSqlType = AirbyteTypeToSqlType()
235-
private val toMssqlType = SqlTypeToMssqlType()
234+
private val toMssqlType = AirbyteTypeToMssqlType()
236235

237236
val finalTableSchema: List<NamedField> =
238237
airbyteFinalTableFields + extractFinalTableSchema(stream.schema)
@@ -251,9 +250,7 @@ class MSSQLQueryBuilder(
251250
}
252251

253252
private fun getSchema(): List<NamedSqlField> =
254-
finalTableSchema.map {
255-
NamedSqlField(it.name, toMssqlType.convert(toSqlType.convert(it.type.type)))
256-
}
253+
finalTableSchema.map { NamedSqlField(it.name, toMssqlType.convert(it.type.type)) }
257254

258255
fun updateSchema(connection: Connection) {
259256
val existingSchema = getExistingSchema(connection)
@@ -486,7 +483,12 @@ class MSSQLQueryBuilder(
486483
separator: String = DEFAULT_SEPARATOR
487484
): String {
488485
return schema.joinToString(separator = separator) {
489-
"[${it.name}] ${toMssqlType.convert(toSqlType.convert(it.type.type)).sqlString} NULL"
486+
val mssqlType =
487+
toMssqlType.convert(
488+
it.type.type,
489+
isIndexed = indexedColumns.contains(it.name),
490+
)
491+
"[${it.name}] ${mssqlType.sqlString} NULL"
490492
}
491493
}
492494
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.mssql.v2.convert
6+
7+
import io.airbyte.cdk.load.data.AirbyteType
8+
import io.airbyte.cdk.load.data.ArrayType
9+
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
10+
import io.airbyte.cdk.load.data.BooleanType
11+
import io.airbyte.cdk.load.data.DateType
12+
import io.airbyte.cdk.load.data.IntegerType
13+
import io.airbyte.cdk.load.data.NumberType
14+
import io.airbyte.cdk.load.data.ObjectType
15+
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
16+
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
17+
import io.airbyte.cdk.load.data.StringType
18+
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
19+
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
20+
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
21+
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
22+
import io.airbyte.cdk.load.data.UnionType
23+
import io.airbyte.cdk.load.data.UnknownType
24+
import java.sql.Types
25+
26+
/**
 * Column types this connector emits when generating SQL Server DDL.
 *
 * @property sqlType the [java.sql.Types] constant paired with this column type.
 * @property sqlStringOverride explicit DDL spelling for types that carry a length or
 * precision/scale; when `null`, the enum constant's own name is used as the DDL text.
 */
enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? = null) {
    TEXT(Types.LONGVARCHAR),
    BIT(Types.BOOLEAN),
    DATE(Types.DATE),
    BIGINT(Types.BIGINT),
    DECIMAL(Types.DECIMAL, "DECIMAL(18, 8)"),
    VARCHAR(Types.VARCHAR, "VARCHAR(MAX)"),
    // Bounded-length VARCHAR; AirbyteTypeToMssqlType.convert selects it for string columns
    // flagged as indexed.
    VARCHAR_INDEX(Types.VARCHAR, "VARCHAR(200)"),
    DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE),
    TIME(Types.TIME),
    DATETIME(Types.TIMESTAMP);

    /** Text to place in a column definition: the override if present, else the constant name. */
    val sqlString: String
        get() = sqlStringOverride ?: name
}
}
40+
41+
/** Maps an [AirbyteType] to the [MssqlType] used for the corresponding column. */
class AirbyteTypeToMssqlType {
    /**
     * Picks the SQL Server column type for [airbyteSchema].
     *
     * @param airbyteSchema the Airbyte type of the column.
     * @param isIndexed when `true`, string columns map to [MssqlType.VARCHAR_INDEX] instead of
     * [MssqlType.VARCHAR]; the flag has no effect on any other type.
     * @return the matching [MssqlType]; object, array, union and unknown types all map to
     * [MssqlType.TEXT].
     */
    fun convert(airbyteSchema: AirbyteType, isIndexed: Boolean = false): MssqlType =
        // No `else` branch on purpose: every AirbyteType variant is listed explicitly.
        when (airbyteSchema) {
            // Scalars
            is BooleanType -> MssqlType.BIT
            is IntegerType -> MssqlType.BIGINT
            is NumberType -> MssqlType.DECIMAL
            is StringType -> if (isIndexed) MssqlType.VARCHAR_INDEX else MssqlType.VARCHAR
            // Temporal types
            is DateType -> MssqlType.DATE
            is TimeTypeWithoutTimezone -> MssqlType.TIME
            is TimestampTypeWithoutTimezone -> MssqlType.DATETIME
            is TimeTypeWithTimezone,
            is TimestampTypeWithTimezone -> MssqlType.DATETIMEOFFSET
            // Structured or untyped values
            is ObjectType,
            is ObjectTypeWithEmptySchema,
            is ObjectTypeWithoutSchema,
            is ArrayType,
            is ArrayTypeWithoutSchema,
            is UnionType,
            is UnknownType -> MssqlType.TEXT
        }
}

airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt

Lines changed: 0 additions & 84 deletions
This file was deleted.

0 commit comments

Comments
 (0)