10 changes: 10 additions & 0 deletions pom.xml
@@ -154,11 +154,21 @@
</configuration>
<executions>
<execution>
<id>compile-grpc</id>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
<execution>
<id>compile-kafka-proto</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<protoSourceRoot>${basedir}/src/main/proto/kafka</protoSourceRoot>
</configuration>
</execution>
</executions>
</plugin>
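As a quick sanity check (not part of this change), a minimal sketch of what the new compile-kafka-proto execution makes available after `mvn generate-sources`: plain Java message classes under the java_package/java_outer_classname declared in KafkaRecordProto.proto later in this PR. The execution runs only the `compile` goal, so only message classes (no gRPC stubs) are generated for this proto; the topic/offset values below are placeholders.

```java
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto;

public class GeneratedKafkaProtoSmokeTest {
    public static void main(String[] args) {
        // Standard protobuf-java builder generated by the `compile` goal.
        KafkaRecordProto record = KafkaRecordProto.newBuilder()
                .setTopic("orders")   // placeholder values
                .setPartition(0)
                .setOffset(42L)
                .build();
        System.out.println(record.getTopic() + "@" + record.getOffset());
    }
}
```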
<!-- This plugin enforces that the Maven version must be above 3.2.0 -->
@@ -2,7 +2,10 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.azure.functions.KafkaRecord;
import com.microsoft.azure.functions.rpc.messages.ModelBindingData;
import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer;
import com.microsoft.azure.functions.worker.WorkerLogManager;
import org.apache.commons.lang3.exception.ExceptionUtils;

@@ -12,44 +15,68 @@
import java.util.logging.Logger;

/**
* A DataSource that parses "model_binding_data" from the host. The "content" field
* is assumed to be JSON. We parse it into a Map<String,String>.
* When someone calls "lookupName('ContainerName')", we return a nested DataSource
* that acts like a string.
* A DataSource that parses "model_binding_data" from the host.
*
* <p>Dispatches based on content_type:
* <ul>
* <li>"application/x-protobuf" with source "AzureKafkaRecord" — Protobuf deserialization to KafkaRecord</li>
 * <li>all others — JSON parsing into Map&lt;String,String&gt; (legacy behavior)</li>
* </ul>
*/
public class RpcModelBindingDataSource extends DataSource<ModelBindingData> {
private static final Logger LOGGER = WorkerLogManager.getSystemLogger();
private static final Gson GSON = new Gson();

// This holds the parsed key-value pairs from the model_binding_data.content JSON
// This holds the parsed key-value pairs from the model_binding_data.content JSON (null for Protobuf path)
private final Map<String, String> contentMap;

// Precomputed KafkaRecord for Protobuf path (null for JSON path)
private final KafkaRecord kafkaRecord;

public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
super(name, modelData, MODEL_BINDING_DATA_OPERATIONS);

// Parse the JSON in modelData.getContent() => Map<String,String>
String jsonString = modelData.getContent().toStringUtf8();
if (jsonString == null || jsonString.isEmpty()) {
throw new IllegalArgumentException(
"model_binding_data.content is empty or missing for name: " + name
);
}
String contentType = modelData.getContentType();
String source = modelData.getSource();

Map<String,String> parsed = null;
try {
Type mapType = new TypeToken<Map<String, String>>(){}.getType();
parsed = GSON.fromJson(jsonString, mapType);
} catch (Exception ex) {
LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
throw new RuntimeException(ex);
}
if (KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE.equals(contentType)
&& KafkaRecordProtoDeserializer.EXPECTED_SOURCE.equals(source)) {
// Protobuf path: deserialize KafkaRecord
this.contentMap = null;
try {
this.kafkaRecord = KafkaRecordProtoDeserializer.deserialize(
modelData.getContent().toByteArray());
} catch (InvalidProtocolBufferException ex) {
LOGGER.warning("Failed to deserialize KafkaRecord Protobuf: "
+ ExceptionUtils.getRootCauseMessage(ex));
throw new RuntimeException(ex);
}
} else {
// JSON path: legacy behavior
this.kafkaRecord = null;
String jsonString = modelData.getContent().toStringUtf8();
if (jsonString == null || jsonString.isEmpty()) {
throw new IllegalArgumentException(
"model_binding_data.content is empty or missing for name: " + name
);
}

Map<String,String> parsed = null;
try {
Type mapType = new TypeToken<Map<String, String>>(){}.getType();
parsed = GSON.fromJson(jsonString, mapType);
} catch (Exception ex) {
LOGGER.warning("Failed to parse model_binding_data JSON: " + ExceptionUtils.getRootCauseMessage(ex));
throw new RuntimeException(ex);
}

if (parsed == null) {
throw new IllegalArgumentException(
"model_binding_data.content was not valid JSON for name: " + name
);
if (parsed == null) {
throw new IllegalArgumentException(
"model_binding_data.content was not valid JSON for name: " + name
);
}
this.contentMap = parsed;
}
this.contentMap = parsed;
}

/**
@@ -59,18 +86,18 @@ public RpcModelBindingDataSource(String name, ModelBindingData modelData) {
*/
@Override
protected Optional<DataSource<?>> lookupName(String subName) {
if (contentMap.containsKey(subName)) {
// Create a nested string data source so the code can do
// getTriggerMetadataByName("ContainerName", String.class)
// and eventually get that string value.
if (contentMap != null && contentMap.containsKey(subName)) {
String value = contentMap.get(subName);
return Optional.of(new RpcStringDataSource(subName, value));
}
return Optional.empty();
}

// The operations can remain minimal, if you only do sub-value lookups
// from "lookupName(...)". Or you might define operations for the entire Map.
// Package-private for testing
KafkaRecord getKafkaRecord() {
return kafkaRecord;
}

private static final DataOperations<ModelBindingData, Object> MODEL_BINDING_DATA_OPERATIONS
= new DataOperations<>();

@@ -84,5 +111,14 @@ protected Optional<DataSource<?>> lookupName(String subName) {

// Or if they want it as a raw string, we can do that
MODEL_BINDING_DATA_OPERATIONS.addOperation(String.class, modelBindingData -> modelBindingData.getContent());

// KafkaRecord binding: deserialize Protobuf to KafkaRecord
MODEL_BINDING_DATA_OPERATIONS.addOperation(KafkaRecord.class, modelBindingData -> {
try {
return KafkaRecordProtoDeserializer.deserialize(modelBindingData.getContent().toByteArray());
} catch (InvalidProtocolBufferException ex) {
throw new RuntimeException("Failed to deserialize KafkaRecord Protobuf", ex);
}
});
}
}
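For illustration, a minimal test-style sketch of the two dispatch paths. It assumes it sits in the same package as RpcModelBindingDataSource (like a unit test would, so the package-private getKafkaRecord() is visible) and that ModelBindingData exposes the standard generated builder for its source, content_type, and content fields; the topic, offset, source, and JSON payload values are placeholders.

```java
import com.google.protobuf.ByteString;
import com.microsoft.azure.functions.KafkaRecord;
import com.microsoft.azure.functions.rpc.messages.ModelBindingData;
import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer;
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto;

public class RpcModelBindingDataSourceDispatchSketch {
    public static void main(String[] args) {
        // Protobuf path: content_type and source match the constants checked in the constructor.
        KafkaRecordProto proto = KafkaRecordProto.newBuilder()
                .setTopic("orders")
                .setPartition(1)
                .setOffset(100L)
                .setValue(ByteString.copyFromUtf8("{\"id\":1}"))
                .build();
        ModelBindingData kafkaFlavor = ModelBindingData.newBuilder()
                .setSource(KafkaRecordProtoDeserializer.EXPECTED_SOURCE)
                .setContentType(KafkaRecordProtoDeserializer.EXPECTED_CONTENT_TYPE)
                .setContent(proto.toByteString())
                .build();
        KafkaRecord record = new RpcModelBindingDataSource("record", kafkaFlavor).getKafkaRecord();
        System.out.println(record != null ? "Protobuf path taken" : "unexpected");

        // JSON path (legacy): any other content_type falls back to Map<String,String> parsing.
        ModelBindingData jsonFlavor = ModelBindingData.newBuilder()
                .setSource("AzureStorageBlobs")             // placeholder source
                .setContentType("application/json")         // placeholder content type
                .setContent(ByteString.copyFromUtf8("{\"ContainerName\":\"my-container\"}"))
                .build();
        // Parses the map; lookupName("ContainerName") would then resolve the value.
        new RpcModelBindingDataSource("blob", jsonFlavor);
    }
}
```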
@@ -0,0 +1,83 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package com.microsoft.azure.functions.worker.binding.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.azure.functions.KafkaHeader;
import com.microsoft.azure.functions.KafkaRecord;
import com.microsoft.azure.functions.KafkaTimestamp;
import com.microsoft.azure.functions.KafkaTimestampType;
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto;
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaHeaderProto;
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaTimestampProto;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Deserializes Protobuf-encoded KafkaRecordProto bytes into a {@link KafkaRecord} POJO.
*/
public final class KafkaRecordProtoDeserializer {

public static final String EXPECTED_SOURCE = "AzureKafkaRecord";
public static final String EXPECTED_CONTENT_TYPE = "application/x-protobuf";

private KafkaRecordProtoDeserializer() {
}

/**
* Deserializes Protobuf bytes into a {@link KafkaRecord}.
*
* @param protoBytes the Protobuf-encoded bytes from ModelBindingData.content
* @return a fully-populated KafkaRecord
* @throws InvalidProtocolBufferException if the bytes are not valid Protobuf
*/
public static KafkaRecord deserialize(byte[] protoBytes) throws InvalidProtocolBufferException {
KafkaRecordProto proto = KafkaRecordProto.parseFrom(protoBytes);
return fromProto(proto);
}

static KafkaRecord fromProto(KafkaRecordProto proto) {
// Key: optional bytes — null if not present
byte[] key = proto.hasKey() ? proto.getKey().toByteArray() : null;

// Value: optional bytes — null if not present
byte[] value = proto.hasValue() ? proto.getValue().toByteArray() : null;

// Leader epoch: optional int32 — null if not present
Integer leaderEpoch = proto.hasLeaderEpoch() ? proto.getLeaderEpoch() : null;

// Timestamp
KafkaTimestamp timestamp = null;
if (proto.hasTimestamp()) {
KafkaTimestampProto ts = proto.getTimestamp();
timestamp = new KafkaTimestamp(
ts.getUnixTimestampMs(),
KafkaTimestampType.fromValue(ts.getType()));
}

// Headers
List<KafkaHeader> headers;
if (proto.getHeadersCount() > 0) {
headers = new ArrayList<>(proto.getHeadersCount());
for (KafkaHeaderProto h : proto.getHeadersList()) {
byte[] headerValue = h.hasValue() ? h.getValue().toByteArray() : null;
headers.add(new KafkaHeader(h.getKey(), headerValue));
}
} else {
headers = Collections.emptyList();
}

return new KafkaRecord(
proto.getTopic(),
proto.getPartition(),
proto.getOffset(),
key,
value,
timestamp,
headers,
leaderEpoch);
}
}
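A brief sketch of the proto3 explicit-presence behavior the has* checks above rely on (standard protobuf-java generated API; the topic value is a placeholder): a field declared `optional` reports hasX() == false when never set, so fromProto maps it to null, while an explicitly set empty value still counts as present.

```java
import com.google.protobuf.ByteString;
import com.microsoft.azure.functions.KafkaRecord;
import com.microsoft.azure.functions.worker.binding.kafka.KafkaRecordProtoDeserializer;
import com.microsoft.azure.functions.worker.binding.kafka.proto.KafkaRecordProtos.KafkaRecordProto;

public class OptionalPresenceSketch {
    public static void main(String[] args) throws Exception {
        // Never set: hasKey() is false, so the deserializer yields a null key.
        KafkaRecordProto withoutKey = KafkaRecordProto.newBuilder()
                .setTopic("orders")
                .build();
        System.out.println(withoutKey.hasKey()); // false

        // Explicitly set to empty: presence is tracked, so the key becomes byte[0], not null.
        KafkaRecordProto withEmptyKey = withoutKey.toBuilder()
                .setKey(ByteString.EMPTY)
                .build();
        System.out.println(withEmptyKey.hasKey()); // true

        // Round-trip through the deserializer added in this PR.
        KafkaRecord record = KafkaRecordProtoDeserializer.deserialize(withoutKey.toByteArray());
        System.out.println(record != null); // true; its key is null per fromProto
    }
}
```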
34 changes: 34 additions & 0 deletions src/main/proto/kafka/KafkaRecordProto.proto
@@ -0,0 +1,34 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

syntax = "proto3";
package azure.functions.kafka;
option java_package = "com.microsoft.azure.functions.worker.binding.kafka.proto";
option java_outer_classname = "KafkaRecordProtos";

// Represents a single Kafka record aligned with the Apache Kafka specification.
// This schema must stay in sync with the WebJobs extension copy of KafkaRecordProto.proto in
// Microsoft.Azure.WebJobs.Extensions.Kafka.
message KafkaRecordProto {
string topic = 1;
int32 partition = 2;
int64 offset = 3;
optional bytes key = 4;
optional bytes value = 5;
KafkaTimestampProto timestamp = 6;
repeated KafkaHeaderProto headers = 7;
optional int32 leader_epoch = 8;

// Reserved for future fields. Do not reuse field numbers.
reserved 9 to 15;
}

message KafkaTimestampProto {
int64 unix_timestamp_ms = 1;
int32 type = 2; // 0=NotAvailable, 1=CreateTime, 2=LogAppendTime
}

message KafkaHeaderProto {
string key = 1;
optional bytes value = 2;
}