Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/main/java/com/microsoft/azure/functions/KafkaHeader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package com.microsoft.azure.functions;

import java.nio.charset.StandardCharsets;

/**
* Represents a single Kafka record header (key-value pair where value is raw bytes).
*/
public class KafkaHeader {
private final String key;
private final byte[] value;

public KafkaHeader(String key, byte[] value) {
this.key = key;
this.value = value;
}

/**
* Returns the header key.
*/
public String getKey() {
return key;
}

/**
* Returns the header value as raw bytes, or null if not present.
*/
public byte[] getValue() {
return value;
}

/**
* Returns the header value as a UTF-8 string, or null if the value is null.
*/
public String getValueAsString() {
return value == null ? null : new String(value, StandardCharsets.UTF_8);
}
}
104 changes: 104 additions & 0 deletions src/main/java/com/microsoft/azure/functions/KafkaRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package com.microsoft.azure.functions;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* Represents a raw Apache Kafka record with full metadata.
* Key and value are raw bytes — the user controls deserialization.
*/
public class KafkaRecord {
private final String topic;
private final int partition;
private final long offset;
private final byte[] key;
private final byte[] value;
private final KafkaTimestamp timestamp;
private final List<KafkaHeader> headers;
private final Integer leaderEpoch;

public KafkaRecord(String topic, int partition, long offset, byte[] key, byte[] value,
KafkaTimestamp timestamp, List<KafkaHeader> headers, Integer leaderEpoch) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = headers;
this.leaderEpoch = leaderEpoch;
}

/**
* Returns the topic name this record was consumed from.
*/
public String getTopic() {
return topic;
}

/**
* Returns the partition this record was consumed from.
*/
public int getPartition() {
return partition;
}

/**
* Returns the offset of this record within the partition.
*/
public long getOffset() {
return offset;
}

/**
* Returns the raw key bytes. Null if the record has no key.
*/
public byte[] getKey() {
return key;
}

/**
* Returns the raw value bytes. Null if the record has no value.
*/
public byte[] getValue() {
return value;
}

/**
* Returns the key as a UTF-8 string, or null if the key is null.
*/
public String getKeyAsString() {
return key == null ? null : new String(key, StandardCharsets.UTF_8);
}

/**
* Returns the value as a UTF-8 string, or null if the value is null.
*/
public String getValueAsString() {
return value == null ? null : new String(value, StandardCharsets.UTF_8);
}

/**
* Returns the record timestamp.
*/
public KafkaTimestamp getTimestamp() {
return timestamp;
}

/**
* Returns the record headers.
*/
public List<KafkaHeader> getHeaders() {
return headers;
}

/**
* Returns the leader epoch, if available. Null if not provided by the broker.
*/
public Integer getLeaderEpoch() {
return leaderEpoch;
}
}
42 changes: 42 additions & 0 deletions src/main/java/com/microsoft/azure/functions/KafkaTimestamp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package com.microsoft.azure.functions;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

/**
* Represents the timestamp of a Kafka record.
*/
public class KafkaTimestamp {
private final long unixTimestampMs;
private final KafkaTimestampType type;

public KafkaTimestamp(long unixTimestampMs, KafkaTimestampType type) {
this.unixTimestampMs = unixTimestampMs;
this.type = type;
}

/**
* Returns the timestamp as Unix milliseconds since epoch.
*/
public long getUnixTimestampMs() {
return unixTimestampMs;
}

/**
* Returns the timestamp type.
*/
public KafkaTimestampType getType() {
return type;
}

/**
* Returns the timestamp as an {@link OffsetDateTime} in UTC.
*/
public OffsetDateTime getDateTimeOffset() {
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(unixTimestampMs), ZoneOffset.UTC);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package com.microsoft.azure.functions;

/**
* Defines the type of a Kafka record timestamp.
*/
public enum KafkaTimestampType {
/**
* Timestamp type is not available.
*/
NotAvailable(0),

/**
* Timestamp was set by the producer (record creation time).
*/
CreateTime(1),

/**
* Timestamp was set by the broker (log append time).
*/
LogAppendTime(2);

private final int value;

KafkaTimestampType(int value) {
this.value = value;
}

public int getValue() {
return value;
}

/**
* Returns the enum constant for the given int value, or {@code NotAvailable} if not recognized.
*/
public static KafkaTimestampType fromValue(int value) {
for (KafkaTimestampType type : values()) {
if (type.value == value) {
return type;
}
}
return NotAvailable;
}
}