Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ subprojects {
tasks.withType(JavaCompile) {
// Skipping 'deprecation' since pegasus generates problematic files and 'fallthrough' since it can't be suppressed
// Xlint:all - [deprecation, fallthrough]
options.compilerArgs = ["-Xlint:cast,classfile,dep-ann,divzero,empty,finally,options,overrides,path,processing,rawtypes,serial,static,try,unchecked,varargs", "-Werror"]
options.compilerArgs = ["-Xlint:cast,classfile,dep-ann,divzero,empty,finally,options,overrides,path,processing,rawtypes,serial,static,try,unchecked,varargs", "-Xlint:-options", "-Werror"]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package com.linkedin.datastream.server;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,6 +54,18 @@ public class EventProducer implements DatastreamEventProducer {
public static final String CONFIG_FLUSH_INTERVAL_MS = "flushIntervalMs";
public static final String CONFIG_ENABLE_PER_TOPIC_METRICS = "enablePerTopicMetrics";
public static final String CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS = "enablePerTopicEventLatencyMetrics";
/**
* When enabled, emits per-source-database throughput attribution metrics keyed as
* {@code EventProducer.db.<databaseName>.bytesProducedRate} and
* {@code EventProducer.db.<databaseName>.eventProduceRate}.
* Applies only to CDC connectors whose source URI uses a single-slash scheme
* (e.g. {@code espresso:/}, {@code mysql:/}, {@code tidb:/}).
* Double-slash URIs (e.g. {@code kafka://}) produce no database metrics.
*
* <p><b>Cardinality warning:</b> each distinct database name creates a new metric series.
* Enable only when the set of source databases is bounded and well-understood.
*/
public static final String CONFIG_ENABLE_THROUGHPUT_METRICS = "enableThroughputAttributionMetrics";

// Default flush interval, It is intentionally kept at low frequency. If a particular connectors wants
// a more frequent flush (high traffic connectors), it can perform that on it's own.
Expand Down Expand Up @@ -79,7 +93,9 @@ public class EventProducer implements DatastreamEventProducer {
private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla";
private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla";
private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError";
static final String BYTES_PRODUCED_RATE = "bytesProducedRate";
private static final String AGGREGATE = "aggregate";

private static final String DEFAULT_AVAILABILITY_THRESHOLD_SLA_MS = "60000"; // 1 minute
private static final String DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS = "180000"; // 3 minutes
private static final String DEFAULT_WARN_LOG_LATENCY_ENABLED = "false";
Expand Down Expand Up @@ -109,6 +125,9 @@ public class EventProducer implements DatastreamEventProducer {
private final boolean _skipMessageOnSerializationErrors;
private final boolean _enablePerTopicMetrics;
private final boolean _enablePerTopicEventLatencyMetrics;
private final boolean _enableThroughputMetrics;
// Cached source database name parsed from the connection string at construction time (null for non-CDC sources)
private final String _sourceDatabase;
private final Duration _flushInterval;
private final Function<DatastreamTask, Set<String>> _throughputViolatingTopicsProvider;

Expand Down Expand Up @@ -188,6 +207,12 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C
Boolean.parseBoolean(config.getProperty(CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS,
Boolean.FALSE.toString()));

_enableThroughputMetrics =
Boolean.parseBoolean(config.getProperty(CONFIG_ENABLE_THROUGHPUT_METRICS, Boolean.FALSE.toString()));

String[] sourceParts = getSourcePathParts();
_sourceDatabase = (sourceParts != null && sourceParts.length > 1) ? sourceParts[1] : null;

_logger.info("Created event producer with customCheckpointing={}", customCheckpointing);

_dynamicMetricsManager = DynamicMetricsManager.getInstance();
Expand Down Expand Up @@ -281,10 +306,17 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
record.setEventsSendTimestamp(System.currentTimeMillis());
long recordEventsSourceTimestamp = record.getEventsSourceTimestamp();
long recordEventsSendTimestamp = record.getEventsSendTimestamp().orElse(0L);
final long numSerializedBytes = record.getEvents().stream()
.mapToLong(e -> {
long keySize = e.key().filter(k -> k instanceof byte[]).map(k -> (long) ((byte[]) k).length).orElse(0L);
long valSize = e.value().filter(v -> v instanceof byte[]).map(v -> (long) ((byte[]) v).length).orElse(0L);
return keySize + valSize;
})
.sum();
if (isBroadcast) {
broadcastMetadata = _transportProvider.broadcast(destination, record,
(metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp,
recordEventsSendTimestamp));
recordEventsSendTimestamp, numSerializedBytes));
_logger.debug("Broadcast completed with {}", broadcastMetadata);
if (broadcastMetadata.isMessageSerializationError()) {
_logger.warn("Broadcast of record {} to destination {} failed because of serialization error.",
Expand All @@ -293,7 +325,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
} else {
_transportProvider.send(destination, record,
(metadata, exception) -> onSendCallback(metadata, exception, sendEventCallback, recordEventsSourceTimestamp,
recordEventsSendTimestamp));
recordEventsSendTimestamp, numSerializedBytes));
}
} catch (Exception e) {
String errorMessage = String.format("Failed to send the event %s exception %s", record, e);
Expand Down Expand Up @@ -365,7 +397,8 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev
* per DatastreamProducerRecord (i.e. by the number of events within the record), only increment all metrics by 1
* to avoid overcounting.
*/
private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp, long eventsSendTimestamp) {
private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp, long eventsSendTimestamp,
long numBytes) {
// If per-topic metrics are enabled, use topic as key for metrics; else, use datastream name as the key
String datastreamName = getDatastreamName();

Expand Down Expand Up @@ -413,6 +446,7 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
}
_dynamicMetricsManager.createOrUpdateMeter(MODULE, AGGREGATE, EVENT_PRODUCE_RATE, 1);
_dynamicMetricsManager.createOrUpdateMeter(MODULE, _datastreamTask.getConnectorType(), EVENT_PRODUCE_RATE, 1);
reportThroughputAttributionMetrics(numBytes);
}

/**
Expand All @@ -424,7 +458,7 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
* to avoid overcounting.
*/
private void reportMetricsForThroughputViolatingTopics(DatastreamRecordMetadata metadata, long eventsSourceTimestamp,
long eventsSendTimestamp) {
long eventsSendTimestamp, long numBytes) {
String topicOrDatastreamName = _enablePerTopicMetrics ? metadata.getTopic() : getDatastreamName();
// Treat all events within this record equally (assume same timestamp)
if (eventsSourceTimestamp > 0) {
Expand Down Expand Up @@ -457,6 +491,7 @@ private void reportMetricsForThroughputViolatingTopics(DatastreamRecordMetadata
}
_dynamicMetricsManager.createOrUpdateMeter(MODULE, AGGREGATE, EVENT_PRODUCE_RATE, 1);
_dynamicMetricsManager.createOrUpdateMeter(MODULE, _datastreamTask.getConnectorType(), EVENT_PRODUCE_RATE, 1);
reportThroughputAttributionMetrics(numBytes);
}

// Report Event Latency metrics for aggregate, connector and topic/datastream
Expand Down Expand Up @@ -492,7 +527,7 @@ private void reportSendLatencyMetrics(DatastreamRecordMetadata metadata, long se
}

private void onSendCallback(DatastreamRecordMetadata metadata, Exception exception, SendCallback sendCallback,
long eventSourceTimestamp, long eventSendTimestamp) {
long eventSourceTimestamp, long eventSendTimestamp, long numBytes) {

SendFailedException sendFailedException = null;

Expand All @@ -505,9 +540,9 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti
// Reporting separate metrics for throughput violating topics.

if (_throughputViolatingTopicsProvider.apply(_datastreamTask).contains(metadata.getUndecoratedTopic())) {
reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp);
reportMetricsForThroughputViolatingTopics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes);
} else {
reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp);
reportMetrics(metadata, eventSourceTimestamp, eventSendTimestamp, numBytes);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -601,10 +636,36 @@ public String toString() {
return String.format("EventProducer producerId=%d", _producerId);
}

/**
 * Emits throughput-attribution meters for one produced record.
 * No-op unless {@code enableThroughputAttributionMetrics} is set. Always bumps the
 * aggregate and per-connector byte meters; additionally bumps per-database byte/event
 * meters when a source database was parsed from the connection string at construction.
 *
 * @param numBytes serialized key+value byte count of the record's events
 */
private void reportThroughputAttributionMetrics(long numBytes) {
  if (!_enableThroughputMetrics) {
    return;
  }
  if (_sourceDatabase != null) {
    // Per-database attribution — one metric series per distinct database name.
    final String dbKey = "db." + _sourceDatabase;
    _dynamicMetricsManager.createOrUpdateMeter(MODULE, dbKey, BYTES_PRODUCED_RATE, numBytes);
    _dynamicMetricsManager.createOrUpdateMeter(MODULE, dbKey, EVENT_PRODUCE_RATE, 1);
  }
  _dynamicMetricsManager.createOrUpdateMeter(MODULE, AGGREGATE, BYTES_PRODUCED_RATE, numBytes);
  _dynamicMetricsManager.createOrUpdateMeter(MODULE, _datastreamTask.getConnectorType(), BYTES_PRODUCED_RATE,
      numBytes);
}

/** Returns the name of the first datastream backing this task, used as a metric key. */
private String getDatastreamName() {
  return _datastreamTask.getDatastreams()
      .get(0)
      .getName();
}

// Returns path segments ["CLUSTER", "DATABASE", "TABLE"] for CDC single-slash URIs, null for BMM double-slash URIs.
// Consistent with MySqlKafkaSource, TiDBKafkaSource, and EspressoSource parsing in brooklin-li-common.
private String[] getSourcePathParts() {
  try {
    URI uri = new URI(_datastreamTask.getDatastreamSource().getConnectionString());
    if (uri.getAuthority() != null) {
      return null; // double-slash URI (e.g. kafka://host/topic) — no cluster/database segments
    }
    String path = uri.getPath();
    // Guard: opaque URIs (e.g. "scheme:value") have a null path, and a bare "scheme:/" has
    // path "/". The unguarded substring(1) previously threw NPE for the former (escaping the
    // constructor, since only URISyntaxException is caught) and produced [""] for the latter.
    if (path == null || path.length() <= 1) {
      return null;
    }
    return path.substring(1).split("/");
  } catch (URISyntaxException e) {
    return null; // malformed connection string — treat as non-CDC, no database attribution
  }
}

/**
* Get the list of metrics maintained by the event producer
*/
Expand All @@ -615,6 +676,7 @@ public static List<BrooklinMetricInfo> getMetricInfos() {
metrics.add(new BrooklinCounterInfo(METRICS_PREFIX + EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA));
metrics.add(new BrooklinCounterInfo(METRICS_PREFIX + TOTAL_EVENTS_PRODUCED));
metrics.add(new BrooklinMeterInfo(METRICS_PREFIX + EVENT_PRODUCE_RATE));
metrics.add(new BrooklinMeterInfo(METRICS_PREFIX + BYTES_PRODUCED_RATE));
metrics.add(new BrooklinCounterInfo(METRICS_PREFIX + EVENTS_PRODUCED_OUTSIDE_SLA));
metrics.add(new BrooklinCounterInfo(METRICS_PREFIX + EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA));
metrics.add(new BrooklinCounterInfo(METRICS_PREFIX + DROPPED_SENT_FROM_SERIALIZATION_ERROR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

import com.linkedin.datastream.common.BrooklinEnvelope;
Expand Down Expand Up @@ -155,6 +156,111 @@ public void testPerDatastreamMetrics() {
metrics.getMetric("EventProducer." + datastreamName + "." + EventProducer.EVENTS_SEND_LATENCY_MS_STRING));
}

@Test
public void testThroughputAttributionMetrics() {
  // CDC-style datastream: single-slash source URI whose second path segment is the database.
  String streamName = "datastream-testThroughputAttributionMetrics";
  Datastream stream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, streamName)[0];
  stream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
  DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(stream));

  byte[] serializedKey = new byte[10];
  byte[] serializedValue = new byte[20];
  long expectedByteCount = serializedKey.length + serializedValue.length;

  // Transport that completes every send inline so the metric-reporting callback fires synchronously.
  String someTopicName = "someTopicName";
  TransportProvider transport = new NoOpTransportProviderAdminFactory.NoOpTransportProvider() {
    @Override
    public void send(String destination, DatastreamProducerRecord record, SendCallback onComplete) {
      onComplete.onCompletion(
          new DatastreamRecordMetadata(record.getCheckpoint(), someTopicName, record.getPartition().orElse(0)),
          null);
    }
  };

  Properties config = new Properties();
  config.put(EventProducer.CONFIG_ENABLE_THROUGHPUT_METRICS, Boolean.TRUE.toString());
  EventProducer producer = new EventProducer(task, transport, new NoOpCheckpointProvider(), config, false);

  DatastreamProducerRecordBuilder recordBuilder = new DatastreamProducerRecordBuilder();
  recordBuilder.setPartition(0);
  recordBuilder.setSourceCheckpoint("0");
  recordBuilder.setEventsSourceTimestamp(System.currentTimeMillis());
  recordBuilder.addEvent(new BrooklinEnvelope(serializedKey, serializedValue, null, new HashMap<>()));
  producer.send(recordBuilder.build(), (metadata, exception) -> { });

  DynamicMetricsManager metricsManager = DynamicMetricsManager.getInstance();

  Meter perDbBytes =
      (Meter) metricsManager.getMetric("EventProducer.db.testDatabase." + EventProducer.BYTES_PRODUCED_RATE);
  Assert.assertNotNull(perDbBytes, "Per-database bytesProducedRate should exist");
  Assert.assertEquals(perDbBytes.getCount(), expectedByteCount);

  Meter perDbEvents = (Meter) metricsManager.getMetric("EventProducer.db.testDatabase.eventProduceRate");
  Assert.assertNotNull(perDbEvents, "Per-database eventProduceRate should exist");
  Assert.assertEquals(perDbEvents.getCount(), 1);

  Meter aggregateBytes =
      (Meter) metricsManager.getMetric("EventProducer.aggregate." + EventProducer.BYTES_PRODUCED_RATE);
  Assert.assertNotNull(aggregateBytes, "Aggregate bytesProducedRate should exist");
  Assert.assertEquals(aggregateBytes.getCount(), expectedByteCount);

  Meter connectorBytes = (Meter) metricsManager.getMetric(
      "EventProducer." + DummyConnector.CONNECTOR_TYPE + "." + EventProducer.BYTES_PRODUCED_RATE);
  Assert.assertNotNull(connectorBytes, "Connector-type bytesProducedRate should exist");
  Assert.assertEquals(connectorBytes.getCount(), expectedByteCount);
}

@Test
public void testThroughputMetricsDisabledByDefault() {
  String datastreamName = "datastream-testThroughputMetricsDisabled";
  Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, datastreamName)[0];
  // Database segment "someDatabase" is unique to this test, so its metric key cannot be
  // polluted by other tests sharing the DynamicMetricsManager singleton.
  datastream.getSource().setConnectionString("mysql:/myhost/someDatabase/someTable");
  DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream));

  // Transport that completes every send inline so the metric-reporting callback fires synchronously.
  String someTopicName = "someTopicName";
  TransportProvider transport = new NoOpTransportProviderAdminFactory.NoOpTransportProvider() {
    @Override
    public void send(String destination, DatastreamProducerRecord record, SendCallback onComplete) {
      DatastreamRecordMetadata metadata =
          new DatastreamRecordMetadata(record.getCheckpoint(), someTopicName, record.getPartition().orElse(0));
      onComplete.onCompletion(metadata, null);
    }
  };

  // Do NOT set CONFIG_ENABLE_THROUGHPUT_METRICS — it should default to false
  EventProducer eventProducer =
      new EventProducer(task, transport, new NoOpCheckpointProvider(), new Properties(), false);

  eventProducer.send(createDatastreamProducerRecord(), (m, e) -> { });

  DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
  // Assert only on this test's unique per-database key. The previous assertion on the shared
  // "EventProducer.aggregate.bytesProducedRate" key was order-dependent: any other test in the
  // same JVM that enables throughput metrics creates that meter in the singleton registry,
  // making the assertNull fail depending on test execution order.
  Assert.assertNull(metrics.getMetric("EventProducer.db.someDatabase." + EventProducer.BYTES_PRODUCED_RATE),
      "bytesProducedRate should not exist when throughput metrics are disabled");
}

@Test
public void testNoDatabaseMetricForBmmUri() {
  Datastream stream =
      DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "datastream-testBmmUri")[0];
  // A double-slash (authority-bearing) URI carries no cluster/database path segments,
  // so enabling throughput metrics must not create any per-database meter.
  stream.getSource().setConnectionString("kafka://broker:9092/someTopic");
  DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(stream));

  Properties config = new Properties();
  config.put(EventProducer.CONFIG_ENABLE_THROUGHPUT_METRICS, Boolean.TRUE.toString());
  // NOTE(review): unlike the sibling tests, this uses the stock NoOpTransportProvider without
  // overriding send() — it assumes that provider invokes the completion callback; confirm it
  // does, otherwise the aggregate assertion below cannot pass.
  EventProducer producer = new EventProducer(task,
      new NoOpTransportProviderAdminFactory.NoOpTransportProvider(),
      new NoOpCheckpointProvider(), config, false);

  producer.send(createDatastreamProducerRecord(), (metadata, exception) -> { });

  DynamicMetricsManager metricsManager = DynamicMetricsManager.getInstance();
  Assert.assertNull(
      metricsManager.getMetric("EventProducer.db.someTopic." + EventProducer.BYTES_PRODUCED_RATE),
      "No per-database metric should exist for BMM double-slash URI");
  Assert.assertNotNull(
      metricsManager.getMetric("EventProducer.aggregate." + EventProducer.BYTES_PRODUCED_RATE),
      "Aggregate bytesProducedRate should still exist for BMM");
}

/** Builds a default single-event record: partition 0, source checkpoint "0", one event. */
private DatastreamProducerRecord createDatastreamProducerRecord() {
  final int partition = 0;
  final String checkpoint = "0";
  final int eventCount = 1;
  return createDatastreamProducerRecord(partition, checkpoint, eventCount);
}
Expand Down
Loading