diff --git a/pom.xml b/pom.xml
index e37ee80d..3f8a49bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,6 +6,7 @@
jar
1.12-thrift-0.11.0-inlined-SNAPSHOT
Pegasus Java Client
+
org.junit.jupiter
@@ -80,6 +81,16 @@
javax.annotation-api
1.3.2
+
+ io.prometheus
+ simpleclient
+ 0.4.0
+
+
+ io.prometheus
+ simpleclient_httpserver
+ 0.4.0
+
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
index 2552678c..5b1e9710 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
@@ -23,7 +23,7 @@
* .operationTimeout(Duration.ofMillis(1000))
* .asyncWorkers(4)
* .enablePerfCounter(false)
- * .falconPerfCounterTags("")
+ * .perfCounterTags("")
* .falconPushInterval(Duration.ofSeconds(10))
* .metaQueryTimeout(Duration.ofMillis(5000))
* .build();
@@ -36,7 +36,8 @@ public class ClientOptions {
public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000);
public static final int DEFAULT_ASYNC_WORKERS = Runtime.getRuntime().availableProcessors();
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = true;
- public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
+ public static final String DEFAULT_PERF_COUNTER_TYPE = "falcon";
+ public static final String DEFAULT_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
@@ -45,7 +46,8 @@ public class ClientOptions {
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
- private final String falconPerfCounterTags;
+ private final String perfCounterType;
+ private final String perfCounterTags;
private final Duration falconPushInterval;
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
@@ -55,7 +57,8 @@ protected ClientOptions(Builder builder) {
this.operationTimeout = builder.operationTimeout;
this.asyncWorkers = builder.asyncWorkers;
this.enablePerfCounter = builder.enablePerfCounter;
- this.falconPerfCounterTags = builder.falconPerfCounterTags;
+ this.perfCounterType = builder.perfCounterType;
+ this.perfCounterTags = builder.perfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
@@ -66,7 +69,8 @@ protected ClientOptions(ClientOptions original) {
this.operationTimeout = original.getOperationTimeout();
this.asyncWorkers = original.getAsyncWorkers();
this.enablePerfCounter = original.isEnablePerfCounter();
- this.falconPerfCounterTags = original.getFalconPerfCounterTags();
+ this.perfCounterType = original.perfCounterType;
+ this.perfCounterTags = original.getPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
@@ -111,7 +115,8 @@ public boolean equals(Object options) {
&& this.operationTimeout.toMillis() == clientOptions.operationTimeout.toMillis()
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
- && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
+ && this.perfCounterType.equals(clientOptions.perfCounterType)
+ && this.perfCounterTags.equals(clientOptions.perfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis();
@@ -129,10 +134,12 @@ public String toString() {
+ operationTimeout.toMillis()
+ ", asyncWorkers="
+ asyncWorkers
- + ", enablePerfCounter="
+ + ", perfCounterType="
+ enablePerfCounter
- + ", falconPerfCounterTags='"
- + falconPerfCounterTags
+ + ", perfCounterTags='"
+ + perfCounterType
+ + ", enablePerfCounter="
+ + perfCounterTags
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
@@ -149,7 +156,8 @@ public static class Builder {
private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private int asyncWorkers = DEFAULT_ASYNC_WORKERS;
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
- private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
+ private String perfCounterType = DEFAULT_PERF_COUNTER_TYPE;
+ private String perfCounterTags = DEFAULT_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
@@ -194,8 +202,8 @@ public Builder asyncWorkers(int asyncWorkers) {
/**
* Whether to enable performance statistics. If true, the client will periodically report
- * metrics to local falcon agent (currently we only support falcon as monitoring system).
- * Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}.
+ * metrics to local falcon agent (if set falcon as monitoring system) or open prometheus
+ * collector http server. Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}.
*
* @param enablePerfCounter enablePerfCounter
* @return {@code this}
@@ -205,22 +213,34 @@ public Builder enablePerfCounter(boolean enablePerfCounter) {
return this;
}
+ /**
+ * set the perf-counter type, now only support falcon and prometheus, Defaults to {@literal
+ * falcon}, see {@link #DEFAULT_PERF_COUNTER_TYPE}
+ *
+ * @param perfCounterType perfCounterType
+ * @return this
+ */
+ public Builder perfCounterType(String perfCounterType) {
+ this.perfCounterType = perfCounterType;
+ return this;
+ }
+
/**
* Additional tags for falcon metrics. For example:
* "cluster=c3srv-ad,job=recommend-service-history". Defaults to empty string, see {@link
- * #DEFAULT_FALCON_PERF_COUNTER_TAGS}.
+ * #DEFAULT_PERF_COUNTER_TAGS}.
*
- * @param falconPerfCounterTags falconPerfCounterTags
+ * @param perfCounterTags perfCounterTags
* @return {@code this}
*/
- public Builder falconPerfCounterTags(String falconPerfCounterTags) {
- this.falconPerfCounterTags = falconPerfCounterTags;
+ public Builder perfCounterTags(String perfCounterTags) {
+ this.perfCounterTags = perfCounterTags;
return this;
}
/**
- * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}, see {@link
- * #DEFAULT_FALCON_PUSH_INTERVAL}.
+ * The interval to report metrics to local falcon agent(if set falcon as monitor system).
+ * Defaults to {@literal 10s}, see {@link #DEFAULT_FALCON_PUSH_INTERVAL}.
*
* @param falconPushInterval falconPushInterval
* @return {@code this}
@@ -279,7 +299,8 @@ public ClientOptions.Builder mutate() {
.operationTimeout(getOperationTimeout())
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
- .falconPerfCounterTags(getFalconPerfCounterTags())
+ .perfCounterType(getPerfCounterType())
+ .perfCounterTags(getPerfCounterTags())
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout());
@@ -316,8 +337,8 @@ public int getAsyncWorkers() {
/**
* Whether to enable performance statistics. If true, the client will periodically report metrics
- * to local falcon agent (currently we only support falcon as monitoring system). Defaults to
- * {@literal true}.
+ * to local falcon agent (if set falcon as monitoring system) or open prometheus collector http
+ * server. Defaults to {@literal true}.
*
* @return whether to enable performance statistics.
*/
@@ -326,16 +347,26 @@ public boolean isEnablePerfCounter() {
}
/**
- * Additional tags for falcon metrics. Defaults to empty string.
+ * get perf-counter type, now only support falcon and prometheus
+ *
+ * @return perf-counter type
+ */
+ public String getPerfCounterType() {
+ return perfCounterType;
+ }
+
+ /**
+ * Additional tags for metrics. Defaults to empty string.
*
* @return additional tags for falcon metrics.
*/
- public String getFalconPerfCounterTags() {
- return falconPerfCounterTags;
+ public String getPerfCounterTags() {
+ return perfCounterTags;
}
/**
- * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}.
+ * The interval to report metrics to local falcon agent(if set falcon as monitor system). Defaults
+ * to {@literal 10s}.
*
* @return the interval to report metrics to local falcon agent.
*/
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java
similarity index 85%
rename from src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java
rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java
index 1dc14885..f8f1ee58 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Falcon.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java
@@ -12,20 +12,19 @@
import org.json.JSONException;
import org.json.JSONObject;
-public class Falcon implements PegasusCollector {
-
+public class FalconCollector {
private FalconMetric falconMetric = new FalconMetric();
private final MetricRegistry registry;
public final String defaultTags;
- public Falcon(String host, String tags, int reportStepSec, MetricRegistry registry) {
+ public FalconCollector(String host, String tags, int reportStepSec, MetricRegistry registry) {
this.defaultTags = tags;
this.registry = registry;
falconMetric.endpoint = host;
falconMetric.step = reportStepSec;
}
- public String updateMetric() {
+ public String metricsToJson() {
falconMetric.timestamp = Tools.unixEpochMills() / 1000;
StringBuilder builder = new StringBuilder();
@@ -54,8 +53,8 @@ public void genJsonsFromMeter(String name, Meter meter, StringBuilder output)
throws JSONException {
falconMetric.counterType = "GAUGE";
- falconMetric.metric = name + ".cps-1sec";
- falconMetric.tags = getTableTag(name, defaultTags, "@");
+ falconMetric.metric = name + "_cps_1sec";
+ falconMetric.tags = getTableTag(name, defaultTags);
falconMetric.value = meter.getMeanRate();
oneMetricToJson(falconMetric, output);
}
@@ -65,14 +64,14 @@ public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder out
falconMetric.counterType = "GAUGE";
Snapshot s = hist.getSnapshot();
- falconMetric.metric = name + ".p99";
- falconMetric.tags = getTableTag(name, defaultTags, "@");
+ falconMetric.metric = name + "_p99";
+ falconMetric.tags = getTableTag(name, defaultTags);
falconMetric.value = s.get99thPercentile();
oneMetricToJson(falconMetric, output);
output.append(',');
- falconMetric.metric = name + ".p999";
- falconMetric.tags = getTableTag(name, defaultTags, "@");
+ falconMetric.metric = name + "_p999";
+ falconMetric.tags = getTableTag(name, defaultTags);
falconMetric.value = s.get999thPercentile();
oneMetricToJson(falconMetric, output);
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
similarity index 67%
rename from src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java
rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
index 3285c5aa..2be86666 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
@@ -17,15 +17,15 @@
import org.slf4j.Logger;
/** Created by weijiesun on 18-3-9. */
-public class MetricsReporter {
- public MetricsReporter(int reportSecs, PegasusCollector collector) {
+public class FalconReporter implements PegasusMonitor {
+ public FalconReporter(int reportSecs, FalconCollector falconCollectorMetric) {
falconAgentIP = "127.0.0.1";
falconAgentPort = 1988;
falconAgentSocket = falconAgentIP + ":" + falconAgentPort;
reportIntervalSecs = reportSecs;
falconRequestPath = "/v1/push";
- pegasusCollector = collector;
+ falconCollector = falconCollectorMetric;
boot = new Bootstrap();
httpClientGroup = new NioEventLoopGroup(1);
@@ -51,36 +51,32 @@ public void initChannel(SocketChannel ch) {
reportTarget = null;
}
+ @Override
public void start() {
reportStopped = false;
tryConnect();
}
+ @Override
public void stop() {
httpClientGroup.execute(
- new Runnable() {
- @Override
- public void run() {
- reportStopped = true;
- if (actionLater != null) {
- actionLater.cancel(false);
- }
- if (reportTarget != null) {
- reportTarget
- .close()
- .addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture)
- throws Exception {
+ () -> {
+ reportStopped = true;
+ if (actionLater != null) {
+ actionLater.cancel(false);
+ }
+ if (reportTarget != null) {
+ reportTarget
+ .close()
+ .addListener(
+ (ChannelFutureListener)
+ channelFuture -> {
if (channelFuture.isSuccess()) {
logger.info("close channel to {} succeed", falconAgentSocket);
} else {
logger.warn("close channel to {} failed: ", channelFuture.cause());
}
- }
- });
- }
+ });
}
});
@@ -95,68 +91,52 @@ public void operationComplete(ChannelFuture channelFuture)
public void tryConnect() {
boot.connect(falconAgentIP, falconAgentPort)
.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (channelFuture.isSuccess()) {
- reportTarget = channelFuture.channel();
- logger.info("create channel with {} succeed, wait it active", falconAgentSocket);
- } else {
- logger.error(
- "create channel with {} failed, connect later: ",
- falconAgentSocket,
- channelFuture.cause());
- scheduleNextConnect();
- }
- }
- });
+ (ChannelFutureListener)
+ channelFuture -> {
+ if (channelFuture.isSuccess()) {
+ reportTarget = channelFuture.channel();
+ logger.info(
+ "create channel with {} succeed, wait it active", falconAgentSocket);
+ } else {
+ logger.error(
+ "create channel with {} failed, connect later: ",
+ falconAgentSocket,
+ channelFuture.cause());
+ scheduleNextConnect();
+ }
+ });
}
public void scheduleNextConnect() {
if (reportStopped) return;
actionLater =
- httpClientGroup.schedule(
- new Runnable() {
- @Override
- public void run() {
- tryConnect();
- }
- },
- (long) reportIntervalSecs,
- TimeUnit.SECONDS);
+ httpClientGroup.schedule(this::tryConnect, (long) reportIntervalSecs, TimeUnit.SECONDS);
}
public void scheduleNextReport(final Channel channel) {
if (reportStopped) return;
actionLater =
httpClientGroup.schedule(
- new Runnable() {
- @Override
- public void run() {
- reportMetrics(channel);
- }
- },
- reportIntervalSecs,
- TimeUnit.SECONDS);
+ () -> reportMetrics(channel), reportIntervalSecs, TimeUnit.SECONDS);
}
public void reportMetrics(final Channel channel) {
- String result;
+ String json_metrics;
try {
- result = pegasusCollector.updateMetric();
+ json_metrics = falconCollector.metricsToJson();
} catch (JSONException ex) {
logger.warn("encode metrics to json failed, skip current report, retry later: ", ex);
scheduleNextReport(channel);
return;
}
- logger.debug("generate metrics {} and try to report", result);
+ logger.debug("generate metrics {} and try to report", json_metrics);
FullHttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.POST,
falconRequestPath,
- Unpooled.copiedBuffer(result.getBytes()));
+ Unpooled.copiedBuffer(json_metrics.getBytes()));
request.headers().add(HttpHeaders.Names.HOST, falconAgentSocket);
request.headers().add(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
@@ -165,18 +145,16 @@ public void reportMetrics(final Channel channel) {
channel
.writeAndFlush(request)
.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (!channelFuture.isSuccess()) {
- logger.warn(
- "report to {} failed, skip current report, retry later: ",
- channel.toString(),
- channelFuture.cause());
- channel.close();
- }
- }
- });
+ (ChannelFutureListener)
+ channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ logger.warn(
+ "report to {} failed, skip current report, retry later: ",
+ channel.toString(),
+ channelFuture.cause());
+ channel.close();
+ }
+ });
}
class HttpClientHandler extends SimpleChannelInboundHandler {
@@ -226,7 +204,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
private int reportIntervalSecs;
private String falconRequestPath;
- private PegasusCollector pegasusCollector;
+ public FalconCollector falconCollector;
private Bootstrap boot;
private EventLoopGroup httpClientGroup;
@@ -235,5 +213,5 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
private boolean reportStopped;
private Channel reportTarget;
- private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsReporter.class);
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(FalconReporter.class);
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
index 37360bad..89c0fc1b 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
@@ -8,6 +8,7 @@
/** Created by weijiesun on 18-3-8. */
public final class MetricsManager {
+
public static void updateCount(String counterName, long count) {
metrics.setMeter(counterName, count);
}
@@ -16,13 +17,14 @@ public static void setHistogramValue(String counterName, long value) {
metrics.setHistorgram(counterName, value);
}
- public static final void initFromHost(
+ public static void initFromHost(
String host, String tag, int reportIntervalSec, String perfCounterType) {
synchronized (logger) {
if (started) {
logger.warn(
- "perf counter system has started with host({}), tag({}), interval({}), "
+ "perf counter system({}) has started with host({}), tag({}), interval(if set falcon as system)({}), "
+ "skip this init with host({}), tag({}), interval(){}",
+ MetricsManager.perfCounterType,
MetricsManager.host,
MetricsManager.tag,
MetricsManager.reportIntervalSecs,
@@ -35,21 +37,22 @@ public static final void initFromHost(
logger.info(
"init metrics with host({}), tag({}), interval({})", host, tag, reportIntervalSec);
+ MetricsManager.perfCounterType = perfCounterType;
MetricsManager.host = host;
MetricsManager.tag = tag;
MetricsManager.reportIntervalSecs = reportIntervalSec;
metrics = new MetricsPool(host, tag, reportIntervalSec, perfCounterType);
+ metrics.start();
started = true;
}
}
- public static final void detectHostAndInit(
- String tag, int reportIntervalSec, String perfCounterType) {
+ public static void detectHostAndInit(String tag, int reportIntervalSec, String perfCounterType) {
initFromHost(
Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec, perfCounterType);
}
- public static final void finish() {
+ public static void finish() {
synchronized (logger) {
if (started) {
metrics.stop();
@@ -62,7 +65,7 @@ public static final void finish() {
private static String host;
private static String tag;
private static int reportIntervalSecs;
-
+ private static String perfCounterType;
private static MetricsPool metrics;
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsManager.class);
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
index 9623fb6f..df7e386f 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
@@ -8,31 +8,25 @@
/** Created by weijiesun on 18-3-9. */
public final class MetricsPool {
- public final String metricType;
-
private final MetricRegistry registry = new MetricRegistry();
- public static MetricsReporter reporter;
- public static PegasusCollector collector;
-
- public MetricsPool(String host, String tags, int reportStepSec, String type) {
- metricType = type;
+ public static PegasusMonitor pegasusMonitor;
+ public MetricsPool(String host, String tags, int reportStepSec, String metricType) {
if (metricType.equals("falcon")) {
- collector = new Falcon(host, tags, reportStepSec, registry);
- reporter = new MetricsReporter(reportStepSec, collector);
- reporter.start();
- } else if (type.equals("prometheus")) {
- collector = new Prometheus(tags, reportStepSec, registry);
- ((Prometheus) collector).start();
+ pegasusMonitor =
+ new FalconReporter(
+ reportStepSec, new FalconCollector(host, tags, reportStepSec, registry));
+ } else if (metricType.equals("prometheus")) {
+ pegasusMonitor = new PrometheusCollector(tags, registry);
}
}
+ public void start() {
+ pegasusMonitor.start();
+ }
+
public void stop() {
- if (metricType.equals("falcon")) {
- reporter.stop();
- } else if (metricType.equals("prometheus")) {
- ((Prometheus) collector).stop();
- }
+ pegasusMonitor.stop();
}
public void setMeter(String counterName, long count) {
@@ -43,11 +37,11 @@ public void setHistorgram(String counterName, long value) {
registry.histogram(counterName).update(value);
}
- public static String getTableTag(String counterName, String defaultTags, String regex) {
+ public static String getTableTag(String counterName, String defaultTags) {
if (defaultTags.contains("table=")) {
return defaultTags;
}
- String[] result = counterName.split(regex);
+ String[] result = counterName.split(":");
if (result.length >= 2) {
return defaultTags.equals("")
? ("table=" + result[1])
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java
deleted file mode 100644
index 1434d76f..00000000
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusCollector.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package com.xiaomi.infra.pegasus.metrics;
-
-public interface PegasusCollector {
-
- public String updateMetric();
-}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java
new file mode 100644
index 00000000..3c8a66a1
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java
@@ -0,0 +1,8 @@
+package com.xiaomi.infra.pegasus.metrics;
+
+public interface PegasusMonitor {
+
+ public void start();
+
+ public void stop();
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java
deleted file mode 100644
index 1572012f..00000000
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/Prometheus.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.xiaomi.infra.pegasus.metrics;
-
-import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag;
-
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import io.prometheus.client.Collector;
-import io.prometheus.client.GaugeMetricFamily;
-import io.prometheus.client.exporter.HTTPServer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class Prometheus extends Collector implements PegasusCollector {
-
- private final ScheduledExecutorService collectorTask;
- private final String defaultTags;
- private final MetricRegistry registry;
- private int reportStepSec;
-
- private Map metrics = new ConcurrentHashMap<>();
- private Map> tableLabels = new HashMap<>();
-
- public Prometheus(String defaultTags, int reportStepSec, MetricRegistry registry) {
- this.defaultTags = defaultTags;
- this.reportStepSec = reportStepSec;
- this.registry = registry;
- this.collectorTask = Executors.newScheduledThreadPool(1);
- }
-
- public List collect() {
- return new ArrayList<>(metrics.values());
- }
-
- public String updateMetric() {
- for (Map.Entry entry : registry.getMeters().entrySet()) {
- String QPSName = format(entry.getKey());
- updateQPSMetric(entry, QPSName);
- }
-
- for (Map.Entry entry : registry.getHistograms().entrySet()) {
- String latencyName = format(entry.getKey());
- updateLatencyMetric(entry, latencyName + "-99th");
- updateLatencyMetric(entry, latencyName + "-999th");
- }
-
- return "OK";
- }
-
- private void updateQPSMetric(Map.Entry meter, String name) {
- Map labels = getLabel(getTableTag(name, defaultTags, ":"));
- if (!metrics.containsKey(name)) {
- // assert labels != null;
- GaugeMetricFamily labeledGauge =
- new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet()));
- labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate());
- metrics.put(name, labeledGauge);
- }
- ((GaugeMetricFamily) metrics.get(name))
- .addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate());
- }
-
- private void updateLatencyMetric(Map.Entry meter, String name) {
- Map labels = getLabel(getTableTag(name, defaultTags, ":"));
- ArrayList labelList = new ArrayList<>(labels.values());
- Snapshot snapshot = meter.getValue().getSnapshot();
- double value =
- name.contains("99th") ? snapshot.get99thPercentile() : snapshot.get999thPercentile();
- if (!metrics.containsKey(name)) {
- // assert labels != null;
- GaugeMetricFamily labeledGauge =
- new GaugeMetricFamily(name, "help", new ArrayList<>(labels.keySet()));
- labeledGauge.addMetric(new ArrayList<>(labels.values()), value);
- metrics.put(name, labeledGauge);
- }
-
- ((GaugeMetricFamily) metrics.get(name)).addMetric(labelList, value);
- }
-
- private Map getLabel(String labels) {
- if (tableLabels.containsKey(labels)) {
- return tableLabels.get(labels);
- }
-
- // todo nullptr
- HashMap labelMap = new HashMap<>();
- String[] labelsString = labels.split(",");
- for (String label : labelsString) {
- String[] labelPair = label.split("=");
- labelMap.put(labelPair[0], labelPair[1]);
- }
- tableLabels.put(labels, labelMap);
- return labelMap;
- }
-
- private String format(String name) {
- return name.replaceAll("\\.", "_").replaceAll("@", ":");
- }
-
- public void start() {
- try {
- register();
- new HTTPServer(9091);
- collectorTask.scheduleAtFixedRate(this::updateMetric, 0, 10, TimeUnit.SECONDS);
- System.out.println("XXXX");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void stop() {
- collectorTask.shutdown();
- }
-}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java
new file mode 100644
index 00000000..a2beafab
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java
@@ -0,0 +1,129 @@
+package com.xiaomi.infra.pegasus.metrics;
+
+import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.google.common.collect.ImmutableList;
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+
+public class PrometheusCollector extends Collector implements PegasusMonitor {
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PrometheusCollector.class);
+
+ private static final int PORT = 9091;
+
+ private final String defaultTags;
+ private final MetricRegistry registry;
+ private Map> tableLabels = new HashMap<>();
+
+ public PrometheusCollector(String defaultTags, MetricRegistry registry) {
+ this.defaultTags = defaultTags;
+ this.registry = registry;
+ }
+
+ public List collect() {
+ final ImmutableList.Builder metricFamilySamplesBuilder =
+ ImmutableList.builder();
+ updateMetric(metricFamilySamplesBuilder);
+ return metricFamilySamplesBuilder.build();
+ }
+
+ public void updateMetric(final ImmutableList.Builder builder) {
+ for (Map.Entry entry : registry.getMeters().entrySet()) {
+ updateQPSMetric(entry.getKey(), entry, builder);
+ }
+
+ for (Map.Entry entry : registry.getHistograms().entrySet()) {
+ updateLatencyMetric(entry.getKey(), entry, builder);
+ }
+ }
+
+ private void updateQPSMetric(
+ String name,
+ Map.Entry meter,
+ final ImmutableList.Builder builder) {
+ Map labels = getLabel(getTableTag(name, defaultTags));
+ GaugeMetricFamily labeledGauge =
+ new GaugeMetricFamily(
+ formatQPSMetricName(name), "pegasus operation qps", new ArrayList<>(labels.keySet()));
+ labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate());
+ builder.add(labeledGauge);
+ }
+
+ private void updateLatencyMetric(
+ String name,
+ Map.Entry meter,
+ final ImmutableList.Builder builder) {
+ Map labels = getLabel(getTableTag(name, defaultTags));
+ Snapshot snapshot = meter.getValue().getSnapshot();
+ updateLatencyMetric(
+ formatLatencyMetricName(name, "p99"), snapshot.get99thPercentile(), labels, builder);
+ updateLatencyMetric(
+ formatLatencyMetricName(name, "p999"), snapshot.get999thPercentile(), labels, builder);
+ }
+
+ private void updateLatencyMetric(
+ String key,
+ double value,
+ Map labels,
+ final ImmutableList.Builder builder) {
+ GaugeMetricFamily labeledGauge =
+ new GaugeMetricFamily(key, "pegasus operation latency", new ArrayList<>(labels.keySet()));
+ labeledGauge.addMetric(new ArrayList<>(labels.values()), value);
+ builder.add(labeledGauge);
+ }
+
+ private Map getLabel(String labels) {
+ if (tableLabels.containsKey(labels)) {
+ return tableLabels.get(labels);
+ }
+
+ HashMap labelMap = new HashMap<>();
+ String[] labelsString = labels.split(",");
+ for (String label : labelsString) {
+ String[] labelPair = label.split("=");
+ assert (labelPair.length == 2);
+ labelMap.put(labelPair[0], labelPair[1]);
+ }
+ tableLabels.put(labels, labelMap);
+ return labelMap;
+ }
+
+ private String formatLatencyMetricName(String name, String percentage) {
+ String[] metricName = name.split(":");
+ assert (metricName.length == 2);
+ return metricName[0] + "_" + percentage;
+ }
+
+ private String formatQPSMetricName(String name) {
+ String[] metricName = name.split(":");
+ assert (metricName.length == 2);
+ return metricName[0];
+ }
+
+ @Override
+ public void start() {
+ try {
+ register();
+ new HTTPServer(PORT);
+ logger.debug("start the prometheus collector server, port = {}", PORT);
+ } catch (IOException e) {
+ logger.debug("start the prometheus collector server failed, error = {}", e.getMessage());
+ }
+ }
+
+ @Override
+ public void stop() {
+ logger.debug("stop the prometheus collector server, port = {}", PORT);
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
index 9be285ea..f5139493 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
@@ -56,23 +56,23 @@ public String getQPSCounter() {
break;
}
- // pegasus.client.put.succ.qps
+ // pegasus_client_put_succ_qps
return new StringBuilder()
- .append("pegasus.client.")
+ .append("pegasus_client_")
.append(name())
- .append(".")
+ .append("_")
.append(mark)
- .append(".qps@")
+ .append("_qps:")
.append(tableName)
.toString();
}
public String getLatencyCounter() {
- // pegasus.client.put.latency
+ // pegasus_client_put_latency
return new StringBuilder()
- .append("pegasus.client.")
+ .append("pegasus_client_")
.append(name())
- .append(".latency@")
+ .append("_latency:")
.append(tableName)
.toString();
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
index af14149e..64cd67dd 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
@@ -115,6 +115,7 @@ public static ClusterOptions create(Properties config) {
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));
String perfCounterType =
config.getProperty(PEGASUS_PUSH_COUNTER_TYPE_KEY, PEGASUS_PUSH_COUNTER_TYPE_DEF);
+ assert ((perfCounterType.equals("falcon") || perfCounterType.equals("prometheus")));
int metaQueryTimeout =
Integer.parseInt(
config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF));
diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
index 2ccebe42..86aa56aa 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
@@ -136,53 +136,44 @@ public void testSetGetDel() throws PException {
try {
// set
- while (true) {
- Thread.sleep(1000);
-
- client.set(
- tableName,
- hashKey,
- "basic_test_sort_key_1".getBytes(),
- "basic_test_value_1".getBytes());
+ client.set(
+ tableName, hashKey, "basic_test_sort_key_1".getBytes(), "basic_test_value_1".getBytes());
- // check exist
- boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes());
- Assert.assertTrue(exist);
+ // check exist
+ boolean exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes());
+ Assert.assertTrue(exist);
- exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes());
- Assert.assertFalse(exist);
+ exist = client.exist(tableName, hashKey, "basic_test_sort_key_2".getBytes());
+ Assert.assertFalse(exist);
- // check sortkey count
- long sortKeyCount = client.sortKeyCount(tableName, hashKey);
- Assert.assertEquals(1, sortKeyCount);
+ // check sortkey count
+ long sortKeyCount = client.sortKeyCount(tableName, hashKey);
+ Assert.assertEquals(1, sortKeyCount);
- // get
- byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes());
- Assert.assertArrayEquals("basic_test_value_1".getBytes(), value);
+ // get
+ byte[] value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes());
+ Assert.assertArrayEquals("basic_test_value_1".getBytes(), value);
- value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes());
- Assert.assertEquals(null, value);
+ value = client.get(tableName, hashKey, "basic_test_sort_key_2".getBytes());
+ Assert.assertEquals(null, value);
- // del
- client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes());
+ // del
+ client.del(tableName, hashKey, "basic_test_sort_key_1".getBytes());
- // check exist
- exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes());
- Assert.assertFalse(exist);
+ // check exist
+ exist = client.exist(tableName, hashKey, "basic_test_sort_key_1".getBytes());
+ Assert.assertFalse(exist);
- // check sortkey count
- sortKeyCount = client.sortKeyCount(tableName, hashKey);
- Assert.assertEquals(0, sortKeyCount);
+ // check sortkey count
+ sortKeyCount = client.sortKeyCount(tableName, hashKey);
+ Assert.assertEquals(0, sortKeyCount);
- // check deleted
- value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes());
- Assert.assertEquals(null, value);
- }
+ // check deleted
+ value = client.get(tableName, hashKey, "basic_test_sort_key_1".getBytes());
+ Assert.assertEquals(null, value);
} catch (PException e) {
e.printStackTrace();
Assert.assertTrue(false);
- } catch (InterruptedException e) {
- e.printStackTrace();
}
PegasusClientFactory.closeSingletonClient();
diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
index 198ec26a..d6e582b5 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
@@ -23,14 +23,14 @@ public void before() {
public void genJsonsFromMeter() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- Falcon falcon = new Falcon(host, tags, 20, r);
+ FalconCollector falconCollector = new FalconCollector(host, tags, 20, r);
Meter m = r.meter("TestName");
m.mark(1);
m.mark(1);
StringBuilder builder = new StringBuilder();
- falcon.genJsonsFromMeter("TestName", m, builder);
+ falconCollector.genJsonsFromMeter("TestName", m, builder);
JSONArray array = new JSONArray("[" + builder.toString() + "]");
Assert.assertEquals(1, array.length());
@@ -54,12 +54,12 @@ public void genJsonsFromMeter() throws Exception {
public void genJsonFromHistogram() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- Falcon falcon = new Falcon(host, tags, 20, r);
+ FalconCollector falconCollector = new FalconCollector(host, tags, 20, r);
Histogram h = r.histogram("TestHist");
for (int i = 0; i < 1000000; ++i) h.update((long) i);
StringBuilder builder = new StringBuilder();
- falcon.genJsonsFromHistogram("TestHist", h, builder);
+ falconCollector.genJsonsFromHistogram("TestHist", h, builder);
JSONArray array = new JSONArray("[" + builder.toString() + "]");
Assert.assertEquals(2, array.length());
@@ -79,7 +79,7 @@ public void genJsonFromHistogram() throws Exception {
@Test
public void oneMetricToJson() throws Exception {
- Falcon.FalconMetric metric = new Falcon.FalconMetric();
+ FalconCollector.FalconMetric metric = new FalconCollector.FalconMetric();
metric.endpoint = "1.2.3.4";
metric.metric = "simple_metric";
metric.timestamp = 12343455L;
@@ -89,7 +89,7 @@ public void oneMetricToJson() throws Exception {
metric.tags = "cluster=onebox,app=new";
StringBuilder builder = new StringBuilder();
- Falcon.oneMetricToJson(metric, builder);
+ FalconCollector.oneMetricToJson(metric, builder);
JSONObject obj = new JSONObject(builder.toString());
Assert.assertEquals(metric.endpoint, obj.getString("endpoint"));
@@ -102,7 +102,7 @@ public void oneMetricToJson() throws Exception {
builder.setLength(0);
metric.tags = "";
- Falcon.oneMetricToJson(metric, builder);
+ FalconCollector.oneMetricToJson(metric, builder);
obj = new JSONObject(builder.toString());
Assert.assertEquals(metric.endpoint, obj.getString("endpoint"));
Assert.assertEquals(metric.metric, obj.getString("metric"));
@@ -117,7 +117,7 @@ public void oneMetricToJson() throws Exception {
public void metricsToJson() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- MetricsPool pool = new MetricsPool(host, tags, 20, "falcon");
+ MetricsPool pool = new MetricsPool(host, tags, 20, "falconCollector");
pool.setMeter("aaa@temp", 1);
pool.setMeter("aaa", 2);
@@ -127,7 +127,9 @@ public void metricsToJson() throws Exception {
pool.setHistorgram("ccc@temp", i);
}
- JSONArray array = new JSONArray(MetricsPool.collector.updateMetric());
+ JSONArray array =
+ new JSONArray(
+ ((FalconReporter) MetricsPool.pegasusMonitor).falconCollector.metricsToJson());
Assert.assertEquals(6, array.length());
for (int i = 0; i < array.length(); ++i) {
JSONObject j = array.getJSONObject(i);
diff --git a/src/test/resource/pegasus.properties b/src/test/resource/pegasus.properties
index ec4da28e..6282eea7 100644
--- a/src/test/resource/pegasus.properties
+++ b/src/test/resource/pegasus.properties
@@ -1,7 +1,6 @@
meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
operation_timeout = 10000
async_workers = 2
-enable_perf_counter = true
-push_counter_type = prometheus
+enable_perf_counter = false
perf_counter_tags = cluster=onebox,app=unit_test
push_counter_interval_secs = 10