From 40b90285ce204db59e1374011a4341c5397c42ce Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Thu, 23 Apr 2026 21:14:13 -0700
Subject: [PATCH 1/2] feat: Add KafkaRecord type and Protobuf deserialization
 for Kafka trigger binding
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add support for binding to raw Apache Kafka records (KafkaRecord type) in the
Java worker, enabling users to access full Kafka message metadata.

New files:
- KafkaRecord.java — Main POJO with topic, partition, offset, key/value as raw
  bytes, timestamp, headers, leader epoch
- KafkaHeader.java — Header with key + byte[] value + getValueAsString()
- KafkaTimestamp.java — Timestamp with UnixTimestampMs + type + getDateTimeOffset()
- KafkaTimestampType.java — Enum: NotAvailable, CreateTime, LogAppendTime
- KafkaRecordProtoDeserializer.java — Protobuf to POJO conversion
- KafkaRecordProto.proto — Shared schema (synced with host extension)
- KafkaRecordDeserializerTest.java — 9 unit tests

Modified files:
- RpcModelBindingDataSource.java — Add content_type dispatch: application/json
  (existing path) vs application/x-protobuf (new KafkaRecord path)
- pom.xml — Add second proto source root for Kafka proto compilation

Non-breaking: All existing binding types (String, byte[], Map) unchanged.
Users opt in via the KafkaRecord parameter type.

Relates to Azure/azure-functions-kafka-extension#612
Fixes #868

Co-authored-by: Dobby
---
 pom.xml                                       |  10 +
 .../binding/RpcModelBindingDataSource.java    |  98 +++++++---
 .../worker/binding/kafka/KafkaHeader.java     |  40 ++++
 .../worker/binding/kafka/KafkaRecord.java     | 104 ++++++++++
 .../kafka/KafkaRecordProtoDeserializer.java   |  79 ++++++++
 .../worker/binding/kafka/KafkaTimestamp.java  |  42 +++++
 .../binding/kafka/KafkaTimestampType.java     |  46 +++++
 src/main/proto/kafka/KafkaRecordProto.proto   |  34 ++++
 .../tests/KafkaRecordDeserializerTest.java    | 177 ++++++++++++++++++
 9 files changed, 599 insertions(+), 31 deletions(-)
 create mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java
 create mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java
 create mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java
 create mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java
 create mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java
 create mode 100644 src/main/proto/kafka/KafkaRecordProto.proto
 create mode 100644 src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java

diff --git a/pom.xml b/pom.xml
index a483d766..4ee9cfe3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,11 +154,21 @@
         <executions>
           <execution>
+            <id>compile-grpc</id>
             <goals>
               <goal>compile</goal>
               <goal>compile-custom</goal>
             </goals>
           </execution>
+          <execution>
+            <id>compile-kafka-proto</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <protoSourceRoot>${basedir}/src/main/proto/kafka</protoSourceRoot>
+            </configuration>
+          </execution>
         </executions>

diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
index 041530e9..58eae70b 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
@@ -2,7 +2,10 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.InvalidProtocolBufferException;
 import
com.microsoft.azure.functions.rpc.messages.ModelBindingData; +import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecord; +import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer; import com.microsoft.azure.functions.worker.WorkerLogManager; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -12,44 +15,68 @@ import java.util.logging.Logger; /** - * A DataSource that parses "model_binding_data" from the host. The "content" field - * is assumed to be JSON. We parse it into a Map. - * When someone calls "lookupName('ContainerName')", we return a nested DataSource - * that acts like a string. + * A DataSource that parses "model_binding_data" from the host. + * + *

+ * <p>Dispatches based on content_type:
+ * <ul>
+ *   <li>"application/x-protobuf" with source "AzureKafkaRecord" — Protobuf deserialization to KafkaRecord</li>
+ *   <li>all other — JSON parsing into Map&lt;String, String&gt; (legacy behavior)</li>
+ * </ul>
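+ *
+ * <p>The Kafka check mirrors the constants on {@code KafkaRecordProtoDeserializer};
+ * a sketch of the condition the constructor below applies:
+ * <pre>{@code
+ * boolean isKafkaRecord =
+ *     KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE.equals(modelData.getContentType())
+ *         && KafkaRecordProtoDeserializer.EXPECTED_SOURCE.equals(modelData.getSource());
+ * }</pre>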
 */
public class RpcModelBindingDataSource extends DataSource<ModelBindingData> {
    private static final Logger LOGGER = WorkerLogManager.getSystemLogger();
    private static final Gson GSON = new Gson();

-    // This holds the parsed key-value pairs from the model_binding_data.content JSON
+    // This holds the parsed key-value pairs from the model_binding_data.content JSON (null for Protobuf path)
    private final Map<String, String> contentMap;

+    // Precomputed KafkaRecord for Protobuf path (null for JSON path)
+    private final KafkaRecord kafkaRecord;
+
    public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
        super(name, modelData, MODEL_BINDING_DATA_OPERATIONS);

-        // Parse the JSON in modelData.getContent() => Map<String, String>
-        String jsonString = modelData.getContent().toStringUtf8();
-        if (jsonString == null || jsonString.isEmpty()) {
-            throw new IllegalArgumentException(
-                "model_binding_data.content is empty or missing for name: " + name
-            );
-        }
+        String contentType = modelData.getContentType();
+        String source = modelData.getSource();

-        Map<String, String> parsed = null;
-        try {
-            Type mapType = new TypeToken<Map<String, String>>(){}.getType();
-            parsed = GSON.fromJson(jsonString, mapType);
-        } catch (Exception ex) {
-            LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
-            throw new RuntimeException(ex);
-        }
+        if (KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE.equals(contentType)
+                && KafkaRecordProtoDeserializer.EXPECTED_SOURCE.equals(source)) {
+            // Protobuf path: deserialize KafkaRecord
+            this.contentMap = null;
+            try {
+                this.kafkaRecord = KafkaRecordProtoDeserializer.deserialize(
+                    modelData.getContent().toByteArray());
+            } catch (InvalidProtocolBufferException ex) {
+                LOGGER.warning("Failed to deserialize KafkaRecord Protobuf: "
+                    + ExceptionUtils.getRootCauseMessage(ex));
+                throw new RuntimeException(ex);
+            }
+        } else {
+            // JSON path: legacy behavior
+            this.kafkaRecord = null;
+            String jsonString = modelData.getContent().toStringUtf8();
+            if (jsonString == null || jsonString.isEmpty()) {
+                throw new IllegalArgumentException(
+                    "model_binding_data.content is empty or missing for name: " + name
+                );
+            }
+
+            Map<String, String> parsed = null;
+            try {
+                Type mapType = new TypeToken<Map<String, String>>(){}.getType();
+                parsed = GSON.fromJson(jsonString, mapType);
+            } catch (Exception ex) {
+                LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
+                throw new RuntimeException(ex);
+            }

-        if (parsed == null) {
-            throw new IllegalArgumentException(
-                "model_binding_data.content was not valid JSON for name: " + name
-            );
+            if (parsed == null) {
+                throw new IllegalArgumentException(
+                    "model_binding_data.content was not valid JSON for name: " + name
+                );
+            }
+            this.contentMap = parsed;
        }
-        this.contentMap = parsed;
    }

    /**
@@ -59,18 +86,18 @@ public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
     */
    @Override
    protected Optional<DataSource<?>> lookupName(String subName) {
-        if (contentMap.containsKey(subName)) {
-            // Create a nested string data source so the code can do
-            // getTriggerMetadataByName("ContainerName", String.class)
-            // and eventually get that string value.
+        if (contentMap != null && contentMap.containsKey(subName)) {
            String value = contentMap.get(subName);
            return Optional.of(new RpcStringDataSource(subName, value));
        }
        return Optional.empty();
    }

-    // The operations can remain minimal, if you only do sub-value lookups
-    // from "lookupName(...)". Or you might define operations for the entire Map.
+ // Package-private for testing + KafkaRecord getKafkaRecord() { + return kafkaRecord; + } + private static final DataOperations MODEL_BINDING_DATA_OPERATIONS = new DataOperations<>(); @@ -84,5 +111,14 @@ protected Optional> lookupName(String subName) { // Or if they want it as a raw string, we can do that MODEL_BINDING_DATA_OPERATIONS.addOperation(String.class, modelBindingData -> modelBindingData.getContent()); + + // KafkaRecord binding: deserialize Protobuf to KafkaRecord + MODEL_BINDING_DATA_OPERATIONS.addOperation(KafkaRecord.class, modelBindingData -> { + try { + return KafkaRecordProtoDeserializer.deserialize(modelBindingData.getContent().toByteArray()); + } catch (InvalidProtocolBufferException ex) { + throw new RuntimeException("Failed to deserialize KafkaRecord Protobuf", ex); + } + }); } } diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java new file mode 100644 index 00000000..ad32b864 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java @@ -0,0 +1,40 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions.worker.binding.kafka; + +import java.nio.charset.StandardCharsets; + +/** + * Represents a single Kafka record header (key-value pair where value is raw bytes). + */ +public class KafkaHeader { + private final String key; + private final byte[] value; + + public KafkaHeader(String key, byte[] value) { + this.key = key; + this.value = value; + } + + /** + * Returns the header key. + */ + public String getKey() { + return key; + } + + /** + * Returns the header value as raw bytes, or null if not present. + */ + public byte[] getValue() { + return value; + } + + /** + * Returns the header value as a UTF-8 string, or null if the value is null. + */ + public String getValueAsString() { + return value == null ? null : new String(value, StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java new file mode 100644 index 00000000..b6c1c1d2 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions.worker.binding.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Represents a raw Apache Kafka record with full metadata. + * Key and value are raw bytes — the user controls deserialization. 
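+ *
+ * <p>For example (a sketch; {@code MyEvent} is a hypothetical user type, and Gson is
+ * just one choice of decoder), a JSON-encoded value can be materialized like this:
+ * <pre>{@code
+ * String json = record.getValueAsString();                  // UTF-8 decode of the raw bytes
+ * MyEvent event = new Gson().fromJson(json, MyEvent.class); // user-chosen deserializer
+ * }</pre>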
+ */ +public class KafkaRecord { + private final String topic; + private final int partition; + private final long offset; + private final byte[] key; + private final byte[] value; + private final KafkaTimestamp timestamp; + private final List headers; + private final Integer leaderEpoch; + + public KafkaRecord(String topic, int partition, long offset, byte[] key, byte[] value, + KafkaTimestamp timestamp, List headers, Integer leaderEpoch) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.key = key; + this.value = value; + this.timestamp = timestamp; + this.headers = headers; + this.leaderEpoch = leaderEpoch; + } + + /** + * Returns the topic name this record was consumed from. + */ + public String getTopic() { + return topic; + } + + /** + * Returns the partition this record was consumed from. + */ + public int getPartition() { + return partition; + } + + /** + * Returns the offset of this record within the partition. + */ + public long getOffset() { + return offset; + } + + /** + * Returns the raw key bytes. Null if the record has no key. + */ + public byte[] getKey() { + return key; + } + + /** + * Returns the raw value bytes. Null if the record has no value. + */ + public byte[] getValue() { + return value; + } + + /** + * Returns the key as a UTF-8 string, or null if the key is null. + */ + public String getKeyAsString() { + return key == null ? null : new String(key, StandardCharsets.UTF_8); + } + + /** + * Returns the value as a UTF-8 string, or null if the value is null. + */ + public String getValueAsString() { + return value == null ? null : new String(value, StandardCharsets.UTF_8); + } + + /** + * Returns the record timestamp. + */ + public KafkaTimestamp getTimestamp() { + return timestamp; + } + + /** + * Returns the record headers. + */ + public List getHeaders() { + return headers; + } + + /** + * Returns the leader epoch, if available. Null if not provided by the broker. + */ + public Integer getLeaderEpoch() { + return leaderEpoch; + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java new file mode 100644 index 00000000..73b93bca --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java @@ -0,0 +1,79 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions.worker.binding.kafka; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto; +import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaHeaderProto; +import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaTimestampProto; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Deserializes Protobuf-encoded KafkaRecordProto bytes into a {@link KafkaRecord} POJO. + */ +public final class KafkaRecordProtoDeserializer { + + public static final String EXPECTED_SOURCE = "AzureKafkaRecord"; + public static final String EXPECTED_CONTENT_TYPE = "application/x-protobuf"; + + private KafkaRecordProtoDeserializer() { + } + + /** + * Deserializes Protobuf bytes into a {@link KafkaRecord}. 
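+ *
+ * <p>Typical call from the worker's binding path (sketch; this mirrors what
+ * {@code RpcModelBindingDataSource} does with the incoming payload):
+ * <pre>{@code
+ * byte[] bytes = modelBindingData.getContent().toByteArray();
+ * KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(bytes);
+ * }</pre>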
+ * + * @param protoBytes the Protobuf-encoded bytes from ModelBindingData.content + * @return a fully-populated KafkaRecord + * @throws InvalidProtocolBufferException if the bytes are not valid Protobuf + */ + public static KafkaRecord deserialize(byte[] protoBytes) throws InvalidProtocolBufferException { + KafkaRecordProto proto = KafkaRecordProto.parseFrom(protoBytes); + return fromProto(proto); + } + + static KafkaRecord fromProto(KafkaRecordProto proto) { + // Key: optional bytes — null if not present + byte[] key = proto.hasKey() ? proto.getKey().toByteArray() : null; + + // Value: optional bytes — null if not present + byte[] value = proto.hasValue() ? proto.getValue().toByteArray() : null; + + // Leader epoch: optional int32 — null if not present + Integer leaderEpoch = proto.hasLeaderEpoch() ? proto.getLeaderEpoch() : null; + + // Timestamp + KafkaTimestamp timestamp = null; + if (proto.hasTimestamp()) { + KafkaTimestampProto ts = proto.getTimestamp(); + timestamp = new KafkaTimestamp( + ts.getUnixTimestampMs(), + KafkaTimestampType.fromValue(ts.getType())); + } + + // Headers + List headers; + if (proto.getHeadersCount() > 0) { + headers = new ArrayList<>(proto.getHeadersCount()); + for (KafkaHeaderProto h : proto.getHeadersList()) { + byte[] headerValue = h.hasValue() ? h.getValue().toByteArray() : null; + headers.add(new KafkaHeader(h.getKey(), headerValue)); + } + } else { + headers = Collections.emptyList(); + } + + return new KafkaRecord( + proto.getTopic(), + proto.getPartition(), + proto.getOffset(), + key, + value, + timestamp, + headers, + leaderEpoch); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java new file mode 100644 index 00000000..b9f1530a --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java @@ -0,0 +1,42 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions.worker.binding.kafka; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + +/** + * Represents the timestamp of a Kafka record. + */ +public class KafkaTimestamp { + private final long unixTimestampMs; + private final KafkaTimestampType type; + + public KafkaTimestamp(long unixTimestampMs, KafkaTimestampType type) { + this.unixTimestampMs = unixTimestampMs; + this.type = type; + } + + /** + * Returns the timestamp as Unix milliseconds since epoch. + */ + public long getUnixTimestampMs() { + return unixTimestampMs; + } + + /** + * Returns the timestamp type. + */ + public KafkaTimestampType getType() { + return type; + } + + /** + * Returns the timestamp as an {@link OffsetDateTime} in UTC. + */ + public OffsetDateTime getDateTimeOffset() { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(unixTimestampMs), ZoneOffset.UTC); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java new file mode 100644 index 00000000..f86c5cce --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java @@ -0,0 +1,46 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. 
See License.txt in the project root for license information. + +package com.microsoft.azure.functions.worker.binding.kafka; + +/** + * Defines the type of a Kafka record timestamp. + */ +public enum KafkaTimestampType { + /** + * Timestamp type is not available. + */ + NotAvailable(0), + + /** + * Timestamp was set by the producer (record creation time). + */ + CreateTime(1), + + /** + * Timestamp was set by the broker (log append time). + */ + LogAppendTime(2); + + private final int value; + + KafkaTimestampType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + /** + * Returns the enum constant for the given int value, or {@code NotAvailable} if not recognized. + */ + public static KafkaTimestampType fromValue(int value) { + for (KafkaTimestampType type : values()) { + if (type.value == value) { + return type; + } + } + return NotAvailable; + } +} diff --git a/src/main/proto/kafka/KafkaRecordProto.proto b/src/main/proto/kafka/KafkaRecordProto.proto new file mode 100644 index 00000000..5d196045 --- /dev/null +++ b/src/main/proto/kafka/KafkaRecordProto.proto @@ -0,0 +1,34 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +syntax = "proto3"; +package azure.functions.kafka; +option java_package = "com.microsoft.azure.functions.worker.binding.kafka.proto"; +option java_outer_classname = "KafkaRecordProtos"; + +// Represents a single Kafka record aligned with the Apache Kafka specification. +// This schema must stay in sync with the WebJobs extension copy of KafkaRecordProto.proto in +// Microsoft.Azure.WebJobs.Extensions.Kafka. +message KafkaRecordProto { + string topic = 1; + int32 partition = 2; + int64 offset = 3; + optional bytes key = 4; + optional bytes value = 5; + KafkaTimestampProto timestamp = 6; + repeated KafkaHeaderProto headers = 7; + optional int32 leader_epoch = 8; + + // Reserved for future fields. Do not reuse field numbers. 
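+ // (proto3 note: "reserved" prevents these numbers from ever being declared in this
+ // message, so fields added later must start at 16.)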
+ reserved 9 to 15; +} + +message KafkaTimestampProto { + int64 unix_timestamp_ms = 1; + int32 type = 2; // 0=NotAvailable, 1=CreateTime, 2=LogAppendTime +} + +message KafkaHeaderProto { + string key = 1; + optional bytes value = 2; +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java new file mode 100644 index 00000000..cfa1581c --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java @@ -0,0 +1,177 @@ +package com.microsoft.azure.functions.worker.binding.tests; + +import com.google.protobuf.ByteString; +import com.microsoft.azure.functions.worker.binding.kafka.*; +import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.*; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.*; + +public class KafkaRecordDeserializerTest { + + @Test + public void deserialize_fullRecord_allFieldsPreserved() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("my-topic") + .setPartition(3) + .setOffset(12345) + .setKey(ByteString.copyFromUtf8("my-key")) + .setValue(ByteString.copyFromUtf8("{\"name\":\"test\"}")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(1) // CreateTime + .build()) + .setLeaderEpoch(7) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("trace-id") + .setValue(ByteString.copyFromUtf8("trace-abc")) + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals("my-topic", record.getTopic()); + assertEquals(3, record.getPartition()); + assertEquals(12345, record.getOffset()); + assertEquals("my-key", record.getKeyAsString()); + assertEquals("{\"name\":\"test\"}", record.getValueAsString()); + assertEquals(1700000000000L, record.getTimestamp().getUnixTimestampMs()); + assertEquals(KafkaTimestampType.CreateTime, record.getTimestamp().getType()); + assertEquals(7, record.getLeaderEpoch()); + assertEquals(1, record.getHeaders().size()); + assertEquals("trace-id", record.getHeaders().get(0).getKey()); + assertEquals("trace-abc", record.getHeaders().get(0).getValueAsString()); + } + + @Test + public void deserialize_nullKeyAndValue() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(100) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(0) + .build()) + .build(); + // Key and Value deliberately not set + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertNull(record.getKey()); + assertNull(record.getValue()); + assertNull(record.getKeyAsString()); + assertNull(record.getValueAsString()); + assertEquals("test-topic", record.getTopic()); + } + + @Test + public void deserialize_noLeaderEpoch_returnsNull() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(0) + .setType(0) + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertNull(record.getLeaderEpoch()); + } + + @Test + public void 
deserialize_unknownTimestampType_fallsBackToNotAvailable() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(99) // Unknown future value + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(KafkaTimestampType.NotAvailable, record.getTimestamp().getType()); + assertEquals(1700000000000L, record.getTimestamp().getUnixTimestampMs()); + } + + @Test + public void deserialize_multipleHeaders() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(0) + .setType(0) + .build()) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("correlation-id") + .setValue(ByteString.copyFromUtf8("abc-123")) + .build()) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("null-value-header") + // Value intentionally not set + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(2, record.getHeaders().size()); + assertEquals("correlation-id", record.getHeaders().get(0).getKey()); + assertEquals("abc-123", record.getHeaders().get(0).getValueAsString()); + assertEquals("null-value-header", record.getHeaders().get(1).getKey()); + assertNull(record.getHeaders().get(1).getValue()); + } + + @Test + public void deserialize_timestampDateTimeOffset() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(2) // LogAppendTime + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(KafkaTimestampType.LogAppendTime, record.getTimestamp().getType()); + assertNotNull(record.getTimestamp().getDateTimeOffset()); + assertEquals(1700000000000L, record.getTimestamp().getDateTimeOffset().toInstant().toEpochMilli()); + } + + @Test + public void kafkaTimestampType_fromValue_allValues() { + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(0)); + assertEquals(KafkaTimestampType.CreateTime, KafkaTimestampType.fromValue(1)); + assertEquals(KafkaTimestampType.LogAppendTime, KafkaTimestampType.fromValue(2)); + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(99)); + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(-1)); + } + + @Test + public void kafkaHeader_getValueAsString_nullValue() { + KafkaHeader header = new KafkaHeader("key", null); + assertNull(header.getValueAsString()); + } + + @Test + public void kafkaHeader_getValueAsString_withValue() { + KafkaHeader header = new KafkaHeader("key", "hello".getBytes(StandardCharsets.UTF_8)); + assertEquals("hello", header.getValueAsString()); + } +} From 6f4c17b5219cea64bdfb2b02f95e369529927e15 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Thu, 23 Apr 2026 21:19:19 -0700 Subject: [PATCH 2/2] refactor: Move KafkaRecord POJO types to azure-functions-java-library Move KafkaRecord, KafkaHeader, KafkaTimestamp, KafkaTimestampType from worker package 
(com.microsoft.azure.functions.worker.binding.kafka) to library package (com.microsoft.azure.functions) so user functions can import these types in their Maven projects. Worker keeps only KafkaRecordProtoDeserializer (runtime-only code). Co-authored-by: Dobby --- .../binding/RpcModelBindingDataSource.java | 2 +- .../worker/binding/kafka/KafkaHeader.java | 40 ------- .../worker/binding/kafka/KafkaRecord.java | 104 ------------------ .../kafka/KafkaRecordProtoDeserializer.java | 4 + .../worker/binding/kafka/KafkaTimestamp.java | 42 ------- .../binding/kafka/KafkaTimestampType.java | 46 -------- .../tests/KafkaRecordDeserializerTest.java | 5 +- 7 files changed, 9 insertions(+), 234 deletions(-) delete mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java delete mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java delete mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java delete mode 100644 src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java index 58eae70b..d555288b 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java @@ -3,8 +3,8 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.google.protobuf.InvalidProtocolBufferException; +import com.microsoft.azure.functions.KafkaRecord; import com.microsoft.azure.functions.rpc.messages.ModelBindingData; -import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecord; import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer; import com.microsoft.azure.functions.worker.WorkerLogManager; import org.apache.commons.lang3.exception.ExceptionUtils; diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java deleted file mode 100644 index ad32b864..00000000 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaHeader.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package com.microsoft.azure.functions.worker.binding.kafka; - -import java.nio.charset.StandardCharsets; - -/** - * Represents a single Kafka record header (key-value pair where value is raw bytes). - */ -public class KafkaHeader { - private final String key; - private final byte[] value; - - public KafkaHeader(String key, byte[] value) { - this.key = key; - this.value = value; - } - - /** - * Returns the header key. - */ - public String getKey() { - return key; - } - - /** - * Returns the header value as raw bytes, or null if not present. - */ - public byte[] getValue() { - return value; - } - - /** - * Returns the header value as a UTF-8 string, or null if the value is null. - */ - public String getValueAsString() { - return value == null ? 
null : new String(value, StandardCharsets.UTF_8); - } -} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java deleted file mode 100644 index b6c1c1d2..00000000 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecord.java +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package com.microsoft.azure.functions.worker.binding.kafka; - -import java.nio.charset.StandardCharsets; -import java.util.List; - -/** - * Represents a raw Apache Kafka record with full metadata. - * Key and value are raw bytes — the user controls deserialization. - */ -public class KafkaRecord { - private final String topic; - private final int partition; - private final long offset; - private final byte[] key; - private final byte[] value; - private final KafkaTimestamp timestamp; - private final List headers; - private final Integer leaderEpoch; - - public KafkaRecord(String topic, int partition, long offset, byte[] key, byte[] value, - KafkaTimestamp timestamp, List headers, Integer leaderEpoch) { - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.key = key; - this.value = value; - this.timestamp = timestamp; - this.headers = headers; - this.leaderEpoch = leaderEpoch; - } - - /** - * Returns the topic name this record was consumed from. - */ - public String getTopic() { - return topic; - } - - /** - * Returns the partition this record was consumed from. - */ - public int getPartition() { - return partition; - } - - /** - * Returns the offset of this record within the partition. - */ - public long getOffset() { - return offset; - } - - /** - * Returns the raw key bytes. Null if the record has no key. - */ - public byte[] getKey() { - return key; - } - - /** - * Returns the raw value bytes. Null if the record has no value. - */ - public byte[] getValue() { - return value; - } - - /** - * Returns the key as a UTF-8 string, or null if the key is null. - */ - public String getKeyAsString() { - return key == null ? null : new String(key, StandardCharsets.UTF_8); - } - - /** - * Returns the value as a UTF-8 string, or null if the value is null. - */ - public String getValueAsString() { - return value == null ? null : new String(value, StandardCharsets.UTF_8); - } - - /** - * Returns the record timestamp. - */ - public KafkaTimestamp getTimestamp() { - return timestamp; - } - - /** - * Returns the record headers. - */ - public List getHeaders() { - return headers; - } - - /** - * Returns the leader epoch, if available. Null if not provided by the broker. 
- */ - public Integer getLeaderEpoch() { - return leaderEpoch; - } -} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java index 73b93bca..3e78e40d 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java @@ -4,6 +4,10 @@ package com.microsoft.azure.functions.worker.binding.kafka; import com.google.protobuf.InvalidProtocolBufferException; +import com.microsoft.azure.functions.KafkaHeader; +import com.microsoft.azure.functions.KafkaRecord; +import com.microsoft.azure.functions.KafkaTimestamp; +import com.microsoft.azure.functions.KafkaTimestampType; import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto; import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaHeaderProto; import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaTimestampProto; diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java deleted file mode 100644 index b9f1530a..00000000 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestamp.java +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package com.microsoft.azure.functions.worker.binding.kafka; - -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; - -/** - * Represents the timestamp of a Kafka record. - */ -public class KafkaTimestamp { - private final long unixTimestampMs; - private final KafkaTimestampType type; - - public KafkaTimestamp(long unixTimestampMs, KafkaTimestampType type) { - this.unixTimestampMs = unixTimestampMs; - this.type = type; - } - - /** - * Returns the timestamp as Unix milliseconds since epoch. - */ - public long getUnixTimestampMs() { - return unixTimestampMs; - } - - /** - * Returns the timestamp type. - */ - public KafkaTimestampType getType() { - return type; - } - - /** - * Returns the timestamp as an {@link OffsetDateTime} in UTC. - */ - public OffsetDateTime getDateTimeOffset() { - return OffsetDateTime.ofInstant(Instant.ofEpochMilli(unixTimestampMs), ZoneOffset.UTC); - } -} diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java deleted file mode 100644 index f86c5cce..00000000 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaTimestampType.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -package com.microsoft.azure.functions.worker.binding.kafka; - -/** - * Defines the type of a Kafka record timestamp. - */ -public enum KafkaTimestampType { - /** - * Timestamp type is not available. - */ - NotAvailable(0), - - /** - * Timestamp was set by the producer (record creation time). - */ - CreateTime(1), - - /** - * Timestamp was set by the broker (log append time). 
- */ - LogAppendTime(2); - - private final int value; - - KafkaTimestampType(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - /** - * Returns the enum constant for the given int value, or {@code NotAvailable} if not recognized. - */ - public static KafkaTimestampType fromValue(int value) { - for (KafkaTimestampType type : values()) { - if (type.value == value) { - return type; - } - } - return NotAvailable; - } -} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java index cfa1581c..50b1f68c 100644 --- a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java @@ -1,7 +1,10 @@ package com.microsoft.azure.functions.worker.binding.tests; import com.google.protobuf.ByteString; -import com.microsoft.azure.functions.worker.binding.kafka.*; +import com.microsoft.azure.functions.KafkaHeader; +import com.microsoft.azure.functions.KafkaRecord; +import com.microsoft.azure.functions.KafkaTimestampType; +import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer; import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.*; import org.junit.jupiter.api.Test;
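
Usage sketch for reviewers: once both patches land, a function opts in by declaring a
KafkaRecord parameter. @FunctionName and ExecutionContext are the existing
azure-functions-java-library types; the @KafkaTrigger attribute names below are
illustrative and not verified against the Kafka extension's annotation:

    import com.microsoft.azure.functions.ExecutionContext;
    import com.microsoft.azure.functions.KafkaRecord;
    import com.microsoft.azure.functions.annotation.FunctionName;
    import com.microsoft.azure.functions.annotation.KafkaTrigger;

    public class KafkaRawRecordFunction {
        @FunctionName("KafkaRawRecord")
        public void run(
                @KafkaTrigger(name = "record", topic = "my-topic",
                        brokerList = "%BrokerList%", consumerGroup = "$Default")
                KafkaRecord record,
                final ExecutionContext context) {
            // Full broker metadata is available; key and value stay raw bytes.
            context.getLogger().info(String.format("topic=%s partition=%d offset=%d key=%s",
                    record.getTopic(), record.getPartition(), record.getOffset(),
                    record.getKeyAsString()));
        }
    }

Because key and value stay raw bytes, Avro- or Protobuf-encoded payloads can be decoded
with the user's own schema tooling rather than a worker-imposed format.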