Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use LinkedBlockingQueue instead of ArrayBlockingQueue #47

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 33 additions & 30 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f,
metrics = f.getMetricCollector();
metricType = f.enableMetrics();

registerMetrics();
registerMetrics(connections);

setName("Memcached IO over " + this);
setDaemon(f.isDaemon());
Expand All @@ -313,22 +313,25 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f,
* Note that these Metrics may or may not take effect, depending on the
* {@link MetricCollector} implementation. This can be controlled from
* the {@link DefaultConnectionFactory}.
*/
protected void registerMetrics() {
if (metricType.equals(MetricType.DEBUG)
|| metricType.equals(MetricType.PERFORMANCE)) {
metrics.addHistogram(OVERALL_AVG_BYTES_READ_METRIC);
metrics.addHistogram(OVERALL_AVG_BYTES_WRITE_METRIC);
metrics.addHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC);
metrics.addMeter(OVERALL_RESPONSE_METRIC);
metrics.addMeter(OVERALL_REQUEST_METRIC);

if (metricType.equals(MetricType.DEBUG)) {
metrics.addCounter(RECON_QUEUE_METRIC);
metrics.addCounter(SHUTD_QUEUE_METRIC);
metrics.addMeter(OVERALL_RESPONSE_RETRY_METRIC);
metrics.addMeter(OVERALL_RESPONSE_SUCC_METRIC);
metrics.addMeter(OVERALL_RESPONSE_FAIL_METRIC);
* @param connections
*/
protected void registerMetrics(List<MemcachedNode> connections) {
for (MemcachedNode node : connections) {
if (metricType.equals(MetricType.DEBUG)
|| metricType.equals(MetricType.PERFORMANCE)) {
metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_READ_METRIC);
metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_WRITE_METRIC);
metrics.addHistogram(node.getSocketAddress(), OVERALL_AVG_TIME_ON_WIRE_METRIC);
metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_METRIC);
metrics.addMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC);

if (metricType.equals(MetricType.DEBUG)) {
metrics.addCounter(node.getSocketAddress(), RECON_QUEUE_METRIC);
metrics.addCounter(node.getSocketAddress(), SHUTD_QUEUE_METRIC);
metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_RETRY_METRIC);
metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_SUCC_METRIC);
metrics.addMeter(node.getSocketAddress(), OVERALL_RESPONSE_FAIL_METRIC);
}
}
}
}
Expand Down Expand Up @@ -529,7 +532,7 @@ private void handleShutdownQueue() throws IOException {
for (MemcachedNode qa : nodesToShutdown) {
if (!addedQueue.contains(qa)) {
nodesToShutdown.remove(qa);
metrics.decrementCounter(SHUTD_QUEUE_METRIC);
metrics.decrementCounter(qa.getSocketAddress(), SHUTD_QUEUE_METRIC);
Collection<Operation> notCompletedOperations = qa.destroyInputQueue();
if (qa.getChannel() != null) {
qa.getChannel().close();
Expand Down Expand Up @@ -810,7 +813,7 @@ private void handleWrites(final MemcachedNode node) throws IOException {
boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
while (canWriteMore) {
int wrote = node.writeSome();
metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
node.fillWriteBuffer(shouldOptimize);
canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
}
Expand All @@ -832,7 +835,7 @@ private void handleReads(final MemcachedNode node) throws IOException {
ByteBuffer rbuf = node.getRbuf();
final SocketChannel channel = node.getChannel();
int read = channel.read(rbuf);
metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);
metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_BYTES_READ_METRIC, read);
if (read < 0) {
currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);
}
Expand All @@ -847,9 +850,9 @@ private void handleReads(final MemcachedNode node) throws IOException {

long timeOnWire =
System.nanoTime() - currentOp.getWriteCompleteTimestamp();
metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC,
metrics.updateHistogram(node.getSocketAddress(), OVERALL_AVG_TIME_ON_WIRE_METRIC,
(int)(timeOnWire / 1000));
metrics.markMeter(OVERALL_RESPONSE_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_METRIC);
synchronized(currentOp) {
readBufferAndLogMetrics(currentOp, rbuf, node);
}
Expand Down Expand Up @@ -881,9 +884,9 @@ private void readBufferAndLogMetrics(final Operation currentOp,
+ op;

if (op.hasErrored()) {
metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_FAIL_METRIC);
} else {
metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_SUCC_METRIC);
}
} else if (currentOp.getState() == OperationState.RETRY) {
handleRetryInformation(currentOp.getErrorMsg());
Expand All @@ -896,7 +899,7 @@ private void readBufferAndLogMetrics(final Operation currentOp,
+ op;

retryOperation(currentOp);
metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_RESPONSE_RETRY_METRIC);
}
}

Expand Down Expand Up @@ -998,7 +1001,7 @@ protected void queueReconnect(final MemcachedNode node) {
}

reconnectQueue.put(reconnectTime, node);
metrics.incrementCounter(RECON_QUEUE_METRIC);
metrics.incrementCounter(node.getSocketAddress(), RECON_QUEUE_METRIC);

node.setupResend();
if (failureMode == FailureMode.Redistribute) {
Expand Down Expand Up @@ -1114,7 +1117,7 @@ private void attemptReconnects() {
while(i.hasNext()) {
final MemcachedNode node = i.next();
i.remove();
metrics.decrementCounter(RECON_QUEUE_METRIC);
metrics.decrementCounter(node.getSocketAddress(), RECON_QUEUE_METRIC);

try {
if (!belongsToCluster(node)) {
Expand Down Expand Up @@ -1258,7 +1261,7 @@ public void insertOperation(final MemcachedNode node, final Operation o) {
o.initialize();
node.insertOp(o);
addedQueue.offer(node);
metrics.markMeter(OVERALL_REQUEST_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC);

Selector s = selector.wakeup();
assert s == selector : "Wakeup returned the wrong selector.";
Expand All @@ -1280,7 +1283,7 @@ protected void addOperation(final MemcachedNode node, final Operation o) {
o.initialize();
node.addOp(o);
addedQueue.offer(node);
metrics.markMeter(OVERALL_REQUEST_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC);

Selector s = selector.wakeup();
assert s == selector : "Wakeup returned the wrong selector.";
Expand Down Expand Up @@ -1325,7 +1328,7 @@ public CountDownLatch broadcastOperation(final BroadcastOpFactory of,
node.addOp(op);
op.setHandlingNode(node);
addedQueue.offer(node);
metrics.markMeter(OVERALL_REQUEST_METRIC);
metrics.markMeter(node.getSocketAddress(), OVERALL_REQUEST_METRIC);
}

Selector s = selector.wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@

package net.spy.memcached.metrics;

import java.net.SocketAddress;

/**
* This abstract class implements methods needed by all {@link MetricCollector}s.
*/
public abstract class AbstractMetricCollector implements MetricCollector {

@Override
public void decrementCounter(String name) {
decrementCounter(name, 1);
public void decrementCounter(SocketAddress node, String name) {
decrementCounter(node, name, 1);
}

@Override
public void incrementCounter(String name) {
incrementCounter(name, 1);
public void incrementCounter(SocketAddress node, String name) {
incrementCounter(node, name, 1);
}

}
21 changes: 11 additions & 10 deletions src/main/java/net/spy/memcached/metrics/DefaultMetricCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -134,71 +135,71 @@ private void initReporter() {
}

@Override
public void addCounter(String name) {
public void addCounter(SocketAddress node, String name) {
if (!counters.containsKey(name)) {
counters.put(name, registry.counter(name));
}
}

@Override
public void removeCounter(String name) {
public void removeCounter(SocketAddress node, String name) {
if (!counters.containsKey(name)) {
registry.remove(name);
counters.remove(name);
}
}

@Override
public void incrementCounter(String name, int amount) {
public void incrementCounter(SocketAddress node, String name, int amount) {
if (counters.containsKey(name)) {
counters.get(name).inc(amount);
}
}

@Override
public void decrementCounter(String name, int amount) {
public void decrementCounter(SocketAddress node, String name, int amount) {
if (counters.containsKey(name)) {
counters.get(name).dec(amount);
}
}

@Override
public void addMeter(String name) {
public void addMeter(SocketAddress node, String name) {
if (!meters.containsKey(name)) {
meters.put(name, registry.meter(name));
}
}

@Override
public void removeMeter(String name) {
public void removeMeter(SocketAddress node, String name) {
if (meters.containsKey(name)) {
meters.remove(name);
}
}

@Override
public void markMeter(String name) {
public void markMeter(SocketAddress node, String name) {
if (meters.containsKey(name)) {
meters.get(name).mark();
}
}

@Override
public void addHistogram(String name) {
public void addHistogram(SocketAddress node, String name) {
if (!histograms.containsKey(name)) {
histograms.put(name, registry.histogram(name));
}
}

@Override
public void removeHistogram(String name) {
public void removeHistogram(SocketAddress node, String name) {
if (histograms.containsKey(name)) {
histograms.remove(name);
}
}

@Override
public void updateHistogram(String name, int amount) {
public void updateHistogram(SocketAddress node, String name, int amount) {
if (histograms.containsKey(name)) {
histograms.get(name).update(amount);
}
Expand Down
37 changes: 25 additions & 12 deletions src/main/java/net/spy/memcached/metrics/MetricCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package net.spy.memcached.metrics;

import java.net.SocketAddress;

/**
* Defines a common API for all {@link MetricCollector}s.
*
Expand All @@ -42,88 +44,99 @@ public interface MetricCollector {
/**
* Add a Counter to the collector.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void addCounter(String name);
void addCounter(SocketAddress node, String name);

/**
* Remove a Counter from the collector.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void removeCounter(String name);
void removeCounter(SocketAddress node, String name);

/**
* Increment a Counter by one.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void incrementCounter(String name);
void incrementCounter(SocketAddress node, String name);

/**
* Increment a Counter by the given amount.
*
* @param node the memcached node
* @param name the name of the counter.
* @param amount the amount to increase.
*/
void incrementCounter(String name, int amount);
void incrementCounter(SocketAddress node, String name, int amount);

/**
* Decrement a Counter by one.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void decrementCounter(String name);
void decrementCounter(SocketAddress node, String name);

/**
* Decrement a Counter by the given amount.
*
* @param node the memcached node
* @param name the name of the counter.
* @param amount the amount to decrease.
*/
void decrementCounter(String name, int amount);
void decrementCounter(SocketAddress node, String name, int amount);

/**
* Add a Meter to the Collector.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void addMeter(String name);
void addMeter(SocketAddress node, String name);

/**
* Remove a Meter from the Collector.
*
* @param name the name of the counter.
*/
void removeMeter(String name);
void removeMeter(SocketAddress node, String name);

/**
* Mark a checkpoint in the Meter.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void markMeter(String name);
void markMeter(SocketAddress node, String name);

/**
* Add a Histogram to the Collector.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void addHistogram(String name);
void addHistogram(SocketAddress node, String name);

/**
* Remove a Histogram from the Collector.
*
* @param node the memcached node
* @param name the name of the counter.
*/
void removeHistogram(String name);
void removeHistogram(SocketAddress node, String name);

/**
* Update the Histogram with the given amount.
*
* @param node the memcached node
* @param name the name of the counter.
* @param amount the amount to update.
*/
void updateHistogram(String name, int amount);
void updateHistogram(SocketAddress node, String name, int amount);

}
Loading