Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion psc-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<name>psc-examples</name>

<properties>
<memq.version>0.2.21</memq.version>
<memq.version>1.0.2</memq.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.pinterest.flink.connector.psc.source.metrics;

import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;

import java.util.Map;
import java.util.function.Predicate;

/**
 * Helpers for building MemQ-specific metric filters used by the PSC source reader
 * metrics when the consumer backend is MemQ.
 */
class MemqSourceReaderMetricsUtil {

    /** Metric group the MemQ consumer reports its client metrics under. */
    public static final String MEMQ_CONSUMER_METRIC_GROUP = "memq-consumer-metrics";
    /** Name of the total-bytes-consumed metric within the MemQ consumer group. */
    public static final String BYTES_CONSUMED_TOTAL = "bytes.consumed.total";
    /** Name of the max-notification-records-lag metric within the MemQ consumer group. */
    public static final String NOTIFICATION_RECORDS_LAG_MAX = "notification.records.lag.max";

    /** Static utility holder; not meant to be instantiated. */
    private MemqSourceReaderMetricsUtil() {
    }

    /**
     * Returns a filter matching the MemQ total-bytes-consumed metric entry.
     *
     * @return predicate selecting entries in {@link #MEMQ_CONSUMER_METRIC_GROUP}
     *         named {@link #BYTES_CONSUMED_TOTAL}
     */
    protected static Predicate<Map.Entry<MetricName, ? extends Metric>> createBytesConsumedFilter() {
        return entry ->
                entry.getKey().group().equals(MEMQ_CONSUMER_METRIC_GROUP)
                        && entry.getKey().name().equals(BYTES_CONSUMED_TOTAL);
    }

    /**
     * Returns a filter matching the MemQ notification-records-lag metric entry.
     *
     * @param tp the topic partition the lag is requested for.
     *     NOTE(review): {@code tp} is currently unused — the filter matches the
     *     consumer-wide lag metric rather than a per-partition one. Confirm
     *     whether MemQ exposes a per-partition tag that should be matched here,
     *     since the Kafka counterpart filters by partition.
     * @return predicate selecting entries in {@link #MEMQ_CONSUMER_METRIC_GROUP}
     *         named {@link #NOTIFICATION_RECORDS_LAG_MAX}
     */
    protected static Predicate<Map.Entry<MetricName, ? extends Metric>> createRecordsLagFilter(TopicUriPartition tp) {
        return entry ->
                entry.getKey().group().equals(MEMQ_CONSUMER_METRIC_GROUP)
                        && entry.getKey().name().equals(NOTIFICATION_RECORDS_LAG_MAX);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.pinterest.psc.exception.ClientException;
import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;
import java.util.Iterator;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -153,6 +154,19 @@ public void recordCurrentOffset(TopicUriPartition tp, long offset) {
offsets.get(tp).currentOffset = offset;
}

/**
 * Returns the {@link Offset} tracker registered for the given partition so that
 * callers can cache it and write offsets directly, avoiding a HashMap lookup
 * per record.
 *
 * @param tp the topic partition whose tracker is requested
 * @return the Offset tracker for {@code tp}
 * @throws IllegalArgumentException if the partition is not tracked
 */
public Offset getOffsetTracker(TopicUriPartition tp) {
    // Fail fast if the partition was never registered with this metrics instance.
    checkTopicPartitionTracked(tp);
    final Offset tracker = offsets.get(tp);
    return tracker;
}

/**
* Update the latest committed offset of the given {@link TopicUriPartition}.
*
Expand Down Expand Up @@ -180,8 +194,21 @@ public void recordFailedCommit() {
* @param consumer Kafka consumer
*/
public void registerNumBytesIn(PscConsumer<?, ?> consumer) throws ClientException {
    // Fetch the metrics snapshot once and reuse it for both backend detection
    // and the metric lookup below, instead of calling consumer.metrics() twice
    // (which could also observe two different snapshots).
    final Map<MetricName, ? extends Metric> metrics = consumer.metrics();
    String backendType = getBackendFromTags(metrics);
    Predicate<Map.Entry<MetricName, ? extends Metric>> filter;
    switch (backendType) {
        case PscUtils.BACKEND_TYPE_KAFKA:
            filter = KafkaSourceReaderMetricsUtil.createBytesConsumedFilter();
            break;
        case PscUtils.BACKEND_TYPE_MEMQ:
            filter = MemqSourceReaderMetricsUtil.createBytesConsumedFilter();
            break;
        default:
            // Unknown backend: leave the metric unregistered rather than guessing.
            LOG.warn(
                    "Unsupported backend type: \"{}\". Metric \"{}\" may not be reported correctly.",
                    backendType, MetricNames.IO_NUM_BYTES_IN);
            return;
    }
    this.bytesConsumedTotalMetric = MetricUtil.getPscMetric(metrics, filter);
}

Expand Down Expand Up @@ -288,25 +315,35 @@ private void checkTopicPartitionTracked(TopicUriPartition tp) {
case PscUtils.BACKEND_TYPE_KAFKA:
filter = KafkaSourceReaderMetricsUtil.createRecordLagFilter(tp);
break;
case PscUtils.BACKEND_TYPE_MEMQ:
filter = MemqSourceReaderMetricsUtil.createRecordsLagFilter(tp);
break;
default:
LOG.warn(
String.format(
"Unsupported backend type \"%s\". "
+ "Metric \"%s\" may not be reported correctly. ",
backendType, MetricNames.PENDING_RECORDS));
"Unsupported backend type \"{}\". Metric \"{}\" may not be reported correctly.",
backendType, MetricNames.PENDING_RECORDS);
return null;
}
return MetricUtil.getPscMetric(metrics, filter);
try {
return MetricUtil.getPscMetric(metrics, filter);
} catch (IllegalStateException e) {
LOG.debug("Metric not yet available for backend \"{}\", will retry on next poll cycle.", backendType);
return null;
}
}

/**
 * Determines the backend type by sampling the "backend" tag of the first metric
 * entry; all metrics from a single consumer are expected to carry the same tag.
 *
 * @param metrics the consumer's metrics map (may be empty)
 * @return the backend tag value, or {@code "unknown"} when the map is empty or
 *         the sampled entry carries no "backend" tag
 */
private static String getBackendFromTags(Map<MetricName, ? extends Metric> metrics) {
    Iterator<MetricName> it = metrics.keySet().iterator();
    if (!it.hasNext()) {
        return "unknown";
    }
    String backend = it.next().tags().get("backend");
    // A null tag would NPE in callers that switch on the returned string, so
    // normalize it to "unknown" and let their default branches handle it.
    return backend == null ? "unknown" : backend;
}

private static class Offset {
long currentOffset;
long committedOffset;
public static class Offset {
public long currentOffset;
public long committedOffset;

Offset(long currentOffset, long committedOffset) {
this.currentOffset = currentOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ private static class PscPartitionSplitRecords
private Iterator<PscConsumerMessage<byte[], byte[]>> recordIterator;
private TopicUriPartition currentTopicPartition;
private Long currentSplitStoppingOffset;
private PscSourceReaderMetrics.Offset currentOffsetTracker;

private PscPartitionSplitRecords(
PscConsumerMessagesIterable<byte[], byte[]> consumerMessagesIterable, PscSourceReaderMetrics metrics) {
Expand All @@ -592,11 +593,13 @@ public String nextSplit() {
recordIterator = consumerMessagesIterable.getMessagesForTopicUriPartition(currentTopicPartition).iterator();
currentSplitStoppingOffset =
stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
currentOffsetTracker = metrics.getOffsetTracker(currentTopicPartition);
return currentTopicPartition.toString();
} else {
currentTopicPartition = null;
recordIterator = null;
currentSplitStoppingOffset = null;
currentOffsetTracker = null;
return null;
}
}
Expand All @@ -612,7 +615,7 @@ public PscConsumerMessage<byte[], byte[]> nextRecordFromSplit() {
final PscConsumerMessage<byte[], byte[]> message = recordIterator.next();
// Only emit records before stopping offset
if (message.getMessageId().getOffset() < currentSplitStoppingOffset) {
metrics.recordCurrentOffset(currentTopicPartition, message.getMessageId().getOffset());
currentOffsetTracker.currentOffset = message.getMessageId().getOffset();
return message;
}
}
Expand Down
2 changes: 1 addition & 1 deletion psc-integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<kafka.version>3.4.0</kafka.version>
<memq.version>0.2.21</memq.version>
<memq.version>1.0.2</memq.version>
</properties>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion psc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<kafka.version>3.4.0</kafka.version>
<memq.version>0.2.21</memq.version>
<memq.version>1.0.2</memq.version>
<!-- <ts-consumer.version>1.0.0</ts-consumer.version>-->
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class TopicUriPartition implements Comparable<TopicUriPartition>, Seriali
private final String topicUriStr;
private final int partition;
private TopicUri backendTopicUri;
private transient int cachedHashCode;

/**
* Builds a TopicUriPartition instance with the default partition value (-1). This is meant to be used in
Expand Down Expand Up @@ -53,6 +54,7 @@ public TopicUriPartition(TopicUri topicUri, int partition) {

/**
 * Replaces the backend topic URI backing this partition.
 *
 * @param backendTopicUri the resolved backend-specific topic URI
 */
protected void setTopicUri(TopicUri backendTopicUri) {
    this.backendTopicUri = backendTopicUri;
    // The backend URI participates in hashCode(); clearing the memoized value
    // (0 is the "not yet computed" sentinel) forces recomputation on next use.
    this.cachedHashCode = 0;
}

/**
Expand Down Expand Up @@ -106,10 +108,14 @@ public boolean equals(Object other) {

@Override
public int hashCode() {
    // Racy single-check idiom (as in String.hashCode): reads/writes of an int
    // are atomic, so at worst concurrent callers recompute the same value.
    final int cached = cachedHashCode;
    if (cached != 0) {
        return cached;
    }
    // 0 doubles as the "not computed" sentinel; a hash that genuinely equals 0
    // is simply recomputed on every call, which is correct, just not cached.
    int computed = topicUriStr.hashCode();
    computed = 31 * computed + (backendTopicUri == null ? 0 : backendTopicUri.hashCode());
    computed = 31 * computed + partition;
    cachedHashCode = computed;
    return computed;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.pinterest.psc.config;

import com.pinterest.memq.client.commons.ConsumerConfigs;
import com.pinterest.psc.common.TopicUri;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Converts PSC metadata-client configuration into MemQ consumer configuration
 * properties by mapping PSC config keys to their MemQ equivalents.
 */
public class PscMetadataClientToMemqConsumerConfigConverter extends PscMetadataClientToBackendMetatadataClientConfigCoverter {

    /**
     * Maps PSC metadata-client config keys to MemQ consumer config keys.
     * Currently only the client id is translated.
     *
     * @return a mutable map of PSC key to MemQ key
     */
    @Override
    protected Map<String, String> getConfigConverterMap() {
        // Plain HashMap instead of double-brace initialization: the anonymous
        // HashMap subclass idiom creates an extra class per call site and is a
        // well-known serialization/readability anti-pattern.
        Map<String, String> converterMap = new HashMap<>();
        converterMap.put(PscConfiguration.PSC_METADATA_CLIENT_ID, ConsumerConfigs.CLIENT_ID);
        return converterMap;
    }

    // NOTE(review): a convert(PscConfigurationInternal, TopicUri) override that
    // only delegated to super.convert(...) was removed as redundant; callers
    // still resolve to the inherited implementation.
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.pinterest.psc.consumer;

import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.logging.PscLogger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -13,11 +15,18 @@

public class PscConsumerMessagesIterable<K, V> implements Iterable<PscConsumerMessage<K, V>> {

private static final PscLogger logger = PscLogger.getLogger(PscConsumerMessagesIterable.class);

List<PscConsumerMessage<K, V>> messages;
Map<TopicUriPartition, List<PscConsumerMessage<K, V>>> messagesByTopicUriPartition;

public PscConsumerMessagesIterable(PscConsumerPollMessageIterator<K, V> iterator) {
this.messages = iterator.asList();
try {
iterator.close();
} catch (IOException e) {
logger.warn("Failed to close poll message iterator", e);
}
this.messagesByTopicUriPartition = new HashMap<>();
for (PscConsumerMessage<K, V> message : messages) {
TopicUriPartition topicUriPartition = message.getMessageId().getTopicUriPartition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class MemqTopicUri extends BaseTopicUri {
public static final String PLAINTEXT_PROTOCOL = "plaintext";
public static final String SECURE_PROTOCOL = "secure";

MemqTopicUri(TopicUri topicUri) {
/**
 * Creates a MemqTopicUri wrapping the given, already-parsed {@link TopicUri}.
 *
 * <p>NOTE(review): visibility was widened from package-private to public —
 * confirm external construction (outside this package) is intended.
 *
 * @param topicUri the parsed topic URI to wrap
 */
public MemqTopicUri(TopicUri topicUri) {
    super(topicUri);
}

Expand Down
Loading
Loading