Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont discard metrics on exception while exporting #115

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions src/main/java/org/jmxtrans/embedded/EmbeddedJmxTrans.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.jmxtrans.embedded;

import org.jmxtrans.embedded.output.OutputWriter;
import org.jmxtrans.embedded.output.OutputWriterSet;
import org.jmxtrans.embedded.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,7 +35,6 @@
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -119,9 +119,7 @@ private void execute(String invokerName) {
}

try {
for (OutputWriter outputWriter : outputWriters) {
outputWriter.stop();
}
outputWriters.stopAll();
} catch (Exception e) {
logger.warn("Failure while stopping outputWriters", e);
}
Expand Down Expand Up @@ -179,9 +177,9 @@ public void onStop() {
private final List<Query> queries = new ArrayList<Query>();

/**
* Use to {@linkplain Set} to deduplicate during configuration merger
* Use of {@linkplain Set} to deduplicate during configuration merger
*/
private Set<OutputWriter> outputWriters = new HashSet<OutputWriter>();
private final OutputWriterSet outputWriters = new OutputWriterSet();

private int numQueryThreads = 1;

Expand All @@ -208,9 +206,7 @@ public synchronized void start() throws Exception {
for (Query query : queries) {
query.start();
}
for (OutputWriter outputWriter : outputWriters) {
outputWriter.start();
}
outputWriters.startAll();

collectScheduledExecutor = Executors.newScheduledThreadPool(getNumQueryThreads(), new NamedThreadFactory("jmxtrans-collect-", true));
exportScheduledExecutor = Executors.newScheduledThreadPool(getNumExportThreads(), new NamedThreadFactory("jmxtrans-export-", true));
Expand Down Expand Up @@ -248,26 +244,15 @@ public synchronized void stop() throws Exception {
return;
}
try {
collectScheduledExecutor.shutdown();
try {
collectScheduledExecutor.awaitTermination(getQueryIntervalInSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Ignore InterruptedException stopping", e);
}
exportScheduledExecutor.shutdown();
try {
exportScheduledExecutor.awaitTermination(getExportIntervalInSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Ignore InterruptedException stopping", e);
}
collectScheduledExecutor.shutdownNow();
exportScheduledExecutor.shutdownNow();
} catch (RuntimeException e) {
logger.warn("Failure while shutting down ExecutorServices", e);
}
shutdownHook.onStop();
running = false;
}


/**
* Exposed for manual / JMX invocation
*/
Expand Down Expand Up @@ -347,7 +332,7 @@ public void setNumExportThreads(int numExportThreads) {
}

@Nonnull
public Set<OutputWriter> getOutputWriters() {
public OutputWriterSet getOutputWriters() {
return outputWriters;
}

Expand Down
80 changes: 22 additions & 58 deletions src/main/java/org/jmxtrans/embedded/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.jmxtrans.embedded;

import org.jmxtrans.embedded.output.OutputWriter;
import org.jmxtrans.embedded.output.OutputWriterSet;
import org.jmxtrans.embedded.util.Preconditions;
import org.jmxtrans.embedded.util.concurrent.DiscardingBlockingQueue;
import org.jmxtrans.embedded.util.jmx.JmxUtils2;
Expand All @@ -34,7 +35,10 @@
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.*;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -75,7 +79,7 @@ public class Query implements QueryMBean {
* JMX attributes to collect. As an array for {@link javax.management.MBeanServer#getAttributes(javax.management.ObjectName, String[])}
*/
@Nonnull
private Map<String, QueryAttribute> attributesByName = new HashMap<String, QueryAttribute>();
private final Map<String, QueryAttribute> attributesByName = new HashMap<String, QueryAttribute>();
/**
* Copy of {@link #attributesByName}'s {@link java.util.Map#entrySet()} for performance optimization
*/
Expand All @@ -89,7 +93,7 @@ public class Query implements QueryMBean {
* @see #getEffectiveOutputWriters()
*/
@Nonnull
private List<OutputWriter> outputWriters = new ArrayList<OutputWriter>();
private final OutputWriterSet outputWriters = new OutputWriterSet();

/**
* Store the metrics collected on this {@linkplain Query} (see {@link #collectMetrics()})
Expand All @@ -98,6 +102,8 @@ public class Query implements QueryMBean {
@Nonnull
private BlockingQueue<QueryResult> queryResults = new DiscardingBlockingQueue<QueryResult>(200);

private final QueryResultsExporter queryResultsExporter = new QueryResultsExporter(this);

@Nonnull
private final AtomicInteger collectedMetricsCount = new AtomicInteger();

Expand All @@ -107,15 +113,6 @@ public class Query implements QueryMBean {
@Nonnull
private final AtomicInteger collectionCount = new AtomicInteger();

@Nonnull
private final AtomicInteger exportedMetricsCount = new AtomicInteger();

@Nonnull
private final AtomicLong exportDurationInNanos = new AtomicLong();

@Nonnull
private final AtomicInteger exportCount = new AtomicInteger();

/**
* {@link ObjectName} of this {@link QueryMBean}
*/
Expand Down Expand Up @@ -160,6 +157,9 @@ public void collectMetrics() {
logger.trace("Query {} returned {}", objectName, matchingObjectNames);

for (ObjectName matchingObjectName : matchingObjectNames) {
if (Thread.interrupted()) {
throw new RuntimeException(new InterruptedException());
}
long epochInMillis = System.currentTimeMillis();
try {
AttributeList jmxAttributes = embeddedJmxTrans.getMbeanServer().getAttributes(matchingObjectName, this.attributeNames);
Expand All @@ -181,50 +181,24 @@ public void collectMetrics() {

/**
* Export the collected metrics to the {@linkplain OutputWriter}s associated with this {@linkplain Query}
* (see {@link #getEffectiveOutputWriters()}).
*
* Metrics are batched according to {@link EmbeddedJmxTrans#getExportBatchSize()}
*
* @return the number of exported {@linkplain QueryResult}
*/
@Override
public int exportCollectedMetrics() {
if(queryResults.isEmpty()) {
return 0;
}

int totalExportedMetricsCount = 0;
long nanosBefore = System.nanoTime();

List<OutputWriter> effectiveOutputWriters = getEffectiveOutputWriters();
int exportBatchSize = getEmbeddedJmxTrans().getExportBatchSize();
List<QueryResult> availableQueryResults = new ArrayList<QueryResult>(exportBatchSize);

int size;
while ((size = queryResults.drainTo(availableQueryResults, exportBatchSize)) > 0) {
totalExportedMetricsCount += size;
exportedMetricsCount.addAndGet(size);
for (OutputWriter outputWriter : effectiveOutputWriters) {
outputWriter.write(availableQueryResults);
}
availableQueryResults.clear();
}
exportDurationInNanos.addAndGet(System.nanoTime() - nanosBefore);
exportCount.incrementAndGet();
return totalExportedMetricsCount;
return queryResultsExporter.exportCollectedMetrics();
}


/**
* Start all the {@linkplain OutputWriter}s attached to this {@linkplain Query}
*/
@PostConstruct
public void start() throws Exception {
queryMbeanObjectName = JmxUtils2.registerObject(this, "org.jmxtrans.embedded:Type=Query,id=" + id, getEmbeddedJmxTrans().getMbeanServer());


for (OutputWriter outputWriter : outputWriters) {
outputWriter.start();
}
outputWriters.startAll();
}

/**
Expand All @@ -234,9 +208,7 @@ public void start() throws Exception {
public void stop() throws Exception {
JmxUtils2.unregisterObject(queryMbeanObjectName, embeddedJmxTrans.getMbeanServer());

for (OutputWriter outputWriter : outputWriters) {
outputWriter.stop();
}
outputWriters.stopAll();
}

@Override
Expand Down Expand Up @@ -313,21 +285,13 @@ public void setEmbeddedJmxTrans(EmbeddedJmxTrans embeddedJmxTrans) {
public List<OutputWriter> getEffectiveOutputWriters() {
// Google Guava predicates would be nicer but we don't include guava to ease embeddability
List<OutputWriter> result = new ArrayList<OutputWriter>(embeddedJmxTrans.getOutputWriters().size() + outputWriters.size());
for (OutputWriter outputWriter : embeddedJmxTrans.getOutputWriters()) {
if (outputWriter.isEnabled()) {
result.add(outputWriter);
}
}
for (OutputWriter outputWriter : outputWriters) {
if (outputWriter.isEnabled()) {
result.add(outputWriter);
}
}
result.addAll(embeddedJmxTrans.getOutputWriters().findEnabled());
result.addAll(this.getOutputWriters().findEnabled());
return result;
}

@Nonnull
public List<OutputWriter> getOutputWriters() {
public OutputWriterSet getOutputWriters() {
return outputWriters;
}

Expand Down Expand Up @@ -364,17 +328,17 @@ public int getCollectionCount() {

@Override
public int getExportedMetricsCount() {
return exportedMetricsCount.get();
return queryResultsExporter.getExportedMetricsCount();
}

@Override
public long getExportDurationInNanos() {
return exportDurationInNanos.get();
return queryResultsExporter.getExportDurationInNanos();
}

@Override
public int getExportCount() {
return exportCount.get();
return queryResultsExporter.getExportCount();
}

@Override
Expand Down
117 changes: 117 additions & 0 deletions src/main/java/org/jmxtrans/embedded/QueryResultsExporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.jmxtrans.embedded;

import org.jmxtrans.embedded.output.OutputWriter;
import org.jmxtrans.embedded.output.OutputWriterSet;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
* this class holds a state in order to be able to continue on the last buffer in the event of a failure during the last export
*/
public class QueryResultsExporter {

private final Query query;

private final AtomicInteger exportedMetricsCount = new AtomicInteger();

private final AtomicLong exportDurationInNanos = new AtomicLong();

private final AtomicInteger exportCount = new AtomicInteger();

private final ReentrantLock lock = new ReentrantLock();

private volatile List<QueryResult> lastQueueCheckout;

public QueryResultsExporter(Query query) {
super();
this.query = query;
}

/**
* Export the collected metrics to the {@linkplain OutputWriter}s associated with this {@linkplain Query}
* Metrics are batched according to {@link EmbeddedJmxTrans#getExportBatchSize()}
*
* @return the number of exported {@linkplain QueryResult}
*/
public int exportCollectedMetrics() {
if (lock.isLocked()) {
// an export for this Query is already in progress
// there is no need for multiple threads to export the same Query
// this should never occurs, but just in case it does, it's better returning now, to avoid blocking the current Thread until the other Thread finish
return 0;
}
lock.lock();
try {
long nanosBefore = System.nanoTime();
int successfullyExportedMetricsCount = doExportCollectedMetrics();
exportDurationInNanos.addAndGet(System.nanoTime() - nanosBefore);
exportCount.incrementAndGet();
return successfullyExportedMetricsCount;
} finally {
lock.unlock();
}
}

private int doExportCollectedMetrics() {
int successfullyExportedMetricsCount = 0;

final int exportBatchSize = query.getEmbeddedJmxTrans().getExportBatchSize();

List<QueryResult> queueCheckout = lastQueueCheckout;
successfullyExportedMetricsCount += recoverLastExportIfNeeded(queueCheckout);

// instantiate a new List in order to make sure the current Thread owns it, and in order to avoid making it a long-live Object
queueCheckout = new ArrayList<QueryResult>(exportBatchSize);
lastQueueCheckout = queueCheckout;

final BlockingQueue<QueryResult> localQueryResults = query.getResults();

int size;
while ((size = localQueryResults.drainTo(queueCheckout, exportBatchSize)) > 0) {
if (Thread.interrupted()) {
throw new RuntimeException(new InterruptedException());
}
OutputWriterSet.tryToWriteQueryResultsBatchToAll(queueCheckout, query.getEmbeddedJmxTrans().getOutputWriters(), query.getOutputWriters());
successfullyExportedMetricsCount += size;
exportedMetricsCount.addAndGet(size);
queueCheckout.clear();
}

lastQueueCheckout = null;

return successfullyExportedMetricsCount;
}

protected int recoverLastExportIfNeeded(List<QueryResult> queueCheckout) {
// if the last invocation failed for any reason, then we should retry the latest export attempt
// this ensure we do not lose the content of the buffer that was already drain out of the queryResults Queue
// in the worse case, we may send again the metrics to OutputWriter, we assume it's ok
if (queueCheckout != null) {
final int size = queueCheckout.size();
if (size > 0) {
OutputWriterSet.tryToWriteQueryResultsBatchToAll(queueCheckout, query.getEmbeddedJmxTrans().getOutputWriters(), query.getOutputWriters());
exportedMetricsCount.addAndGet(size);
return size;
}
}
return 0;
}

public int getExportedMetricsCount() {
return exportedMetricsCount.get();
}

public long getExportDurationInNanos() {
return exportDurationInNanos.get();
}

public int getExportCount() {
return exportCount.get();
}

}
Loading