From 256e75c2cbec45e7c2cf27e563bace5732011ea5 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Thu, 23 Apr 2026 21:19:54 -0700 Subject: [PATCH] feat: Add KafkaRecord POJO types for raw Kafka record binding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add types that enable users to bind to raw Apache Kafka records with full metadata access in their function signatures. New files: - KafkaRecord.java — Main POJO with topic, partition, offset, key/value as raw bytes, timestamp, headers, leader epoch + convenience helpers - KafkaHeader.java — Header with key + byte[] value + getValueAsString() - KafkaTimestamp.java — Timestamp with getUnixTimestampMs(), getType(), getDateTimeOffset() - KafkaTimestampType.java — Enum: NotAvailable, CreateTime, LogAppendTime with fromValue() safe mapping Non-breaking: All existing types unchanged. Package: com.microsoft.azure.functions Relates to Azure/azure-functions-kafka-extension#612 Co-authored-by: Dobby --- .../azure/functions/KafkaHeader.java | 40 +++++++ .../azure/functions/KafkaRecord.java | 104 ++++++++++++++++++ .../azure/functions/KafkaTimestamp.java | 42 +++++++ .../azure/functions/KafkaTimestampType.java | 46 ++++++++ 4 files changed, 232 insertions(+) create mode 100644 src/main/java/com/microsoft/azure/functions/KafkaHeader.java create mode 100644 src/main/java/com/microsoft/azure/functions/KafkaRecord.java create mode 100644 src/main/java/com/microsoft/azure/functions/KafkaTimestamp.java create mode 100644 src/main/java/com/microsoft/azure/functions/KafkaTimestampType.java diff --git a/src/main/java/com/microsoft/azure/functions/KafkaHeader.java b/src/main/java/com/microsoft/azure/functions/KafkaHeader.java new file mode 100644 index 0000000..efc163d --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/KafkaHeader.java @@ -0,0 +1,40 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions; + +import java.nio.charset.StandardCharsets; + +/** + * Represents a single Kafka record header (key-value pair where value is raw bytes). + */ +public class KafkaHeader { + private final String key; + private final byte[] value; + + public KafkaHeader(String key, byte[] value) { + this.key = key; + this.value = value; + } + + /** + * Returns the header key. + */ + public String getKey() { + return key; + } + + /** + * Returns the header value as raw bytes, or null if not present. + */ + public byte[] getValue() { + return value; + } + + /** + * Returns the header value as a UTF-8 string, or null if the value is null. + */ + public String getValueAsString() { + return value == null ? null : new String(value, StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/KafkaRecord.java b/src/main/java/com/microsoft/azure/functions/KafkaRecord.java new file mode 100644 index 0000000..41df861 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/KafkaRecord.java @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Represents a raw Apache Kafka record with full metadata. + * Key and value are raw bytes — the user controls deserialization. + */ +public class KafkaRecord { + private final String topic; + private final int partition; + private final long offset; + private final byte[] key; + private final byte[] value; + private final KafkaTimestamp timestamp; + private final List headers; + private final Integer leaderEpoch; + + public KafkaRecord(String topic, int partition, long offset, byte[] key, byte[] value, + KafkaTimestamp timestamp, List headers, Integer leaderEpoch) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.key = key; + this.value = value; + this.timestamp = timestamp; + this.headers = headers; + this.leaderEpoch = leaderEpoch; + } + + /** + * Returns the topic name this record was consumed from. + */ + public String getTopic() { + return topic; + } + + /** + * Returns the partition this record was consumed from. + */ + public int getPartition() { + return partition; + } + + /** + * Returns the offset of this record within the partition. + */ + public long getOffset() { + return offset; + } + + /** + * Returns the raw key bytes. Null if the record has no key. + */ + public byte[] getKey() { + return key; + } + + /** + * Returns the raw value bytes. Null if the record has no value. + */ + public byte[] getValue() { + return value; + } + + /** + * Returns the key as a UTF-8 string, or null if the key is null. + */ + public String getKeyAsString() { + return key == null ? null : new String(key, StandardCharsets.UTF_8); + } + + /** + * Returns the value as a UTF-8 string, or null if the value is null. + */ + public String getValueAsString() { + return value == null ? null : new String(value, StandardCharsets.UTF_8); + } + + /** + * Returns the record timestamp. + */ + public KafkaTimestamp getTimestamp() { + return timestamp; + } + + /** + * Returns the record headers. + */ + public List getHeaders() { + return headers; + } + + /** + * Returns the leader epoch, if available. Null if not provided by the broker. + */ + public Integer getLeaderEpoch() { + return leaderEpoch; + } +} diff --git a/src/main/java/com/microsoft/azure/functions/KafkaTimestamp.java b/src/main/java/com/microsoft/azure/functions/KafkaTimestamp.java new file mode 100644 index 0000000..303320c --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/KafkaTimestamp.java @@ -0,0 +1,42 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + +/** + * Represents the timestamp of a Kafka record. + */ +public class KafkaTimestamp { + private final long unixTimestampMs; + private final KafkaTimestampType type; + + public KafkaTimestamp(long unixTimestampMs, KafkaTimestampType type) { + this.unixTimestampMs = unixTimestampMs; + this.type = type; + } + + /** + * Returns the timestamp as Unix milliseconds since epoch. + */ + public long getUnixTimestampMs() { + return unixTimestampMs; + } + + /** + * Returns the timestamp type. + */ + public KafkaTimestampType getType() { + return type; + } + + /** + * Returns the timestamp as an {@link OffsetDateTime} in UTC. + */ + public OffsetDateTime getDateTimeOffset() { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(unixTimestampMs), ZoneOffset.UTC); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/KafkaTimestampType.java b/src/main/java/com/microsoft/azure/functions/KafkaTimestampType.java new file mode 100644 index 0000000..41e0698 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/KafkaTimestampType.java @@ -0,0 +1,46 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package com.microsoft.azure.functions; + +/** + * Defines the type of a Kafka record timestamp. + */ +public enum KafkaTimestampType { + /** + * Timestamp type is not available. + */ + NotAvailable(0), + + /** + * Timestamp was set by the producer (record creation time). + */ + CreateTime(1), + + /** + * Timestamp was set by the broker (log append time). + */ + LogAppendTime(2); + + private final int value; + + KafkaTimestampType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + /** + * Returns the enum constant for the given int value, or {@code NotAvailable} if not recognized. + */ + public static KafkaTimestampType fromValue(int value) { + for (KafkaTimestampType type : values()) { + if (type.value == value) { + return type; + } + } + return NotAvailable; + } +}