Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for filtering out non float values before sending #135

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,33 @@
*/
package org.jmxtrans.agent;

import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*;
import static org.jmxtrans.agent.util.ConfigurationUtils.*;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_HOST;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.SETTING_PORT_DEFAULT_VALUE;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.filterNonFloatValues;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull;
import static org.jmxtrans.agent.util.ConfigurationUtils.getInt;
import static org.jmxtrans.agent.util.ConfigurationUtils.getString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.*;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder;
import org.jmxtrans.agent.util.io.IoUtils;
import org.jmxtrans.agent.util.net.HostAndPort;
import org.jmxtrans.agent.util.time.Clock;
import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock;

/**
* @author <a href="mailto:[email protected]">Cyrille Le Clerc</a>
Expand All @@ -56,6 +65,8 @@ public class GraphitePlainTextTcpOutputWriter extends AbstractOutputWriter imple
private Writer writer;
private int socketConnectTimeoutInMillis = SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE;
private GraphiteMetricMessageBuilder messageBuilder;
private boolean filterNonFloatValues;
private Clock clock;

@Override
public void postConstruct(Map<String, String> settings) {
Expand All @@ -68,9 +79,11 @@ public void postConstruct(Map<String, String> settings) {
socketConnectTimeoutInMillis = getInt(settings,
SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS,
SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE);
clock = new SystemCurrentTimeMillisClock();

logger.log(getInfoLevel(), "GraphitePlainTextTcpOutputWriter is configured with " + graphiteServerHostAndPort + ", metricPathPrefix=" + messageBuilder.getPrefix() +
", socketConnectTimeoutInMillis=" + socketConnectTimeoutInMillis);
filterNonFloatValues = filterNonFloatValues(settings);
}

@Override
Expand All @@ -80,7 +93,13 @@ public void writeInvocationResult(@Nonnull String invocationName, @Nullable Obje

@Override
public void writeQueryResult(@Nonnull String metricName, @Nullable String type, @Nullable Object value) throws IOException {
String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
if (filterNonFloatValues && !messageBuilder.isFloat(value)) {
if (logger.isLoggable(getTraceLevel())) {
logger.log(getTraceLevel(), "Filter non float value '" + value + "'");
}
return;
}
String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS));
try {
ensureGraphiteConnection();
if (logger.isLoggable(getTraceLevel())) {
Expand All @@ -94,6 +113,10 @@ public void writeQueryResult(@Nonnull String metricName, @Nullable String type,
}
}

protected void setClock(Clock clock) {
this.clock = clock;
}

private void releaseGraphiteConnection() {
IoUtils.closeQuietly(writer);
IoUtils.closeQuietly(socket);
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/org/jmxtrans/agent/GraphiteUdpOutputWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
*/
package org.jmxtrans.agent;

import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.*;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getConfiguredMetricPrefixOrNull;
import static org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings.getHostAndPort;

import java.io.IOException;
import java.net.DatagramPacket;
Expand All @@ -36,6 +37,7 @@
import java.util.logging.Level;

import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder;
import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings;
import org.jmxtrans.agent.util.net.HostAndPort;
import org.jmxtrans.agent.util.time.Clock;
import org.jmxtrans.agent.util.time.SystemCurrentTimeMillisClock;
Expand All @@ -52,6 +54,7 @@ public class GraphiteUdpOutputWriter extends AbstractOutputWriter {
private UdpMessageSender messageSender;
private Clock clock;
private GraphiteMetricMessageBuilder messageBuilder;
private boolean filterNonFloatValues = false;

@Override
public void postConstruct(Map<String, String> settings) {
Expand All @@ -62,15 +65,22 @@ public void postConstruct(Map<String, String> settings) {
clock = new SystemCurrentTimeMillisClock();
logger.log(getInfoLevel(), "GraphiteUdpOutputWriter is configured with " + graphiteServerHostAndPort
+ ", metricPathPrefix=" + messageBuilder.getPrefix());
filterNonFloatValues = GraphiteOutputWriterCommonSettings.filterNonFloatValues(settings);
}

@Override
public void writeInvocationResult(String invocationName, Object value) throws IOException {
writeQueryResult(invocationName, null, value);
}

@Override
@Override
public void writeQueryResult(String metricName, String metricType, Object value) throws IOException {
if (filterNonFloatValues && !messageBuilder.isFloat(value)) {
if (logger.isLoggable(getTraceLevel())) {
logger.log(getTraceLevel(), "Filter non float value '" + value + "'");
}
return;
}
String msg = messageBuilder.buildMessage(metricName, value, TimeUnit.SECONDS.convert(clock.getCurrentTimeMillis(), TimeUnit.MILLISECONDS));
logMessageIfTraceLoggable(msg);
tryWriteMsg(msg + "\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ public GraphiteMetricMessageBuilder(@Nullable String configuredMetricPathPrefix)
* @return The metric string without trailing newline
*/
public String buildMessage(String metricName, Object value, long timestamp) {
if (value instanceof Boolean) {
return metricPathPrefix + metricName + " " + ((Boolean)value ? 1 : 0) + " " + timestamp;
}
return metricPathPrefix + metricName + " " + value + " " + timestamp;
return metricPathPrefix + metricName + " " + convertToString(value) + " " + timestamp;
}

/**
private String convertToString(Object value) {
if (value instanceof Boolean) {
return (Boolean)value ? "1" : "0";
}
return String.valueOf(value);
}

/**
* {@link java.net.InetAddress#getLocalHost()} may not be known at JVM startup when the process is launched as a Linux service.
*/
private static String buildMetricPathPrefix(String configuredMetricPathPrefix) {
Expand All @@ -75,5 +79,15 @@ private static String buildMetricPathPrefix(String configuredMetricPathPrefix) {
public String getPrefix() {
return metricPathPrefix;
}


/**
* Checks if the given value can be sent as a float value to graphite.
* Note that Booleans are converted to 1/0 and will pass this check
*
* @param value value to check
* @return true if the string representation of value is parseable as a float
*/
public boolean isFloat(Object value) {
return convertToString(value).matches("[-+]?[0-9]*\\.?[0-9]+");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
*/
package org.jmxtrans.agent.graphite;

import static org.jmxtrans.agent.util.ConfigurationUtils.*;
import static org.jmxtrans.agent.util.ConfigurationUtils.getBoolean;
import static org.jmxtrans.agent.util.ConfigurationUtils.getInt;
import static org.jmxtrans.agent.util.ConfigurationUtils.getString;

import java.util.Map;

Expand All @@ -38,6 +40,7 @@ public class GraphiteOutputWriterCommonSettings {
public static final String SETTING_PORT = "port";
public static final int SETTING_PORT_DEFAULT_VALUE = 2003;
public static final String SETTING_NAME_PREFIX = "namePrefix";
public static final String SETTING_FILTER_NON_FLOAT = "filterNonFloatValues";

private GraphiteOutputWriterCommonSettings(){}

Expand All @@ -49,4 +52,8 @@ public static HostAndPort getHostAndPort(Map<String, String> settings) {
public static String getConfiguredMetricPrefixOrNull(Map<String, String> settings) {
return getString(settings, SETTING_NAME_PREFIX, null);
}

public static boolean filterNonFloatValues(Map<String,String> settings) {
return getBoolean(settings, "filterNonFloatValues", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
*/
package org.jmxtrans.agent;

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import org.jmxtrans.agent.graphite.GraphiteMetricMessageBuilder;
import org.junit.Test;
Expand Down Expand Up @@ -62,4 +66,18 @@ public void falseIsConvertedToZero() {
String msg = builder.buildMessage("bar", false, 11);
assertThat(msg, equalTo("foo.bar 0 11"));
}

@Test
public void checksFloatValues() throws Exception {
GraphiteMetricMessageBuilder builder = new GraphiteMetricMessageBuilder("foo.");
assertTrue(builder.isFloat("1.23"));
assertTrue(builder.isFloat("1"));
assertTrue(builder.isFloat("-1.23"));
assertTrue(builder.isFloat("0"));
assertTrue(builder.isFloat(false));
assertTrue(builder.isFloat(true));
assertFalse(builder.isFloat(""));
assertFalse(builder.isFloat(null));
assertFalse(builder.isFloat("qwerty"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@

package org.jmxtrans.agent;

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.hamcrest.Matcher;
import org.jmxtrans.agent.graphite.GraphiteOutputWriterCommonSettings;
import org.jmxtrans.agent.testutils.FixedTimeClock;
import org.jmxtrans.agent.util.time.Clock;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -44,29 +46,56 @@ public class GraphitePlainTextTcpOutputWriterTest {
@Rule
public TcpLineServer tcpLineServer = new TcpLineServer();

private final Clock clock = new FixedTimeClock(33000);

@Test
public void reconnectsAfterServerClosesConnection() throws Exception {
GraphitePlainTextTcpOutputWriter graphiteWriter = new GraphitePlainTextTcpOutputWriter();
Map<String, String> config = new HashMap<>();
config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1");
config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort());
config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar.");
graphiteWriter.postConstruct(config);
graphiteWriter.setClock(clock);
// Write one metric to see it is received
writeTestMetric(graphiteWriter);
assertEventuallyReceived(tcpLineServer, hasSize(1));
assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33"));
// Disconnect the Graphite writer
tcpLineServer.disconnectAllClients();
waitForErrorToBeDetectedByGraphiteWriter(graphiteWriter);
writeTestMetric(graphiteWriter);
// Write one metric and verify that it is received
writeTestMetric(graphiteWriter);
assertEventuallyReceived(tcpLineServer, hasSize(greaterThan(1)));
writeTestMetric(graphiteWriter);
assertEventuallyReceived(tcpLineServer, containsInAnyOrder("bar.foo 1 33", "bar.foo 2 33", "bar.foo 3 33"));
}

@Test
public void filterNonNumericValues() throws Exception {
GraphitePlainTextTcpOutputWriter writer = new GraphitePlainTextTcpOutputWriter();
Map<String, String> config = new HashMap<>();
config.put(GraphiteOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1");
config.put(GraphiteOutputWriterCommonSettings.SETTING_PORT, "" + tcpLineServer.getPort());
config.put(GraphiteOutputWriterCommonSettings.SETTING_NAME_PREFIX, "bar.");
config.put(GraphiteOutputWriterCommonSettings.SETTING_FILTER_NON_FLOAT, "true");
writer.postConstruct(config);
writer.setClock(clock);

writer.writeQueryResult("metric", "type", 1);
writer.writeQueryResult("metric", "type", null);
writer.writeQueryResult("metric.2", "type", "non string");
writer.writeQueryResult("metric.2", "type", "2");
writer.writeQueryResult("metric.3", "type", "");
writer.writeQueryResult("metric.3", "type", true);
writer.postCollect();
assertEventuallyReceived(tcpLineServer,
containsInAnyOrder("bar.metric 1 33", "bar.metric.2 2 33", "bar.metric.3 1 33"));
tcpLineServer.disconnectAllClients();
}

private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutputWriter writer) {
for (int i = 0; i < 10; i++) {
try {
writer.writeQueryResult("foo", null, 1);
writer.writeQueryResult("foo", null, 4711 + i);
writer.postCollect();
Thread.sleep(20);
} catch (Exception e) {
Expand All @@ -76,16 +105,18 @@ private void waitForErrorToBeDetectedByGraphiteWriter(GraphitePlainTextTcpOutput
fail("No error ocurred after closing server!");
}

private int counter;

private void writeTestMetric(GraphitePlainTextTcpOutputWriter writer) {
try {
writer.writeQueryResult("foo", null, 1);
writer.writeQueryResult("foo", null, ++counter);
writer.postCollect();
} catch (Exception e) {
e.printStackTrace();
}
}

public void assertEventuallyReceived(TcpLineServer server, Matcher<Collection<? extends Object>> matcher)
public void assertEventuallyReceived(TcpLineServer server, Matcher<Iterable<? extends String>> matcher)
throws Exception {
for (int i = 0; i < 100; i++) {
if (matcher.matches(server.getReceivedLines())) {
Expand Down
Loading