From fa407fd12bbb4640db6f234723b2c7dd425f5d3e Mon Sep 17 00:00:00 2001 From: Magnus Gustafsson Date: Tue, 19 Sep 2017 09:11:49 +0200 Subject: [PATCH 1/3] Use LinkedBlockingQueue instead of ArrayBlockingQueue to increase throughput --- .../java/net/spy/memcached/transcoders/TranscodeService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java index aa9b6ed52..5aeab3509 100644 --- a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java +++ b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java @@ -22,7 +22,7 @@ package net.spy.memcached.transcoders; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -45,7 +45,7 @@ public class TranscodeService extends SpyObject { public TranscodeService(boolean daemon) { pool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(100), new BasicThreadFactory( + new LinkedBlockingQueue<>(100), new BasicThreadFactory( "transcoder", daemon), new ThreadPoolExecutor.DiscardPolicy()); } From d109e1de9b6ffd08af1e25a689ec613a66fb538f Mon Sep 17 00:00:00 2001 From: Magnus Gustafsson Date: Tue, 19 Sep 2017 09:16:13 +0200 Subject: [PATCH 2/3] Added the correct generic type --- .../java/net/spy/memcached/transcoders/TranscodeService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java index 5aeab3509..2b83a816f 100644 --- a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java +++ b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java @@ -45,7 +45,7 @@ public class TranscodeService extends SpyObject { public TranscodeService(boolean daemon) { pool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(100), new BasicThreadFactory( + new LinkedBlockingQueue(100), new BasicThreadFactory( "transcoder", daemon), new ThreadPoolExecutor.DiscardPolicy()); } From a70f4dcfd3776acd993fd8f79234cd9a794f1056 Mon Sep 17 00:00:00 2001 From: Magnus Gustafsson Date: Tue, 19 Sep 2017 10:40:58 +0200 Subject: [PATCH 3/3] Introduced the current nodes SocketAddress to the MetricCollector --- .../spy/memcached/MemcachedConnection.java | 63 ++++++++++--------- .../metrics/AbstractMetricCollector.java | 10 +-- .../metrics/DefaultMetricCollector.java | 21 ++++--- .../memcached/metrics/MetricCollector.java | 37 +++++++---- .../metrics/NoopMetricCollector.java | 22 ++++--- .../metrics/DummyMetricCollector.java | 25 ++++---- 6 files changed, 100 insertions(+), 78 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 47f432fb1..44da91fce 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -300,7 +300,7 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, metrics = f.getMetricCollector(); metricType = f.enableMetrics(); - registerMetrics(); + registerMetrics(connections); setName("Memcached IO over " + this); setDaemon(f.isDaemon()); @@ -313,22 +313,25 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, * Note that these Metrics may or may not take effect, depending on the * {@link MetricCollector} implementation. This can be controlled from * the {@link DefaultConnectionFactory}. - */ - protected void registerMetrics() { - if (metricType.equals(MetricType.DEBUG) - || metricType.equals(MetricType.PERFORMANCE)) { - metrics.addHistogram(OVERALL_AVG_BYTES_READ_METRIC); - metrics.addHistogram(OVERALL_AVG_BYTES_WRITE_METRIC); - metrics.addHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC); - metrics.addMeter(OVERALL_RESPONSE_METRIC); - metrics.addMeter(OVERALL_REQUEST_METRIC); - - if (metricType.equals(MetricType.DEBUG)) { - metrics.addCounter(RECON_QUEUE_METRIC); - metrics.addCounter(SHUTD_QUEUE_METRIC); - metrics.addMeter(OVERALL_RESPONSE_RETRY_METRIC); - metrics.addMeter(OVERALL_RESPONSE_SUCC_METRIC); - metrics.addMeter(OVERALL_RESPONSE_FAIL_METRIC); + * @param connections + */ + protected void registerMetrics(List connections) { + for (MemcachedNode node : connections) { + if (metricType.equals(MetricType.DEBUG) + || metricType.equals(MetricType.PERFORMANCE)) { + metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_READ_METRIC); + metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_WRITE_METRIC); + metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_TIME_ON_WIRE_METRIC); + metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_METRIC); + metrics.addMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC); + + if (metricType.equals(MetricType.DEBUG)) { + metrics.addCounter(node.getSocketAddress(), RECON_QUEUE_METRIC); + metrics.addCounter(node.getSocketAddress(), SHUTD_QUEUE_METRIC); + metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_RETRY_METRIC); + metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_SUCC_METRIC); + metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_FAIL_METRIC); + } } } } @@ -529,7 +532,7 @@ private void handleShutdownQueue() throws IOException { for (MemcachedNode qa : nodesToShutdown) { if (!addedQueue.contains(qa)) { nodesToShutdown.remove(qa); - metrics.decrementCounter(SHUTD_QUEUE_METRIC); + metrics.decrementCounter(qa.getSocketAddress(), SHUTD_QUEUE_METRIC); Collection notCompletedOperations = qa.destroyInputQueue(); if (qa.getChannel() != null) { qa.getChannel().close(); @@ -810,7 +813,7 @@ private void handleWrites(final MemcachedNode node) throws IOException { boolean canWriteMore = node.getBytesRemainingToWrite() > 0; while (canWriteMore) { int wrote = node.writeSome(); - metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote); + metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_WRITE_METRIC, wrote); node.fillWriteBuffer(shouldOptimize); canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0; } @@ -832,7 +835,7 @@ private void handleReads(final MemcachedNode node) throws IOException { ByteBuffer rbuf = node.getRbuf(); final SocketChannel channel = node.getChannel(); int read = channel.read(rbuf); - metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read); + metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_READ_METRIC, read); if (read < 0) { currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf); } @@ -847,9 +850,9 @@ private void handleReads(final MemcachedNode node) throws IOException { long timeOnWire = System.nanoTime() - currentOp.getWriteCompleteTimestamp(); - metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, + metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_TIME_ON_WIRE_METRIC, (int)(timeOnWire / 1000)); - metrics.markMeter(OVERALL_RESPONSE_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_METRIC); synchronized(currentOp) { readBufferAndLogMetrics(currentOp, rbuf, node); } @@ -881,9 +884,9 @@ private void readBufferAndLogMetrics(final Operation currentOp, + op; if (op.hasErrored()) { - metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_FAIL_METRIC); } else { - metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_SUCC_METRIC); } } else if (currentOp.getState() == OperationState.RETRY) { handleRetryInformation(currentOp.getErrorMsg()); @@ -896,7 +899,7 @@ private void readBufferAndLogMetrics(final Operation currentOp, + op; retryOperation(currentOp); - metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_RETRY_METRIC); } } @@ -998,7 +1001,7 @@ protected void queueReconnect(final MemcachedNode node) { } reconnectQueue.put(reconnectTime, node); - metrics.incrementCounter(RECON_QUEUE_METRIC); + metrics.incrementCounter(node.getSocketAddress(), RECON_QUEUE_METRIC); node.setupResend(); if (failureMode == FailureMode.Redistribute) { @@ -1114,7 +1117,7 @@ private void attemptReconnects() { while(i.hasNext()) { final MemcachedNode node = i.next(); i.remove(); - metrics.decrementCounter(RECON_QUEUE_METRIC); + metrics.decrementCounter(node.getSocketAddress(), RECON_QUEUE_METRIC); try { if (!belongsToCluster(node)) { @@ -1258,7 +1261,7 @@ public void insertOperation(final MemcachedNode node, final Operation o) { o.initialize(); node.insertOp(o); addedQueue.offer(node); - metrics.markMeter(OVERALL_REQUEST_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC); Selector s = selector.wakeup(); assert s == selector : "Wakeup returned the wrong selector."; @@ -1280,7 +1283,7 @@ protected void addOperation(final MemcachedNode node, final Operation o) { o.initialize(); node.addOp(o); addedQueue.offer(node); - metrics.markMeter(OVERALL_REQUEST_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC); Selector s = selector.wakeup(); assert s == selector : "Wakeup returned the wrong selector."; @@ -1325,7 +1328,7 @@ public CountDownLatch broadcastOperation(final BroadcastOpFactory of, node.addOp(op); op.setHandlingNode(node); addedQueue.offer(node); - metrics.markMeter(OVERALL_REQUEST_METRIC); + metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC); } Selector s = selector.wakeup(); diff --git a/src/main/java/net/spy/memcached/metrics/AbstractMetricCollector.java b/src/main/java/net/spy/memcached/metrics/AbstractMetricCollector.java index 5c669d56b..278a3bb69 100644 --- a/src/main/java/net/spy/memcached/metrics/AbstractMetricCollector.java +++ b/src/main/java/net/spy/memcached/metrics/AbstractMetricCollector.java @@ -22,19 +22,21 @@ package net.spy.memcached.metrics; +import java.net.SocketAddress; + /** * This abstract class implements methods needed by all {@link MetricCollector}s. */ public abstract class AbstractMetricCollector implements MetricCollector { @Override - public void decrementCounter(String name) { - decrementCounter(name, 1); + public void decrementCounter(SocketAddress node, String name) { + decrementCounter(node, name, 1); } @Override - public void incrementCounter(String name) { - incrementCounter(name, 1); + public void incrementCounter(SocketAddress node, String name) { + incrementCounter(node, name, 1); } } diff --git a/src/main/java/net/spy/memcached/metrics/DefaultMetricCollector.java b/src/main/java/net/spy/memcached/metrics/DefaultMetricCollector.java index ed0b219c0..f4874b1ad 100644 --- a/src/main/java/net/spy/memcached/metrics/DefaultMetricCollector.java +++ b/src/main/java/net/spy/memcached/metrics/DefaultMetricCollector.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.net.SocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -134,14 +135,14 @@ private void initReporter() { } @Override - public void addCounter(String name) { + public void addCounter(SocketAddress node, String name) { if (!counters.containsKey(name)) { counters.put(name, registry.counter(name)); } } @Override - public void removeCounter(String name) { + public void removeCounter(SocketAddress node, String name) { if (!counters.containsKey(name)) { registry.remove(name); counters.remove(name); @@ -149,56 +150,56 @@ public void removeCounter(String name) { } @Override - public void incrementCounter(String name, int amount) { + public void incrementCounter(SocketAddress node, String name, int amount) { if (counters.containsKey(name)) { counters.get(name).inc(amount); } } @Override - public void decrementCounter(String name, int amount) { + public void decrementCounter(SocketAddress node, String name, int amount) { if (counters.containsKey(name)) { counters.get(name).dec(amount); } } @Override - public void addMeter(String name) { + public void addMeter(SocketAddress node, String name) { if (!meters.containsKey(name)) { meters.put(name, registry.meter(name)); } } @Override - public void removeMeter(String name) { + public void removeMeter(SocketAddress node, String name) { if (meters.containsKey(name)) { meters.remove(name); } } @Override - public void markMeter(String name) { + public void markMeter(SocketAddress node, String name) { if (meters.containsKey(name)) { meters.get(name).mark(); } } @Override - public void addHistogram(String name) { + public void addHistogram(SocketAddress node, String name) { if (!histograms.containsKey(name)) { histograms.put(name, registry.histogram(name)); } } @Override - public void removeHistogram(String name) { + public void removeHistogram(SocketAddress node, String name) { if (histograms.containsKey(name)) { histograms.remove(name); } } @Override - public void updateHistogram(String name, int amount) { + public void updateHistogram(SocketAddress node, String name, int amount) { if (histograms.containsKey(name)) { histograms.get(name).update(amount); } diff --git a/src/main/java/net/spy/memcached/metrics/MetricCollector.java b/src/main/java/net/spy/memcached/metrics/MetricCollector.java index 65c95062f..044f17b5a 100644 --- a/src/main/java/net/spy/memcached/metrics/MetricCollector.java +++ b/src/main/java/net/spy/memcached/metrics/MetricCollector.java @@ -22,6 +22,8 @@ package net.spy.memcached.metrics; +import java.net.SocketAddress; + /** * Defines a common API for all {@link MetricCollector}s. * @@ -42,88 +44,99 @@ public interface MetricCollector { /** * Add a Counter to the collector. * + * @param node the memcached node * @param name the name of the counter. */ - void addCounter(String name); + void addCounter(SocketAddress node, String name); /** * Remove a Counter from the collector. * + * @param node the memcached node * @param name the name of the counter. */ - void removeCounter(String name); + void removeCounter(SocketAddress node, String name); /** * Increment a Counter by one. * + * @param node the memcached node * @param name the name of the counter. */ - void incrementCounter(String name); + void incrementCounter(SocketAddress node, String name); /** * Increment a Counter by the given amount. * + * @param node the memcached node * @param name the name of the counter. * @param amount the amount to increase. */ - void incrementCounter(String name, int amount); + void incrementCounter(SocketAddress node, String name, int amount); /** * Decrement a Counter by one. * + * @param node the memcached node * @param name the name of the counter. */ - void decrementCounter(String name); + void decrementCounter(SocketAddress node, String name); /** * Decrement a Counter by the given amount. * + * @param node the memcached node * @param name the name of the counter. * @param amount the amount to decrease. */ - void decrementCounter(String name, int amount); + void decrementCounter(SocketAddress node, String name, int amount); /** * Add a Meter to the Collector. * + * @param node the memcached node * @param name the name of the counter. */ - void addMeter(String name); + void addMeter(SocketAddress node, String name); /** * Remove a Meter from the Collector. * * @param name the name of the counter. */ - void removeMeter(String name); + void removeMeter(SocketAddress node, String name); /** * Mark a checkpoint in the Meter. * + * @param node the memcached node * @param name the name of the counter. */ - void markMeter(String name); + void markMeter(SocketAddress node, String name); /** * Add a Histogram to the Collector. * + * @param node the memcached node * @param name the name of the counter. */ - void addHistogram(String name); + void addHistogram(SocketAddress node, String name); /** * Remove a Histogram from the Collector. * + * @param node the memcached node * @param name the name of the counter. */ - void removeHistogram(String name); + void removeHistogram(SocketAddress node, String name); /** * Update the Histogram with the given amount. * + * @param node the memcached node * @param name the name of the counter. * @param amount the amount to update. */ - void updateHistogram(String name, int amount); + void updateHistogram(SocketAddress node, String name, int amount); } diff --git a/src/main/java/net/spy/memcached/metrics/NoopMetricCollector.java b/src/main/java/net/spy/memcached/metrics/NoopMetricCollector.java index aabe82478..fc8e87d58 100644 --- a/src/main/java/net/spy/memcached/metrics/NoopMetricCollector.java +++ b/src/main/java/net/spy/memcached/metrics/NoopMetricCollector.java @@ -22,6 +22,8 @@ package net.spy.memcached.metrics; +import java.net.SocketAddress; + /** * A {@link MetricCollector} that does nothing. * @@ -31,52 +33,52 @@ public final class NoopMetricCollector extends AbstractMetricCollector { @Override - public void addCounter(String name) { + public void addCounter(SocketAddress node, String name) { return; } @Override - public void removeCounter(String name) { + public void removeCounter(SocketAddress node,String name) { return; } @Override - public void incrementCounter(String name, int amount) { + public void incrementCounter(SocketAddress node,String name, int amount) { return; } @Override - public void decrementCounter(String name, int amount) { + public void decrementCounter(SocketAddress node,String name, int amount) { return; } @Override - public void addMeter(String name) { + public void addMeter(SocketAddress node,String name) { return; } @Override - public void removeMeter(String name) { + public void removeMeter(SocketAddress node,String name) { return; } @Override - public void markMeter(String name) { + public void markMeter(SocketAddress node,String name) { return; } @Override - public void addHistogram(String name) { + public void addHistogram(SocketAddress node,String name) { return; } @Override - public void removeHistogram(String name) { + public void removeHistogram(SocketAddress node,String name) { return; } @Override - public void updateHistogram(String name, int amount) { + public void updateHistogram(SocketAddress node,String name, int amount) { return; } diff --git a/src/test/java/net/spy/memcached/metrics/DummyMetricCollector.java b/src/test/java/net/spy/memcached/metrics/DummyMetricCollector.java index fcc36e8f4..185536629 100644 --- a/src/test/java/net/spy/memcached/metrics/DummyMetricCollector.java +++ b/src/test/java/net/spy/memcached/metrics/DummyMetricCollector.java @@ -23,6 +23,7 @@ package net.spy.memcached.metrics; +import java.net.SocketAddress; import java.util.HashMap; /** @@ -37,62 +38,62 @@ public DummyMetricCollector() { } @Override - public void addCounter(String name) { + public void addCounter(SocketAddress node, String name) { metrics.put(name, 0); } @Override - public void removeCounter(String name) { + public void removeCounter(SocketAddress node,String name) { metrics.remove(name); } @Override - public void incrementCounter(String name) { + public void incrementCounter(SocketAddress node,String name) { metrics.put(name, metrics.get(name) + 1); } @Override - public void incrementCounter(String name, int amount) { + public void incrementCounter(SocketAddress node,String name, int amount) { metrics.put(name, metrics.get(name) + amount); } @Override - public void decrementCounter(String name) { + public void decrementCounter(SocketAddress node,String name) { metrics.put(name, metrics.get(name) - 1); } @Override - public void decrementCounter(String name, int amount) { + public void decrementCounter(SocketAddress node,String name, int amount) { metrics.put(name, metrics.get(name) - amount); } @Override - public void addMeter(String name) { + public void addMeter(SocketAddress node,String name) { metrics.put(name, 0); } @Override - public void removeMeter(String name) { + public void removeMeter(SocketAddress node,String name) { metrics.remove(name); } @Override - public void markMeter(String name) { + public void markMeter(SocketAddress node,String name) { metrics.put(name, metrics.get(name) + 1); } @Override - public void addHistogram(String name) { + public void addHistogram(SocketAddress node,String name) { metrics.put(name, 0); } @Override - public void removeHistogram(String name) { + public void removeHistogram(SocketAddress node,String name) { metrics.remove(name); } @Override - public void updateHistogram(String name, int amount) { + public void updateHistogram(SocketAddress node,String name, int amount) { metrics.put(name, metrics.get(name) + amount); }