From 5d96e0cb2384146a329173686771900fbc6c1426 Mon Sep 17 00:00:00 2001 From: Andreas Skoog Date: Fri, 31 Aug 2018 12:51:39 +0200 Subject: [PATCH] Adds support for filtering out non float values before sending To avoid server side parse errors, do not bother sending string values or null to graphite. Added xml configuration value "filterNonFloatValues" that defaults to false --- .../GraphitePlainTextTcpOutputWriter.java | 35 ++++++++++--- .../agent/GraphiteUdpOutputWriter.java | 14 +++++- .../GraphiteMetricMessageBuilder.java | 26 +++++++--- .../GraphiteOutputWriterCommonSettings.java | 9 +++- .../GraphiteMetricMessageBuilderTest.java | 22 ++++++++- .../GraphitePlainTextTcpOutputWriterTest.java | 49 +++++++++++++++---- .../agent/GraphiteUdpOutputWriterTest.java | 39 +++++++++++++-- 7 files changed, 164 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java b/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java index 2de51b78..62551306 100644 --- a/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java +++ b/src/main/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriter.java @@ -23,24 +23,33 @@ */ package org.jmxtrans.agent; -import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*; -import static org.jmxtrans.agent.util.ConfigurationUtils.*; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_HOST; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT_DEFAULT_VALUE; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.filterNonFloatValues; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull; +import static org.jmxtrans.agent.util.ConfigurationUtils.getInt; +import static org.jmxtrans.agent.util.ConfigurationUtils.getString; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.net.*; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.charset.Charset; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.logging.Level; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; import org.jmxtrans.agent.util.io.IoUtils; import org.jmxtrans.agent.util.net.HostAndPort; +import org.jmxtrans.agent.util.time.Clock; +import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock; /** * @author Cyrille Le Clerc @@ -56,6 +65,8 @@ public class GraphitePlainTextTcpOutputWriter extends AbstractOutputWriter imple private Writer writer; private int socketConnectTimeoutInMillis = SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE; private GraphiteMetricMessageBuilder messageBuilder; + private boolean filterNonFloatValues; + private Clock clock; @Override public void postConstruct(Map settings) { @@ -68,9 +79,11 @@ public void postConstruct(Map settings) { socketConnectTimeoutInMillis = getInt(settings, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE); + clock = new SystemCurrentTimeMillisClock(); logger.log(getInfoLevel(), "GraphitePlainTextTcpOutputWriter is configured with " + graphiteServerHostAndPort + ", metricPathPrefix=" + messageBuilder.getPrefix() + ", socketConnectTimeoutInMillis=" + socketConnectTimeoutInMillis); + filterNonFloatValues = filterNonFloatValues(settings); } @Override @@ -80,7 +93,13 @@ public void writeInvocationResult(@Nonnull String invocationName, @Nullable Obje @Override public void writeQueryResult(@Nonnull String metricName, @Nullable String type, @Nullable Object value) throws IOException { - String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + if (filterNonFloatValues && !messageBuilder.isFloat(value)) { + if (logger.isLoggable(getTraceLevel())) { + logger.log(getTraceLevel(), "Filter non float value '" + value + "'"); + } + return; + } + String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS)); try { ensureGraphiteConnection(); if (logger.isLoggable(getTraceLevel())) { @@ -94,6 +113,10 @@ public void writeQueryResult(@Nonnull String metricName, @Nullable String type, } } + protected void setClock(Clock clock) { + this.clock = clock; + } + private void releaseGraphiteConnection() { IoUtils.closeQuietly(writer); IoUtils.closeQuietly(socket); diff --git a/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java b/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java index 761906f5..d455a4fc 100644 --- a/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java +++ b/src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java @@ -23,7 +23,8 @@ */ package org.jmxtrans.agent; -import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull; +import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getHostAndPort; import java.io.IOException; import java.net.DatagramPacket; @@ -36,6 +37,7 @@ import java.util.logging.Level; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; +import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; import org.jmxtrans.agent.util.net.HostAndPort; import org.jmxtrans.agent.util.time.Clock; import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock; @@ -52,6 +54,7 @@ public class GraphiteUdpOutputWriter extends AbstractOutputWriter { private UdpMessageSender messageSender; private Clock clock; private GraphiteMetricMessageBuilder messageBuilder; + private boolean filterNonFloatValues = false; @Override public void postConstruct(Map settings) { @@ -62,6 +65,7 @@ public void postConstruct(Map settings) { clock = new SystemCurrentTimeMillisClock(); logger.log(getInfoLevel(), "GraphiteUdpOutputWriter is configured with " + graphiteServerHostAndPort + ", metricPathPrefix=" + messageBuilder.getPrefix()); + filterNonFloatValues = GraphiteOutputWriterCommonSettings.filterNonFloatValues(settings); } @Override @@ -69,8 +73,14 @@ public void writeInvocationResult(String invocationName, Object value) throws IO writeQueryResult(invocationName, null, value); } - @Override + @Override public void writeQueryResult(String metricName, String metricType, Object value) throws IOException { + if (filterNonFloatValues && !messageBuilder.isFloat(value)) { + if (logger.isLoggable(getTraceLevel())) { + logger.log(getTraceLevel(), "Filter non float value '" + value + "'"); + } + return; + } String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS)); logMessageIfTraceLoggable(msg); tryWriteMsg(msg + "\n"); diff --git a/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java b/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java index fbe9ec07..0ecf9f29 100644 --- a/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java +++ b/src/main/java/org/jmxtrans/agent/graphite/GraphiteMetricMessageBuilder.java @@ -50,13 +50,17 @@ public GraphiteMetricMessageBuilder(@Nullable String configuredMetricPathPrefix) * @return The metric string without trailing newline */ public String buildMessage(String metricName, Object value, long timestamp) { - if (value instanceof Boolean) { - return metricPathPrefix + metricName + " " + ((Boolean)value ? 1 : 0) + " " + timestamp; - } - return metricPathPrefix + metricName + " " + value + " " + timestamp; + return metricPathPrefix + metricName + " " + convertToString(value) + " " + timestamp; } - /** + private String convertToString(Object value) { + if (value instanceof Boolean) { + return (Boolean)value ? "1" : "0"; + } + return String.valueOf(value); + } + + /** * {@link java.net.InetAddress#getLocalHost()} may not be known at JVM startup when the process is launched as a Linux service. */ private static String buildMetricPathPrefix(String configuredMetricPathPrefix) { @@ -75,5 +79,15 @@ private static String buildMetricPathPrefix(String configuredMetricPathPrefix) { public String getPrefix() { return metricPathPrefix; } - + + /** + * Checks if the given value can be sent as a float value to graphite. + * Note that Booleans are converted to 1/0 and will pass this check + * + * @param value value to check + * @return true if the string representation of value is parseable as a float + */ + public boolean isFloat(Object value) { + return convertToString(value).matches("[-+]?[0-9]*\\.?[0-9]+"); + } } diff --git a/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java b/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java index a78faad0..5e8766c0 100644 --- a/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java +++ b/src/main/java/org/jmxtrans/agent/graphite/GraphiteOutputWriterCommonSettings.java @@ -23,7 +23,9 @@ */ package org.jmxtrans.agent.graphite; -import static org.jmxtrans.agent.util.ConfigurationUtils.*; +import static org.jmxtrans.agent.util.ConfigurationUtils.getBoolean; +import static org.jmxtrans.agent.util.ConfigurationUtils.getInt; +import static org.jmxtrans.agent.util.ConfigurationUtils.getString; import java.util.Map; @@ -38,6 +40,7 @@ public class GraphiteOutputWriterCommonSettings { public static final String SETTING_PORT = "port"; public static final int SETTING_PORT_DEFAULT_VALUE = 2003; public static final String SETTING_NAME_PREFIX = "namePrefix"; + public static final String SETTING_FILTER_NON_FLOAT = "filterNonFloatValues"; private GraphiteOutputWriterCommonSettings(){} @@ -49,4 +52,8 @@ public static HostAndPort getHostAndPort(Map settings) { public static String getConfiguredMetricPrefixOrNull(Map settings) { return getString(settings, SETTING_NAME_PREFIX, null); } + + public static boolean filterNonFloatValues(Map settings) { + return getBoolean(settings, "filterNonFloatValues", false); + } } diff --git a/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java b/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java index a728f694..f4d6d1bc 100644 --- a/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphiteMetricMessageBuilderTest.java @@ -23,8 +23,12 @@ */ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder; import org.junit.Test; @@ -62,4 +66,18 @@ public void falseIsConvertedToZero() { String msg = builder.buildMessage("bar", false, 11); assertThat(msg, equalTo("foo.bar 0 11")); } + + @Test + public void checksFloatValues() throws Exception { + GraphiteMetricMessageBuilder builder = new GraphiteMetricMessageBuilder("foo."); + assertTrue(builder.isFloat("1.23")); + assertTrue(builder.isFloat("1")); + assertTrue(builder.isFloat("-1.23")); + assertTrue(builder.isFloat("0")); + assertTrue(builder.isFloat(false)); + assertTrue(builder.isFloat(true)); + assertFalse(builder.isFloat("")); + assertFalse(builder.isFloat(null)); + assertFalse(builder.isFloat("qwerty")); + } } diff --git a/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java b/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java index 31eda7d2..114bd728 100644 --- a/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphitePlainTextTcpOutputWriterTest.java @@ -24,15 +24,17 @@ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.hamcrest.Matcher; import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; +import org.jmxtrans.agent.testutils.FixedTimeClock; +import org.jmxtrans.agent.util.time.Clock; import org.junit.Rule; import org.junit.Test; @@ -44,29 +46,56 @@ public class GraphitePlainTextTcpOutputWriterTest { @Rule public TcpLineServer tcpLineServer = new TcpLineServer(); + private final Clock clock = new FixedTimeClock(33000); + @Test public void reconnectsAfterServerClosesConnection() throws Exception { GraphitePlainTextTcpOutputWriter graphiteWriter = new GraphitePlainTextTcpOutputWriter(); Map config = new HashMap<>(); config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1"); config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort()); + config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar."); graphiteWriter.postConstruct(config); + graphiteWriter.setClock(clock); // Write one metric to see it is received writeTestMetric(graphiteWriter); - assertEventuallyReceived(tcpLineServer, hasSize(1)); + assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33")); // Disconnect the Graphite writer tcpLineServer.disconnectAllClients(); waitForErrorToBeDetectedByGraphiteWriter(graphiteWriter); - writeTestMetric(graphiteWriter); // Write one metric and verify that it is received writeTestMetric(graphiteWriter); - assertEventuallyReceived(tcpLineServer, hasSize(greaterThan(1))); + writeTestMetric(graphiteWriter); + assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33", "bar.foo 2 33", "bar.foo 3 33")); + } + + @Test + public void filterNonNumericValues() throws Exception { + GraphitePlainTextTcpOutputWriter writer = new GraphitePlainTextTcpOutputWriter(); + Map config = new HashMap<>(); + config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1"); + config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort()); + config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar."); + config.put(GraphiteOutputWriterCommonSettings.SETTING_FILTER_NON_FLOAT, "true"); + writer.postConstruct(config); + writer.setClock(clock); + + writer.writeQueryResult("metric", "type", 1); + writer.writeQueryResult("metric", "type", null); + writer.writeQueryResult("metric.2", "type", "non string"); + writer.writeQueryResult("metric.2", "type", "2"); + writer.writeQueryResult("metric.3", "type", ""); + writer.writeQueryResult("metric.3", "type", true); + writer.postCollect(); + assertEventuallyReceived(tcpLineServer, + containsInAnyOrder("bar.metric 1 33", "bar.metric.2 2 33", "bar.metric.3 1 33")); + tcpLineServer.disconnectAllClients(); } private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutputWriter writer) { for (int i = 0; i < 10; i++) { try { - writer.writeQueryResult("foo", null, 1); + writer.writeQueryResult("foo", null, 4711 + i); writer.postCollect(); Thread.sleep(20); } catch (Exception e) { @@ -76,16 +105,18 @@ private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutput fail("No error ocurred after closing server!"); } + private int counter; + private void writeTestMetric(GraphitePlainTextTcpOutputWriter writer) { try { - writer.writeQueryResult("foo", null, 1); + writer.writeQueryResult("foo", null, ++counter); writer.postCollect(); } catch (Exception e) { e.printStackTrace(); } } - public void assertEventuallyReceived(TcpLineServer server, Matcher> matcher) + public void assertEventuallyReceived(TcpLineServer server, Matcher> matcher) throws Exception { for (int i = 0; i < 100; i++) { if (matcher.matches(server.getReceivedLines())) { diff --git a/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java b/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java index 40ade38d..2d71d351 100644 --- a/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java +++ b/src/test/java/org/jmxtrans/agent/GraphiteUdpOutputWriterTest.java @@ -23,8 +23,9 @@ */ package org.jmxtrans.agent; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.InetAddress; @@ -37,6 +38,7 @@ import java.util.Map; import org.hamcrest.Matcher; +import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings; import org.jmxtrans.agent.testutils.FixedTimeClock; import org.jmxtrans.agent.util.time.Clock; import org.junit.After; @@ -57,7 +59,7 @@ public class GraphiteUdpOutputWriterTest { @Rule public UdpServer udpServer = new UdpServer(); - private Clock clock = new FixedTimeClock(33000); + private final Clock clock = new FixedTimeClock(33000); private GraphiteUdpOutputWriter writer; @@ -83,12 +85,41 @@ public void manyQueryResults() throws Exception { containsInAnyOrder("foo.metric 1 33\n", "foo.metric.2 2 33\n", "foo.metric.3 3 33\n")); } + @Test + public void handlesBoolean() throws Exception { + writer.writeQueryResult("metric", "type", true); + assertEventuallyReceived(udpServer, contains("foo.metric 1 33\n")); + } + + @Test + public void handlesFalseBoolean() throws Exception { + writer.writeQueryResult("metric", "type", false); + assertEventuallyReceived(udpServer, contains("foo.metric 0 33\n")); + } + @Test public void oneInvocationResult() throws Exception { writer.writeInvocationResult("invoke", 123); assertEventuallyReceived(udpServer, contains("foo.invoke 123 33\n")); } + @Test + public void filterNonNumericValues() throws Exception { + Map testSettings = testSettings(); + testSettings.put(GraphiteOutputWriterCommonSettings.SETTING_FILTER_NON_FLOAT, "true"); + writer.postConstruct(testSettings); + writer.setClock(clock); + + writer.writeQueryResult("metric", "type", 1); + writer.writeQueryResult("metric", "type", null); + writer.writeQueryResult("metric.2", "type", "non string"); + writer.writeQueryResult("metric.2", "type", "2"); + writer.writeQueryResult("metric.3", "type", true); + writer.writeQueryResult("metric.3", "type", ""); + assertEventuallyReceived(udpServer, + containsInAnyOrder("foo.metric 1 33\n", "foo.metric.2 2 33\n", "foo.metric.3 1 33\n")); + } + @After public void destroyWriter() { writer.preDestroy(); @@ -119,7 +150,7 @@ public void assertEventuallyReceived(UdpServer server, Matcher receivedMessages = new ArrayList<>(); + private final List receivedMessages = new ArrayList<>(); private DatagramChannel channel; public void openChannel() throws Exception {