diff --git a/pom.xml b/pom.xml
index a483d766..4ee9cfe3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,11 +154,21 @@
                     <executions>
                         <execution>
                             <id>compile-grpc</id>
                             <goals>
                                 <goal>compile</goal>
                                 <goal>compile-custom</goal>
                             </goals>
                         </execution>
+                        <execution>
+                            <id>compile-kafka-proto</id>
+                            <goals>
+                                <goal>compile</goal>
+                            </goals>
+                            <configuration>
+                                <protoSourceRoot>${basedir}/src/main/proto/kafka</protoSourceRoot>
+                            </configuration>
+                        </execution>
                     </executions>
                 </plugin>
diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
index 041530e9..d555288b 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcModelBindingDataSource.java
@@ -2,7 +2,10 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.microsoft.azure.functions.KafkaRecord;
 import com.microsoft.azure.functions.rpc.messages.ModelBindingData;
+import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer;
 import com.microsoft.azure.functions.worker.WorkerLogManager;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -12,44 +15,68 @@
 import java.util.logging.Logger;
 
 /**
- * A DataSource that parses "model_binding_data" from the host. The "content" field
- * is assumed to be JSON. We parse it into a Map.
- * When someone calls "lookupName('ContainerName')", we return a nested DataSource
- * that acts like a string.
+ * A DataSource that parses "model_binding_data" from the host.
+ *
+ * <p>Dispatches based on content_type:
+ * <ul>
+ *   <li>content_type "application/x-protobuf" with source "AzureKafkaRecord":
+ *       the content is deserialized into a {@link KafkaRecord} via Protobuf.</li>
+ *   <li>anything else: the content is parsed as JSON into a Map (legacy behavior).</li>
+ * </ul>
  */
 public class RpcModelBindingDataSource extends DataSource<ModelBindingData> {
     private static final Logger LOGGER = WorkerLogManager.getSystemLogger();
     private static final Gson GSON = new Gson();
 
-    // This holds the parsed key-value pairs from the model_binding_data.content JSON
+    // This holds the parsed key-value pairs from the model_binding_data.content JSON (null for Protobuf path)
     private final Map<String, String> contentMap;
 
+    // Precomputed KafkaRecord for Protobuf path (null for JSON path)
+    private final KafkaRecord kafkaRecord;
+
     public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
         super(name, modelData, MODEL_BINDING_DATA_OPERATIONS);
 
-        // Parse the JSON in modelData.getContent() => Map<String, String>
-        String jsonString = modelData.getContent().toStringUtf8();
-        if (jsonString == null || jsonString.isEmpty()) {
-            throw new IllegalArgumentException(
-                "model_binding_data.content is empty or missing for name: " + name
-            );
-        }
+        String contentType = modelData.getContentType();
+        String source = modelData.getSource();
 
-        Map<String, String> parsed = null;
-        try {
-            Type mapType = new TypeToken<Map<String, String>>(){}.getType();
-            parsed = GSON.fromJson(jsonString, mapType);
-        } catch (Exception ex) {
-            LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
-            throw new RuntimeException(ex);
-        }
+        if (KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE.equals(contentType)
+                && KafkaRecordProtoDeserializer.EXPECTED_SOURCE.equals(source)) {
+            // Protobuf path: deserialize KafkaRecord
+            this.contentMap = null;
+            try {
+                this.kafkaRecord = KafkaRecordProtoDeserializer.deserialize(
+                        modelData.getContent().toByteArray());
+            } catch (InvalidProtocolBufferException ex) {
+                LOGGER.warning("Failed to deserialize KafkaRecord Protobuf: "
+                        + ExceptionUtils.getRootCauseMessage(ex));
+                throw new RuntimeException(ex);
+            }
+        } else {
+            // JSON path: legacy behavior
+            this.kafkaRecord = null;
+            String jsonString = modelData.getContent().toStringUtf8();
+            if (jsonString == null || jsonString.isEmpty()) {
+                throw new IllegalArgumentException(
+                    "model_binding_data.content is empty or missing for name: " + name
+                );
+            }
+
+            Map<String, String> parsed = null;
+            try {
+                Type mapType = new TypeToken<Map<String, String>>(){}.getType();
+                parsed = GSON.fromJson(jsonString, mapType);
+            } catch (Exception ex) {
+                LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
+                throw new RuntimeException(ex);
+            }
 
-        if (parsed == null) {
-            throw new IllegalArgumentException(
-                "model_binding_data.content was not valid JSON for name: " + name
-            );
+            if (parsed == null) {
+                throw new IllegalArgumentException(
+                    "model_binding_data.content was not valid JSON for name: " + name
+                );
+            }
+            this.contentMap = parsed;
         }
-        this.contentMap = parsed;
     }
 
     /**
@@ -59,18 +86,18 @@ public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
      */
     @Override
     protected Optional<DataSource<?>> lookupName(String subName) {
-        if (contentMap.containsKey(subName)) {
-            // Create a nested string data source so the code can do
-            // getTriggerMetadataByName("ContainerName", String.class)
-            // and eventually get that string value.
+        if (contentMap != null && contentMap.containsKey(subName)) {
             String value = contentMap.get(subName);
             return Optional.of(new RpcStringDataSource(subName, value));
         }
         return Optional.empty();
     }
 
-    // The operations can remain minimal, if you only do sub-value lookups
-    // from "lookupName(...)". Or you might define operations for the entire Map.
+    // Package-private for testing
+    KafkaRecord getKafkaRecord() {
+        return kafkaRecord;
+    }
+
     private static final DataOperations<ModelBindingData, Object> MODEL_BINDING_DATA_OPERATIONS = new DataOperations<>();
@@ -84,5 +111,14 @@ protected Optional<DataSource<?>> lookupName(String subName) {
         // Or if they want it as a raw string, we can do that
         MODEL_BINDING_DATA_OPERATIONS.addOperation(String.class,
                 modelBindingData -> modelBindingData.getContent());
+
+        // KafkaRecord binding: deserialize Protobuf to KafkaRecord
+        MODEL_BINDING_DATA_OPERATIONS.addOperation(KafkaRecord.class, modelBindingData -> {
+            try {
+                return KafkaRecordProtoDeserializer.deserialize(modelBindingData.getContent().toByteArray());
+            } catch (InvalidProtocolBufferException ex) {
+                throw new RuntimeException("Failed to deserialize KafkaRecord Protobuf", ex);
+            }
+        });
     }
 }
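For context, a minimal sketch of how the Protobuf dispatch path above is exercised, assuming the generated ModelBindingData builder from the worker's RPC protos; the topic "orders" and binding name "kafkaEvent" are illustrative only, and the caller must share the package to reach the package-private getter:

    // Sketch only (not part of the diff): round trip through the Protobuf branch.
    KafkaRecordProto proto = KafkaRecordProto.newBuilder()
            .setTopic("orders")                       // hypothetical topic
            .setPartition(0)
            .setOffset(42L)
            .setValue(ByteString.copyFromUtf8("payload"))
            .build();

    ModelBindingData modelData = ModelBindingData.newBuilder()
            .setSource(KafkaRecordProtoDeserializer.EXPECTED_SOURCE)             // "AzureKafkaRecord"
            .setContentType(KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE)  // "application/x-protobuf"
            .setContent(proto.toByteString())
            .build();

    // Constructor takes the Protobuf branch; getKafkaRecord() is package-private for tests.
    RpcModelBindingDataSource dataSource = new RpcModelBindingDataSource("kafkaEvent", modelData);
    KafkaRecord record = dataSource.getKafkaRecord();

Any other content_type/source combination falls through to the JSON branch, so existing SDK-type bindings are unaffected.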
diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java
new file mode 100644
index 00000000..3e78e40d
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/binding/kafka/KafkaRecordProtoDeserializer.java
@@ -0,0 +1,83 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+package com.microsoft.azure.functions.worker.binding.kafka;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.microsoft.azure.functions.KafkaHeader;
+import com.microsoft.azure.functions.KafkaRecord;
+import com.microsoft.azure.functions.KafkaTimestamp;
+import com.microsoft.azure.functions.KafkaTimestampType;
+import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto;
+import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaHeaderProto;
+import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaTimestampProto;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Deserializes Protobuf-encoded KafkaRecordProto bytes into a {@link KafkaRecord} POJO.
+ */
+public final class KafkaRecordProtoDeserializer {
+
+    public static final String EXPECTED_SOURCE = "AzureKafkaRecord";
+    public static final String EXPECTED_CONTENT_TYPE = "application/x-protobuf";
+
+    private KafkaRecordProtoDeserializer() {
+    }
+
+    /**
+     * Deserializes Protobuf bytes into a {@link KafkaRecord}.
+     *
+     * @param protoBytes the Protobuf-encoded bytes from ModelBindingData.content
+     * @return a fully-populated KafkaRecord
+     * @throws InvalidProtocolBufferException if the bytes are not valid Protobuf
+     */
+    public static KafkaRecord deserialize(byte[] protoBytes) throws InvalidProtocolBufferException {
+        KafkaRecordProto proto = KafkaRecordProto.parseFrom(protoBytes);
+        return fromProto(proto);
+    }
+
+    static KafkaRecord fromProto(KafkaRecordProto proto) {
+        // Key: optional bytes; null if not present
+        byte[] key = proto.hasKey() ? proto.getKey().toByteArray() : null;
+
+        // Value: optional bytes; null if not present
+        byte[] value = proto.hasValue() ? proto.getValue().toByteArray() : null;
+
+        // Leader epoch: optional int32; null if not present
+        Integer leaderEpoch = proto.hasLeaderEpoch() ? proto.getLeaderEpoch() : null;
+
+        // Timestamp
+        KafkaTimestamp timestamp = null;
+        if (proto.hasTimestamp()) {
+            KafkaTimestampProto ts = proto.getTimestamp();
+            timestamp = new KafkaTimestamp(
+                    ts.getUnixTimestampMs(),
+                    KafkaTimestampType.fromValue(ts.getType()));
+        }
+
+        // Headers
+        List<KafkaHeader> headers;
+        if (proto.getHeadersCount() > 0) {
+            headers = new ArrayList<>(proto.getHeadersCount());
+            for (KafkaHeaderProto h : proto.getHeadersList()) {
+                byte[] headerValue = h.hasValue() ? h.getValue().toByteArray() : null;
+                headers.add(new KafkaHeader(h.getKey(), headerValue));
+            }
+        } else {
+            headers = Collections.emptyList();
+        }
+
+        return new KafkaRecord(
+                proto.getTopic(),
+                proto.getPartition(),
+                proto.getOffset(),
+                key,
+                value,
+                timestamp,
+                headers,
+                leaderEpoch);
+    }
+}
diff --git a/src/main/proto/kafka/KafkaRecordProto.proto b/src/main/proto/kafka/KafkaRecordProto.proto
new file mode 100644
index 00000000..5d196045
--- /dev/null
+++ b/src/main/proto/kafka/KafkaRecordProto.proto
@@ -0,0 +1,34 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+syntax = "proto3";
+package azure.functions.kafka;
+option java_package = "com.microsoft.azure.functions.worker.binding.kafka.proto";
+option java_outer_classname = "KafkaRecordProtos";
+
+// Represents a single Kafka record aligned with the Apache Kafka specification.
+// This schema must stay in sync with the WebJobs extension copy of KafkaRecordProto.proto in
+// Microsoft.Azure.WebJobs.Extensions.Kafka.
+message KafkaRecordProto {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+    optional bytes key = 4;
+    optional bytes value = 5;
+    KafkaTimestampProto timestamp = 6;
+    repeated KafkaHeaderProto headers = 7;
+    optional int32 leader_epoch = 8;
+
+    // Reserved for future fields. Do not reuse field numbers.
+    reserved 9 to 15;
+}
+
+message KafkaTimestampProto {
+    int64 unix_timestamp_ms = 1;
+    int32 type = 2; // 0 = NotAvailable, 1 = CreateTime, 2 = LogAppendTime
+}
+
+message KafkaHeaderProto {
+    string key = 1;
+    optional bytes value = 2;
+}
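The null mapping in fromProto relies on proto3 explicit field presence: declaring a scalar or bytes field `optional` makes the compiler emit a hasX() accessor, while the plain getter still returns the field's default (empty ByteString, zero int) rather than null. A small sketch against the generated classes, assuming they were compiled from the schema above (plain `assert` here is for illustration and needs -ea to run):

    // Sketch only: why fromProto checks hasKey()/hasLeaderEpoch() before reading.
    KafkaRecordProto p = KafkaRecordProto.newBuilder()
            .setTopic("t")
            .build();

    assert !p.hasKey();           // field absent: the deserializer maps this to a null key
    assert p.getKey().isEmpty();  // plain getter returns ByteString.EMPTY, never null
    assert !p.hasLeaderEpoch();   // absent optional int32: maps to a null Integer

This is what lets the POJO distinguish "no key" (null) from "empty key" (zero-length byte array), matching Kafka's own record semantics.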
+ reserved 9 to 15; +} + +message KafkaTimestampProto { + int64 unix_timestamp_ms = 1; + int32 type = 2; // 0=NotAvailable, 1=CreateTime, 2=LogAppendTime +} + +message KafkaHeaderProto { + string key = 1; + optional bytes value = 2; +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java new file mode 100644 index 00000000..50b1f68c --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/KafkaRecordDeserializerTest.java @@ -0,0 +1,180 @@ +package com.microsoft.azure.functions.worker.binding.tests; + +import com.google.protobuf.ByteString; +import com.microsoft.azure.functions.KafkaHeader; +import com.microsoft.azure.functions.KafkaRecord; +import com.microsoft.azure.functions.KafkaTimestampType; +import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer; +import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.*; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.*; + +public class KafkaRecordDeserializerTest { + + @Test + public void deserialize_fullRecord_allFieldsPreserved() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("my-topic") + .setPartition(3) + .setOffset(12345) + .setKey(ByteString.copyFromUtf8("my-key")) + .setValue(ByteString.copyFromUtf8("{\"name\":\"test\"}")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(1) // CreateTime + .build()) + .setLeaderEpoch(7) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("trace-id") + .setValue(ByteString.copyFromUtf8("trace-abc")) + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals("my-topic", record.getTopic()); + assertEquals(3, record.getPartition()); + assertEquals(12345, record.getOffset()); + assertEquals("my-key", record.getKeyAsString()); + assertEquals("{\"name\":\"test\"}", record.getValueAsString()); + assertEquals(1700000000000L, record.getTimestamp().getUnixTimestampMs()); + assertEquals(KafkaTimestampType.CreateTime, record.getTimestamp().getType()); + assertEquals(7, record.getLeaderEpoch()); + assertEquals(1, record.getHeaders().size()); + assertEquals("trace-id", record.getHeaders().get(0).getKey()); + assertEquals("trace-abc", record.getHeaders().get(0).getValueAsString()); + } + + @Test + public void deserialize_nullKeyAndValue() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(100) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(0) + .build()) + .build(); + // Key and Value deliberately not set + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertNull(record.getKey()); + assertNull(record.getValue()); + assertNull(record.getKeyAsString()); + assertNull(record.getValueAsString()); + assertEquals("test-topic", record.getTopic()); + } + + @Test + public void deserialize_noLeaderEpoch_returnsNull() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(0) + 
.setType(0) + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertNull(record.getLeaderEpoch()); + } + + @Test + public void deserialize_unknownTimestampType_fallsBackToNotAvailable() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(99) // Unknown future value + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(KafkaTimestampType.NotAvailable, record.getTimestamp().getType()); + assertEquals(1700000000000L, record.getTimestamp().getUnixTimestampMs()); + } + + @Test + public void deserialize_multipleHeaders() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(0) + .setType(0) + .build()) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("correlation-id") + .setValue(ByteString.copyFromUtf8("abc-123")) + .build()) + .addHeaders(KafkaHeaderProto.newBuilder() + .setKey("null-value-header") + // Value intentionally not set + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(2, record.getHeaders().size()); + assertEquals("correlation-id", record.getHeaders().get(0).getKey()); + assertEquals("abc-123", record.getHeaders().get(0).getValueAsString()); + assertEquals("null-value-header", record.getHeaders().get(1).getKey()); + assertNull(record.getHeaders().get(1).getValue()); + } + + @Test + public void deserialize_timestampDateTimeOffset() throws Exception { + KafkaRecordProto proto = KafkaRecordProto.newBuilder() + .setTopic("test-topic") + .setPartition(0) + .setOffset(0) + .setValue(ByteString.copyFromUtf8("test")) + .setTimestamp(KafkaTimestampProto.newBuilder() + .setUnixTimestampMs(1700000000000L) + .setType(2) // LogAppendTime + .build()) + .build(); + + KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(proto.toByteArray()); + + assertEquals(KafkaTimestampType.LogAppendTime, record.getTimestamp().getType()); + assertNotNull(record.getTimestamp().getDateTimeOffset()); + assertEquals(1700000000000L, record.getTimestamp().getDateTimeOffset().toInstant().toEpochMilli()); + } + + @Test + public void kafkaTimestampType_fromValue_allValues() { + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(0)); + assertEquals(KafkaTimestampType.CreateTime, KafkaTimestampType.fromValue(1)); + assertEquals(KafkaTimestampType.LogAppendTime, KafkaTimestampType.fromValue(2)); + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(99)); + assertEquals(KafkaTimestampType.NotAvailable, KafkaTimestampType.fromValue(-1)); + } + + @Test + public void kafkaHeader_getValueAsString_nullValue() { + KafkaHeader header = new KafkaHeader("key", null); + assertNull(header.getValueAsString()); + } + + @Test + public void kafkaHeader_getValueAsString_withValue() { + KafkaHeader header = new KafkaHeader("key", "hello".getBytes(StandardCharsets.UTF_8)); + assertEquals("hello", header.getValueAsString()); + } +}
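Finally, a hypothetical end-to-end consumer, assuming the KafkaTrigger annotation in azure-functions-java-library routes its payload through the new binding path above; the function name, topic, and app-setting name are placeholders, not part of this change:

    // Hypothetical usage sketch: a function receiving the KafkaRecord POJO once
    // the host sends model_binding_data with content_type "application/x-protobuf"
    // and source "AzureKafkaRecord".
    @FunctionName("KafkaRecordConsumer")
    public void run(
            @KafkaTrigger(
                    name = "record",
                    topic = "my-topic",            // placeholder topic
                    brokerList = "%BrokerList%",   // placeholder app setting
                    consumerGroup = "$Default")
            KafkaRecord record,
            final ExecutionContext context) {
        context.getLogger().info("partition=" + record.getPartition()
                + " offset=" + record.getOffset()
                + " value=" + record.getValueAsString());
    }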