Skip to content

[metric] Support influxdb reporter#2839

Open
zhuyufeng0809 wants to merge 2 commits intoapache:mainfrom
zhuyufeng0809:fluss-2838
Open

[metric] Support influxdb reporter#2839
zhuyufeng0809 wants to merge 2 commits intoapache:mainfrom
zhuyufeng0809:fluss-2838

Conversation

@zhuyufeng0809
Copy link
Contributor

@zhuyufeng0809 zhuyufeng0809 commented Mar 10, 2026

Purpose

Linked issue: close #2838

Brief change log

Tests

API and Format

Documentation

@zhuyufeng0809 zhuyufeng0809 force-pushed the fluss-2838 branch 2 times, most recently from 063f7e8 to dd4b4cf Compare March 11, 2026 03:46
@zhuyufeng0809
Copy link
Contributor Author

Hi @swuferhong , could you help me to review this PR ?

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new InfluxDB metrics reporter plugin/module to the Fluss metrics subsystem, including configuration options and unit tests, so Fluss metrics can be exported to InfluxDB v3.

Changes:

  • Introduce fluss-metrics-influxdb module with an SPI-discoverable MetricReporterPlugin and a scheduled reporter implementation.
  • Add metrics.reporter.influxdb.* configuration options to ConfigOptions.
  • Add unit tests and test logging setup for the new module.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
fluss-metrics/pom.xml Registers the new InfluxDB metrics module in the metrics reactor build.
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java Adds InfluxDB reporter configuration keys (host/org/bucket/token/push interval).
fluss-metrics/fluss-metrics-influxdb/pom.xml Defines the new module and its dependencies.
fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java Creates the reporter from config and exposes plugin identifier for SPI loading.
fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java Implements scheduled reporting and metric add/remove bookkeeping.
fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java Converts Fluss metrics into InfluxDB Points.
fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin Registers the plugin via Java SPI.
fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/*Test.java Adds tests for plugin SPI and point/reporting behavior.
fluss-metrics/fluss-metrics-influxdb/src/test/resources/* Adds JUnit extension SPI + log4j2 test configuration for tests.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +139 to +143
for (Map.Entry<String, String> entry : group.getAllVariables().entrySet()) {
tags.add(
new HashMap.SimpleEntry<>(
filterCharacters(entry.getKey()), filterCharacters(entry.getValue())));
}
}
}

client.writePoints(points);
Comment on lines +94 to +101
for (Map.Entry<Metric, String> entry : metricNames.entrySet()) {
Metric metric = entry.getKey();
String metricName = entry.getValue();
List<Map.Entry<String, String>> tags = metricTags.get(metric);

try {
Point point = pointProducer.createPoint(metric, metricName, tags, now);
points.add(point);
if (value instanceof Number) {
return point.setField("value", ((Number) value));
} else if (value instanceof Boolean) {
return point.setField("value", ((boolean) value));
Comment on lines +47 to +48
for (Map.Entry<String, String> tag : tags) {
point.setTag(tag.getKey(), tag.getValue());
@@ -0,0 +1,28 @@
w#
Comment on lines +61 to +79
reporter =
new InfluxdbReporter(
"http://localhost:8086",
"test-org",
"test-bucket",
"test-token",
Duration.ofSeconds(10)) {
@Override
public void open(Configuration config) {
// Override to inject mock client
try {
Field clientField = InfluxdbReporter.class.getDeclaredField("client");
clientField.setAccessible(true);
clientField.set(this, mockClient);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
<modules>
<module>fluss-metrics-prometheus</module>
<module>fluss-metrics-jmx</module>
<module>fluss-metrics-influxdb</module>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[metric] Support influxdb reporter

2 participants