diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index bcd00b1300..2f192f2b65 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1990,6 +1990,41 @@ public class ConfigOptions { + "the CoordinatorServer) it is advisable to use a port range " + "like 9990-9999."); + // ------------------------------------------------------------------------ + // ConfigOptions for influxdb reporter + // ------------------------------------------------------------------------ + public static final ConfigOption METRICS_REPORTER_INFLUXDB_HOST_URL = + key("metrics.reporter.influxdb.host-url") + .stringType() + .noDefaultValue() + .withDescription( + "The InfluxDB server host URL including scheme, host name, and port."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_BUCKET = + key("metrics.reporter.influxdb.bucket") + .stringType() + .noDefaultValue() + .withFallbackKeys("metrics.reporter.influxdb.database") + .withDescription("The InfluxDB bucket/database name."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_ORG = + key("metrics.reporter.influxdb.org") + .stringType() + .noDefaultValue() + .withDescription("The InfluxDB organization name."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_TOKEN = + key("metrics.reporter.influxdb.token") + .stringType() + .noDefaultValue() + .withDescription("The InfluxDB authentication token."); + + public static final ConfigOption METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL = + key("metrics.reporter.influxdb.push-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("The interval of reporting metrics to InfluxDB."); + // ------------------------------------------------------------------------ // ConfigOptions for lakehouse storage // ------------------------------------------------------------------------ diff --git a/fluss-metrics/fluss-metrics-influxdb/pom.xml b/fluss-metrics/fluss-metrics-influxdb/pom.xml new file mode 100644 index 0000000000..eef4ed8c34 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/pom.xml @@ -0,0 +1,79 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-metrics + 0.10-SNAPSHOT + + + fluss-metrics-influxdb + Fluss : Metrics : InfluxDB + + + 1.8.0 + + + + + org.apache.fluss + fluss-common + ${project.version} + provided + + + + com.influxdb + influxdb3-java + ${influxdb3.version} + provided + + + + + org.apache.fluss + fluss-test-utils + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java new file mode 100644 index 0000000000..04ee1f30ed --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.Histogram; +import org.apache.fluss.metrics.HistogramStatistics; +import org.apache.fluss.metrics.Meter; +import org.apache.fluss.metrics.Metric; + +import com.influxdb.v3.client.Point; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +/** Producer that creates InfluxDB {@link Point Points} from Fluss {@link Metric Metrics}. */ +public class InfluxdbPointProducer { + + private static final InfluxdbPointProducer INSTANCE = new InfluxdbPointProducer(); + + public static InfluxdbPointProducer getInstance() { + return INSTANCE; + } + + public Point createPoint( + Metric metric, String metricName, List> tags, Instant time) { + Point point = Point.measurement(metricName).setTimestamp(time); + + for (Map.Entry tag : tags) { + point.setTag(tag.getKey(), tag.getValue()); + } + + if (metric instanceof Counter) { + return createPointForCounter((Counter) metric, point); + } + + if (metric instanceof Gauge) { + return createPointForGauge((Gauge) metric, point); + } + + if (metric instanceof Meter) { + return createPointForMeter((Meter) metric, point); + } + + if (metric instanceof Histogram) { + return createPointForHistogram((Histogram) metric, point); + } + + throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); + } + + private Point createPointForCounter(Counter counter, Point point) { + return point.setField("count", counter.getCount()); + } + + private Point createPointForGauge(Gauge gauge, Point point) { + Object value = gauge.getValue(); + + if (value instanceof Number) { + return point.setField("value", ((Number) value)); + } else if (value instanceof Boolean) { + return point.setField("value", ((boolean) value)); + } else { + return point.setField("value", String.valueOf(value)); + } + } + + private Point createPointForMeter(Meter meter, Point point) { + return point.setField("rate", meter.getRate()).setField("count", meter.getCount()); + } + + private Point createPointForHistogram(Histogram histogram, Point point) { + HistogramStatistics stats = histogram.getStatistics(); + return point.setField("count", histogram.getCount()) + .setField("mean", stats.getMean()) + .setField("stddev", stats.getStdDev()) + .setField("min", stats.getMin()) + .setField("max", stats.getMax()) + .setField("p50", stats.getQuantile(0.5)) + .setField("p75", stats.getQuantile(0.75)) + .setField("p95", stats.getQuantile(0.95)) + .setField("p98", stats.getQuantile(0.98)) + .setField("p99", stats.getQuantile(0.99)) + .setField("p999", stats.getQuantile(0.999)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java new file mode 100644 index 0000000000..ba43551f42 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Metric; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.reporter.ScheduledMetricReporter; +import org.apache.fluss.utils.MapUtils; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.config.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */ +public class InfluxdbReporter implements ScheduledMetricReporter { + + private static final Logger LOG = LoggerFactory.getLogger(InfluxdbReporter.class); + + private static final char SCOPE_SEPARATOR = '_'; + private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR; + + private final Map metricNames; + private final Map>> metricTags; + private final InfluxDBClient client; + private final Duration pushInterval; + private final InfluxdbPointProducer pointProducer; + + public InfluxdbReporter( + String hostUrl, String org, String bucket, String token, Duration pushInterval) { + ClientConfig clientConfig = + new ClientConfig.Builder() + .host(hostUrl) + .token(token.toCharArray()) + .organization(org) + .database(bucket) + .build(); + + this.client = InfluxDBClient.getInstance(clientConfig); + this.pushInterval = pushInterval; + this.metricNames = MapUtils.newConcurrentHashMap(); + this.metricTags = MapUtils.newConcurrentHashMap(); + this.pointProducer = InfluxdbPointProducer.getInstance(); + + LOG.info("Started InfluxDB reporter connecting to {}", hostUrl); + } + + @Override + public void open(Configuration config) { + // do nothing + } + + @Override + public void close() { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Failed to close InfluxDB client", e); + } + } + } + + @Override + public void report() { + List points = new ArrayList<>(); + Instant now = Instant.now(); + + for (Map.Entry entry : metricNames.entrySet()) { + Metric metric = entry.getKey(); + String metricName = entry.getValue(); + List> tags = metricTags.get(metric); + + try { + Point point = pointProducer.createPoint(metric, metricName, tags, now); + points.add(point); + } catch (Exception e) { + LOG.warn("Failed to create point for metric {}", metricName, e); + } + } + + client.writePoints(points); + } + + @Override + public Duration scheduleInterval() { + return pushInterval; + } + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + String scopedMetricName = getScopedName(metricName, group); + List> tags = getTags(group); + + metricNames.put(metric, scopedMetricName); + metricTags.put(metric, tags); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + metricNames.remove(metric); + metricTags.remove(metric); + } + + private String getScopedName(String metricName, MetricGroup group) { + return SCOPE_PREFIX + + group.getLogicalScope(this::filterCharacters, SCOPE_SEPARATOR) + + SCOPE_SEPARATOR + + filterCharacters(metricName); + } + + private List> getTags(MetricGroup group) { + List> tags = new ArrayList<>(); + for (Map.Entry entry : group.getAllVariables().entrySet()) { + tags.add( + new HashMap.SimpleEntry<>( + filterCharacters(entry.getKey()), filterCharacters(entry.getValue()))); + } + return tags; + } + + private String filterCharacters(String input) { + return input.replaceAll("[^a-zA-Z0-9_:]", String.valueOf(SCOPE_SEPARATOR)); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java new file mode 100644 index 0000000000..0f502cea0d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPlugin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.reporter.MetricReporter; +import org.apache.fluss.metrics.reporter.MetricReporterPlugin; +import org.apache.fluss.utils.StringUtils; + +import java.time.Duration; + +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN; + +/** {@link MetricReporterPlugin} for {@link InfluxdbReporter}. */ +public class InfluxdbReporterPlugin implements MetricReporterPlugin { + + private static final String PLUGIN_NAME = "influxdb"; + + @Override + public MetricReporter createMetricReporter(Configuration configuration) { + String hostUrl = configuration.getString(METRICS_REPORTER_INFLUXDB_HOST_URL); + String org = configuration.getString(METRICS_REPORTER_INFLUXDB_ORG); + String bucket = configuration.getString(METRICS_REPORTER_INFLUXDB_BUCKET); + String token = configuration.getString(METRICS_REPORTER_INFLUXDB_TOKEN); + Duration pushInterval = configuration.get(METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL); + + if (StringUtils.isNullOrWhitespaceOnly(hostUrl)) { + throw new IllegalArgumentException( + "InfluxDB host URL must be configured via '" + + METRICS_REPORTER_INFLUXDB_HOST_URL.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(org)) { + throw new IllegalArgumentException( + "InfluxDB organization must be configured via '" + + METRICS_REPORTER_INFLUXDB_ORG.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(bucket)) { + throw new IllegalArgumentException( + "InfluxDB bucket must be configured via '" + + METRICS_REPORTER_INFLUXDB_BUCKET.key() + + "'"); + } + if (StringUtils.isNullOrWhitespaceOnly(token)) { + throw new IllegalArgumentException( + "InfluxDB token must be configured via '" + + METRICS_REPORTER_INFLUXDB_TOKEN.key() + + "'"); + } + + return new InfluxdbReporter(hostUrl, org, bucket, token, pushInterval); + } + + @Override + public String identifier() { + return PLUGIN_NAME; + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin new file mode 100644 index 0000000000..e2b8a6eae2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/main/resources/META-INF/services/org.apache.fluss.metrics.reporter.MetricReporterPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +org.apache.fluss.metrics.influxdb.InfluxdbReporterPlugin diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java new file mode 100644 index 0000000000..9cb999c03b --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbPointProducerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; + +import com.influxdb.v3.client.Point; +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbPointProducer}. */ +class InfluxdbPointProducerTest { + + private final InfluxdbPointProducer pointProducer = InfluxdbPointProducer.getInstance(); + + @Test + void testCreatePointForCounter() { + Counter counter = new SimpleCounter(); + counter.inc(42L); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + tags.add(new AbstractMap.SimpleEntry<>("server", "test")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(counter, "test_counter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_counter"); + assertThat(point.getField("count")).isEqualTo(42L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForGauge() { + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + Instant time = Instant.now(); + + // Test with Number (Integer) + Gauge intGauge = () -> 123; + Point intPoint = pointProducer.createPoint(intGauge, "test_gauge_int", tags, time); + assertThat(intPoint).isNotNull(); + assertThat(intPoint.getMeasurement()).isEqualTo("test_gauge_int"); + assertThat(intPoint.getField("value")).isEqualTo(123); + assertThat(intPoint.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + + // Test with Number (Double) + Gauge doubleGauge = () -> 123.456; + Point doublePoint = pointProducer.createPoint(doubleGauge, "test_gauge_double", tags, time); + assertThat(doublePoint).isNotNull(); + assertThat(doublePoint.getMeasurement()).isEqualTo("test_gauge_double"); + assertThat(doublePoint.getField("value")).isEqualTo(123.456); + + // Test with Boolean + Gauge boolGauge = () -> true; + Point boolPoint = pointProducer.createPoint(boolGauge, "test_gauge_boolean", tags, time); + assertThat(boolPoint).isNotNull(); + assertThat(boolPoint.getMeasurement()).isEqualTo("test_gauge_boolean"); + assertThat(boolPoint.getField("value")).isEqualTo(true); + + // Test with String + Gauge stringGauge = () -> "test_value"; + Point stringPoint = pointProducer.createPoint(stringGauge, "test_gauge_string", tags, time); + assertThat(stringPoint).isNotNull(); + assertThat(stringPoint.getMeasurement()).isEqualTo("test_gauge_string"); + assertThat(stringPoint.getField("value")).isEqualTo("test_value"); + + // Test with Null + Gauge nullGauge = () -> null; + Point nullPoint = pointProducer.createPoint(nullGauge, "test_gauge_null", tags, time); + assertThat(nullPoint).isNotNull(); + assertThat(nullPoint.getMeasurement()).isEqualTo("test_gauge_null"); + assertThat(nullPoint.getField("value")).isEqualTo("null"); + } + + @Test + void testCreatePointForMeter() { + TestMeter meter = new TestMeter(1000, 5.0); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(meter, "test_meter", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_meter"); + assertThat(point.getField("rate")).isEqualTo(5.0); + assertThat(point.getField("count")).isEqualTo(1000L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } + + @Test + void testCreatePointForHistogram() { + TestHistogram histogram = new TestHistogram(); + histogram.setCount(80); + histogram.setMean(50.5); + histogram.setStdDev(10.2); + histogram.setMax(100); + histogram.setMin(1); + + List> tags = new ArrayList<>(); + tags.add(new AbstractMap.SimpleEntry<>("host", "localhost")); + + Instant time = Instant.now(); + Point point = pointProducer.createPoint(histogram, "test_histogram", tags, time); + + assertThat(point).isNotNull(); + assertThat(point.getMeasurement()).isEqualTo("test_histogram"); + assertThat(point.getField("count")).isEqualTo(80L); + assertThat(point.getField("mean")).isEqualTo(50.5); + assertThat(point.getField("stddev")).isEqualTo(10.2); + assertThat(point.getField("max")).isEqualTo(100L); + assertThat(point.getField("min")).isEqualTo(1L); + assertThat(point.getTimestamp()) + .isEqualTo( + BigInteger.valueOf( + time.getEpochSecond() * 1_000_000_000L + time.getNano())); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java new file mode 100644 index 0000000000..4df8f1abf2 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterPluginTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.util.MetricReporterTestUtils; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link InfluxdbReporterPlugin}. */ +class InfluxdbReporterPluginTest { + + @Test + void testCreateMetricReporterWithValidConfig() { + Configuration config = new Configuration(); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_HOST_URL, "http://localhost:8086"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_ORG, "test-org"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_BUCKET, "test-bucket"); + config.setString(ConfigOptions.METRICS_REPORTER_INFLUXDB_TOKEN, "test-token"); + config.set(ConfigOptions.METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL, Duration.ofSeconds(15)); + + InfluxdbReporterPlugin plugin = new InfluxdbReporterPlugin(); + InfluxdbReporter reporter = (InfluxdbReporter) plugin.createMetricReporter(config); + + try { + assertThat(reporter).isNotNull(); + assertThat(reporter.scheduleInterval()).isEqualTo(Duration.ofSeconds(15)); + } finally { + reporter.close(); + } + } + + @Test + void testMetricReporterSetupViaSPI() { + MetricReporterTestUtils.testMetricReporterSetupViaSPI(InfluxdbReporterPlugin.class); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java new file mode 100644 index 0000000000..450c617f61 --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/java/org/apache/fluss/metrics/influxdb/InfluxdbReporterTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metrics.influxdb; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metrics.Counter; +import org.apache.fluss.metrics.Gauge; +import org.apache.fluss.metrics.SimpleCounter; +import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.metrics.util.TestHistogram; +import org.apache.fluss.metrics.util.TestMeter; +import org.apache.fluss.metrics.util.TestMetricGroup; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** Tests for the {@link InfluxdbReporter}. */ +class InfluxdbReporterTest { + + private InfluxdbReporter reporter; + private MetricGroup metricGroup; + private InfluxDBClient mockClient; + + @BeforeEach + void setUp() throws Exception { + mockClient = mock(InfluxDBClient.class); + doNothing().when(mockClient).writePoints(any()); + + reporter = + new InfluxdbReporter( + "http://localhost:8086", + "test-org", + "test-bucket", + "test-token", + Duration.ofSeconds(10)) { + @Override + public void open(Configuration config) { + // Override to inject mock client + try { + Field clientField = InfluxdbReporter.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(this, mockClient); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + reporter.open(new Configuration()); + + Map variables = new HashMap<>(); + variables.put("", "localhost"); + variables.put("table", "test_table"); + + metricGroup = + TestMetricGroup.newBuilder() + .setLogicalScopeFunction((characterFilter, character) -> "tabletServer") + .setVariables(variables) + .build(); + } + + @AfterEach + void tearDown() { + if (reporter != null) { + reporter.close(); + } + } + + // -------------------- Tests for notifyOfAddedMetric -------------------- + + @Test + void testNotifyOfAddedMetric() throws Exception { + Counter counter = new SimpleCounter(); + counter.inc(42); + + reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKey(counter); + assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_test_counter"); + + Map>> metricTags = + getMetricTags(); + assertThat(metricTags).containsKey(counter); + assertThat(metricTags.get(counter)) + .containsExactlyInAnyOrder( + Map.entry("_host_", "localhost"), Map.entry("table", "test_table")); + } + + @Test + void testNotifyOfAddedMetricWithDifferentTypes() throws Exception { + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "counter_type_test", metricGroup); + + Gauge gauge = () -> 123; + reporter.notifyOfAddedMetric(gauge, "gauge_type_test", metricGroup); + + TestMeter meter = new TestMeter(100, 5.0); + reporter.notifyOfAddedMetric(meter, "meter_type_test", metricGroup); + + TestHistogram histogram = new TestHistogram(); + reporter.notifyOfAddedMetric(histogram, "histogram_type_test", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKeys(counter, gauge, meter, histogram); + assertThat(metricNames.get(counter)).isEqualTo("fluss_tabletServer_counter_type_test"); + assertThat(metricNames.get(gauge)).isEqualTo("fluss_tabletServer_gauge_type_test"); + assertThat(metricNames.get(meter)).isEqualTo("fluss_tabletServer_meter_type_test"); + assertThat(metricNames.get(histogram)).isEqualTo("fluss_tabletServer_histogram_type_test"); + } + + // -------------------- Tests for notifyOfRemovedMetric -------------------- + + @Test + void testNotifyOfRemovedMetric() throws Exception { + Counter counter = new SimpleCounter(); + reporter.notifyOfAddedMetric(counter, "test_counter", metricGroup); + + Map metricNames = getMetricNames(); + assertThat(metricNames).containsKey(counter); + + reporter.notifyOfRemovedMetric(counter, "test_counter", metricGroup); + + assertThat(metricNames).doesNotContainKey(counter); + + Map>> metricTags = + getMetricTags(); + assertThat(metricTags).doesNotContainKey(counter); + } + + // -------------------- Tests for report -------------------- + + @Test + void testReport() throws Exception { + // Test report with no metrics + reporter.report(); + verify(mockClient, times(1)).writePoints(any()); + + // Test report with Counter + Counter counter = new SimpleCounter(); + counter.inc(42); + reporter.notifyOfAddedMetric(counter, "report_test_counter", metricGroup); + reporter.report(); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(mockClient, times(2)).writePoints(captor.capture()); + List points = captor.getValue(); + assertThat(points).hasSize(1); + + // Test report with Gauge + Gauge gauge = () -> 123; + reporter.notifyOfAddedMetric(gauge, "report_test_gauge", metricGroup); + reporter.report(); + verify(mockClient, times(3)).writePoints(any()); + + // Test report with Meter + TestMeter meter = new TestMeter(1000, 5.0); + reporter.notifyOfAddedMetric(meter, "report_test_meter", metricGroup); + reporter.report(); + verify(mockClient, times(4)).writePoints(any()); + + // Test report with Histogram + TestHistogram histogram = new TestHistogram(); + histogram.setCount(50); + histogram.setMean(100.0); + reporter.notifyOfAddedMetric(histogram, "report_test_histogram", metricGroup); + reporter.report(); + verify(mockClient, times(5)).writePoints(any()); + + // Test report with multiple metrics + Counter counter2 = new SimpleCounter(); + counter2.inc(10); + reporter.notifyOfAddedMetric(counter2, "report_test_counter2", metricGroup); + + Gauge gauge2 = () -> 3.14; + reporter.notifyOfAddedMetric(gauge2, "report_test_gauge2", metricGroup); + reporter.report(); + verify(mockClient, times(6)).writePoints(any()); + } + + // -------------------- Helper methods -------------------- + + @SuppressWarnings("unchecked") + private Map getMetricNames() throws Exception { + Field field = InfluxdbReporter.class.getDeclaredField("metricNames"); + field.setAccessible(true); + return (Map) field.get(reporter); + } + + @SuppressWarnings("unchecked") + private Map>> getMetricTags() + throws Exception { + Field field = InfluxdbReporter.class.getDeclaredField("metricTags"); + field.setAccessible(true); + return (Map>>) + field.get(reporter); + } +} diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000..ca0e907f6d --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.testutils.common.TestLoggerExtension \ No newline at end of file diff --git a/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..460cb8070a --- /dev/null +++ b/fluss-metrics/fluss-metrics-influxdb/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +w# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/fluss-metrics/pom.xml b/fluss-metrics/pom.xml index 9e5bc44871..7a7f7a3762 100644 --- a/fluss-metrics/pom.xml +++ b/fluss-metrics/pom.xml @@ -34,6 +34,7 @@ fluss-metrics-prometheus fluss-metrics-jmx + fluss-metrics-influxdb