diff --git a/pom.xml b/pom.xml
index e37ee80d..3f8a49bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,6 +6,7 @@
jar
1.12-thrift-0.11.0-inlined-SNAPSHOT
Pegasus Java Client
+
org.junit.jupiter
@@ -80,6 +81,16 @@
javax.annotation-api
1.3.2
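+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+      <version>0.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_httpserver</artifactId>
+      <version>0.4.0</version>
+    </dependency>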
diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
index 2552678c..5b1e9710 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
@@ -23,7 +23,7 @@
* .operationTimeout(Duration.ofMillis(1000))
* .asyncWorkers(4)
* .enablePerfCounter(false)
- * .falconPerfCounterTags("")
+ * .perfCounterTags("")
* .falconPushInterval(Duration.ofSeconds(10))
* .metaQueryTimeout(Duration.ofMillis(5000))
* .build();
@@ -36,7 +36,8 @@ public class ClientOptions {
public static final Duration DEFAULT_OPERATION_TIMEOUT = Duration.ofMillis(1000);
public static final int DEFAULT_ASYNC_WORKERS = Runtime.getRuntime().availableProcessors();
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = true;
- public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
+ public static final String DEFAULT_PERF_COUNTER_TYPE = "falcon";
+ public static final String DEFAULT_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;
public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000);
@@ -45,7 +46,8 @@ public class ClientOptions {
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
- private final String falconPerfCounterTags;
+ private final String perfCounterType;
+ private final String perfCounterTags;
private final Duration falconPushInterval;
private final boolean enableWriteLimit;
private final Duration metaQueryTimeout;
@@ -55,7 +57,8 @@ protected ClientOptions(Builder builder) {
this.operationTimeout = builder.operationTimeout;
this.asyncWorkers = builder.asyncWorkers;
this.enablePerfCounter = builder.enablePerfCounter;
- this.falconPerfCounterTags = builder.falconPerfCounterTags;
+ this.perfCounterType = builder.perfCounterType;
+ this.perfCounterTags = builder.perfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
this.metaQueryTimeout = builder.metaQueryTimeout;
@@ -66,7 +69,8 @@ protected ClientOptions(ClientOptions original) {
this.operationTimeout = original.getOperationTimeout();
this.asyncWorkers = original.getAsyncWorkers();
this.enablePerfCounter = original.isEnablePerfCounter();
- this.falconPerfCounterTags = original.getFalconPerfCounterTags();
+ this.perfCounterType = original.perfCounterType;
+ this.perfCounterTags = original.getPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
this.metaQueryTimeout = original.getMetaQueryTimeout();
@@ -111,7 +115,8 @@ public boolean equals(Object options) {
&& this.operationTimeout.toMillis() == clientOptions.operationTimeout.toMillis()
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
- && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
+ && this.perfCounterType.equals(clientOptions.perfCounterType)
+ && this.perfCounterTags.equals(clientOptions.perfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit
&& this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis();
@@ -129,10 +134,12 @@ public String toString() {
+ operationTimeout.toMillis()
+ ", asyncWorkers="
+ asyncWorkers
+ ", enablePerfCounter="
+ enablePerfCounter
- + ", falconPerfCounterTags='"
- + falconPerfCounterTags
+        + ", perfCounterType="
+        + perfCounterType
+        + ", perfCounterTags='"
+        + perfCounterTags
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
@@ -149,7 +156,8 @@ public static class Builder {
private Duration operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private int asyncWorkers = DEFAULT_ASYNC_WORKERS;
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
- private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
+ private String perfCounterType = DEFAULT_PERF_COUNTER_TYPE;
+ private String perfCounterTags = DEFAULT_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;
private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT;
@@ -194,8 +202,8 @@ public Builder asyncWorkers(int asyncWorkers) {
/**
* Whether to enable performance statistics. If true, the client will periodically report
- * metrics to local falcon agent (currently we only support falcon as monitoring system).
- * Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}.
+ * metrics to local falcon agent (if set falcon as monitoring system) or open prometheus
+ * collector http server. Defaults to {@literal true}, see {@link #DEFAULT_ENABLE_PERF_COUNTER}.
*
* @param enablePerfCounter enablePerfCounter
* @return {@code this}
@@ -205,22 +213,34 @@ public Builder enablePerfCounter(boolean enablePerfCounter) {
return this;
}
+ /**
+ * Sets the perf-counter type; only falcon and prometheus are supported. Defaults to {@literal
+ * falcon}, see {@link #DEFAULT_PERF_COUNTER_TYPE}.
+ *
+ * @param perfCounterType perfCounterType
+ * @return this
+ */
+ public Builder perfCounterType(String perfCounterType) {
+ this.perfCounterType = perfCounterType;
+ return this;
+ }
+
/**
* Additional tags for falcon metrics. For example:
* "cluster=c3srv-ad,job=recommend-service-history". Defaults to empty string, see {@link
- * #DEFAULT_FALCON_PERF_COUNTER_TAGS}.
+ * #DEFAULT_PERF_COUNTER_TAGS}.
*
- * @param falconPerfCounterTags falconPerfCounterTags
+ * @param perfCounterTags perfCounterTags
* @return {@code this}
*/
- public Builder falconPerfCounterTags(String falconPerfCounterTags) {
- this.falconPerfCounterTags = falconPerfCounterTags;
+ public Builder perfCounterTags(String perfCounterTags) {
+ this.perfCounterTags = perfCounterTags;
return this;
}
/**
- * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}, see {@link
- * #DEFAULT_FALCON_PUSH_INTERVAL}.
+ * The interval to report metrics to local falcon agent(if set falcon as monitor system).
+ * Defaults to {@literal 10s}, see {@link #DEFAULT_FALCON_PUSH_INTERVAL}.
*
* @param falconPushInterval falconPushInterval
* @return {@code this}
@@ -279,7 +299,8 @@ public ClientOptions.Builder mutate() {
.operationTimeout(getOperationTimeout())
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
- .falconPerfCounterTags(getFalconPerfCounterTags())
+ .perfCounterType(getPerfCounterType())
+ .perfCounterTags(getPerfCounterTags())
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled())
.metaQueryTimeout(getMetaQueryTimeout());
@@ -316,8 +337,8 @@ public int getAsyncWorkers() {
/**
* Whether to enable performance statistics. If true, the client will periodically report metrics
- * to local falcon agent (currently we only support falcon as monitoring system). Defaults to
- * {@literal true}.
+ * to local falcon agent (if set falcon as monitoring system) or open prometheus collector http
+ * server. Defaults to {@literal true}.
*
* @return whether to enable performance statistics.
*/
@@ -326,16 +347,26 @@ public boolean isEnablePerfCounter() {
}
/**
- * Additional tags for falcon metrics. Defaults to empty string.
+ * Gets the perf-counter type; only falcon and prometheus are supported.
+ *
+ * @return perf-counter type
+ */
+ public String getPerfCounterType() {
+ return perfCounterType;
+ }
+
+ /**
+ * Additional tags for metrics. Defaults to empty string.
*
* @return additional tags for falcon metrics.
*/
- public String getFalconPerfCounterTags() {
- return falconPerfCounterTags;
+ public String getPerfCounterTags() {
+ return perfCounterTags;
}
/**
- * The interval to report metrics to local falcon agent. Defaults to {@literal 10s}.
+ * The interval to report metrics to local falcon agent(if set falcon as monitor system). Defaults
+ * to {@literal 10s}.
*
* @return the interval to report metrics to local falcon agent.
*/
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java
new file mode 100644
index 00000000..71ee7bb1
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconCollector.java
@@ -0,0 +1,144 @@
+package com.xiaomi.infra.pegasus.metrics;
+
+import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getMetricName;
+import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.xiaomi.infra.pegasus.tools.Tools;
+import java.util.Map;
+import java.util.SortedMap;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Serializes the meters and histograms of a {@link MetricRegistry} into the JSON array format
+ * consumed by the falcon agent.
+ *
+ * <p>One {@link FalconMetric} scratch object is reused for every emitted entry, so an instance
+ * must not be used from several threads at once.
+ */
+public class FalconCollector {
+  private final FalconMetric falconMetric = new FalconMetric();
+  private final MetricRegistry registry;
+  public final String defaultTags;
+
+  /**
+   * @param host endpoint name written into every metric
+   * @param tags default tags attached to every metric
+   * @param reportStepSec report interval in seconds, written into every metric as {@code step}
+   * @param registry registry whose meters and histograms are serialized
+   */
+  public FalconCollector(String host, String tags, int reportStepSec, MetricRegistry registry) {
+    this.defaultTags = tags;
+    this.registry = registry;
+    falconMetric.endpoint = host;
+    falconMetric.step = reportStepSec;
+  }
+
+  /**
+   * Encodes every meter and histogram currently in the registry as one JSON array.
+   *
+   * @return the JSON array; {@code "[]"} when the registry holds no meters or histograms
+   * @throws JSONException if a metric cannot be encoded
+   */
+  public String metricsToJson() throws JSONException {
+    falconMetric.timestamp = Tools.unixEpochMills() / 1000;
+
+    StringBuilder builder = new StringBuilder();
+    builder.append('[');
+    SortedMap<String, Meter> meters = registry.getMeters();
+    for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+      genJsonsFromMeter(entry.getKey(), entry.getValue(), builder);
+      builder.append(',');
+    }
+
+    for (Map.Entry<String, Histogram> entry : registry.getHistograms().entrySet()) {
+      genJsonsFromHistogram(entry.getKey(), entry.getValue(), builder);
+      builder.append(',');
+    }
+
+    // Every entry above is followed by a comma; drop the last one so the array is valid JSON.
+    if (builder.charAt(builder.length() - 1) == ',') {
+      builder.deleteCharAt(builder.length() - 1);
+    }
+
+    builder.append("]");
+
+    return builder.toString();
+  }
+
+  /**
+   * Appends one JSON object carrying the meter's mean rate.
+   *
+   * @param name registry name of the meter
+   * @param meter meter to read
+   * @param output buffer the JSON object is appended to
+   * @throws JSONException if the metric cannot be encoded
+   */
+  public void genJsonsFromMeter(String name, Meter meter, StringBuilder output)
+      throws JSONException {
+    falconMetric.counterType = "GAUGE";
+    falconMetric.metric = getMetricName(name, "");
+    falconMetric.tags = getTableTag(name, defaultTags);
+    falconMetric.value = meter.getMeanRate();
+    oneMetricToJson(falconMetric, output);
+  }
+
+  /**
+   * Appends two comma-separated JSON objects: the histogram's 99th and 99.9th percentiles.
+   *
+   * @param name registry name of the histogram
+   * @param hist histogram to snapshot
+   * @param output buffer the JSON objects are appended to
+   * @throws JSONException if a metric cannot be encoded
+   */
+  public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder output)
+      throws JSONException {
+    falconMetric.counterType = "GAUGE";
+    Snapshot s = hist.getSnapshot();
+
+    falconMetric.metric = getMetricName(name, "_p99");
+    falconMetric.tags = getTableTag(name, defaultTags);
+    falconMetric.value = s.get99thPercentile();
+    oneMetricToJson(falconMetric, output);
+    output.append(',');
+
+    falconMetric.metric = getMetricName(name, "_p999");
+    falconMetric.tags = getTableTag(name, defaultTags);
+    falconMetric.value = s.get999thPercentile();
+    oneMetricToJson(falconMetric, output);
+  }
+
+  /**
+   * Appends {@code metric} to {@code output} as a single JSON object.
+   *
+   * @param metric values to encode
+   * @param output buffer the JSON object is appended to
+   * @throws JSONException if a field cannot be encoded
+   */
+  public static void oneMetricToJson(FalconMetric metric, StringBuilder output)
+      throws JSONException {
+    JSONObject obj = new JSONObject();
+    obj.put("endpoint", metric.endpoint);
+    obj.put("metric", metric.metric);
+    obj.put("timestamp", metric.timestamp);
+    obj.put("step", metric.step);
+    obj.put("value", metric.value);
+    obj.put("counterType", metric.counterType);
+    obj.put("tags", metric.tags);
+    output.append(obj.toString());
+  }
+
+  static final class FalconMetric {
+    public String endpoint; // metric host
+    public String metric; // metric name
+    public long timestamp; // report time in unix seconds
+    public int step; // report interval in seconds;
+    public double value; // metric value
+    public String counterType; // GAUGE or COUNTER
+    public String tags; // metrics description
+  }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
similarity index 68%
rename from src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java
rename to src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
index bb3df12c..2be86666 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsReporter.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/FalconReporter.java
@@ -17,15 +17,15 @@
import org.slf4j.Logger;
/** Created by weijiesun on 18-3-9. */
-public class MetricsReporter {
- public MetricsReporter(int reportSecs, MetricsPool pool) {
+public class FalconReporter implements PegasusMonitor {
+ public FalconReporter(int reportSecs, FalconCollector falconCollectorMetric) {
falconAgentIP = "127.0.0.1";
falconAgentPort = 1988;
- falconAgentSocket = falconAgentIP + ":" + String.valueOf(falconAgentPort);
+ falconAgentSocket = falconAgentIP + ":" + falconAgentPort;
reportIntervalSecs = reportSecs;
falconRequestPath = "/v1/push";
- metrics = pool;
+ falconCollector = falconCollectorMetric;
boot = new Bootstrap();
httpClientGroup = new NioEventLoopGroup(1);
@@ -51,36 +51,32 @@ public void initChannel(SocketChannel ch) {
reportTarget = null;
}
+ @Override
public void start() {
reportStopped = false;
tryConnect();
}
+ @Override
public void stop() {
httpClientGroup.execute(
- new Runnable() {
- @Override
- public void run() {
- reportStopped = true;
- if (actionLater != null) {
- actionLater.cancel(false);
- }
- if (reportTarget != null) {
- reportTarget
- .close()
- .addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture)
- throws Exception {
+ () -> {
+ reportStopped = true;
+ if (actionLater != null) {
+ actionLater.cancel(false);
+ }
+ if (reportTarget != null) {
+ reportTarget
+ .close()
+ .addListener(
+ (ChannelFutureListener)
+ channelFuture -> {
if (channelFuture.isSuccess()) {
logger.info("close channel to {} succeed", falconAgentSocket);
} else {
logger.warn("close channel to {} failed: ", channelFuture.cause());
}
- }
- });
- }
+ });
}
});
@@ -95,55 +91,39 @@ public void operationComplete(ChannelFuture channelFuture)
public void tryConnect() {
boot.connect(falconAgentIP, falconAgentPort)
.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (channelFuture.isSuccess()) {
- reportTarget = channelFuture.channel();
- logger.info("create channel with {} succeed, wait it active", falconAgentSocket);
- } else {
- logger.error(
- "create channel with {} failed, connect later: ",
- falconAgentSocket,
- channelFuture.cause());
- scheduleNextConnect();
- }
- }
- });
+ (ChannelFutureListener)
+ channelFuture -> {
+ if (channelFuture.isSuccess()) {
+ reportTarget = channelFuture.channel();
+ logger.info(
+ "create channel with {} succeed, wait it active", falconAgentSocket);
+ } else {
+ logger.error(
+ "create channel with {} failed, connect later: ",
+ falconAgentSocket,
+ channelFuture.cause());
+ scheduleNextConnect();
+ }
+ });
}
public void scheduleNextConnect() {
if (reportStopped) return;
actionLater =
- httpClientGroup.schedule(
- new Runnable() {
- @Override
- public void run() {
- tryConnect();
- }
- },
- (long) reportIntervalSecs,
- TimeUnit.SECONDS);
+ httpClientGroup.schedule(this::tryConnect, (long) reportIntervalSecs, TimeUnit.SECONDS);
}
public void scheduleNextReport(final Channel channel) {
if (reportStopped) return;
actionLater =
httpClientGroup.schedule(
- new Runnable() {
- @Override
- public void run() {
- reportMetrics(channel);
- }
- },
- reportIntervalSecs,
- TimeUnit.SECONDS);
+ () -> reportMetrics(channel), reportIntervalSecs, TimeUnit.SECONDS);
}
public void reportMetrics(final Channel channel) {
String json_metrics;
try {
- json_metrics = metrics.metricsToJson();
+ json_metrics = falconCollector.metricsToJson();
} catch (JSONException ex) {
logger.warn("encode metrics to json failed, skip current report, retry later: ", ex);
scheduleNextReport(channel);
@@ -165,18 +145,16 @@ public void reportMetrics(final Channel channel) {
channel
.writeAndFlush(request)
.addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (!channelFuture.isSuccess()) {
- logger.warn(
- "report to {} failed, skip current report, retry later: ",
- channel.toString(),
- channelFuture.cause());
- channel.close();
- }
- }
- });
+ (ChannelFutureListener)
+ channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ logger.warn(
+ "report to {} failed, skip current report, retry later: ",
+ channel.toString(),
+ channelFuture.cause());
+ channel.close();
+ }
+ });
}
class HttpClientHandler extends SimpleChannelInboundHandler {
@@ -226,7 +204,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
private int reportIntervalSecs;
private String falconRequestPath;
- private MetricsPool metrics;
+ public FalconCollector falconCollector;
private Bootstrap boot;
private EventLoopGroup httpClientGroup;
@@ -235,5 +213,5 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
private boolean reportStopped;
private Channel reportTarget;
- private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsReporter.class);
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(FalconReporter.class);
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
index ce1a5277..89c0fc1b 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsManager.java
@@ -8,6 +8,7 @@
/** Created by weijiesun on 18-3-8. */
public final class MetricsManager {
+
public static void updateCount(String counterName, long count) {
metrics.setMeter(counterName, count);
}
@@ -16,12 +17,14 @@ public static void setHistogramValue(String counterName, long value) {
metrics.setHistorgram(counterName, value);
}
- public static final void initFromHost(String host, String tag, int reportIntervalSec) {
+ public static void initFromHost(
+ String host, String tag, int reportIntervalSec, String perfCounterType) {
synchronized (logger) {
if (started) {
logger.warn(
- "perf counter system has started with host({}), tag({}), interval({}), "
+ "perf counter system({}) has started with host({}), tag({}), interval(if set falcon as system)({}), "
+ "skip this init with host({}), tag({}), interval(){}",
+ MetricsManager.perfCounterType,
MetricsManager.host,
MetricsManager.tag,
MetricsManager.reportIntervalSecs,
@@ -34,24 +37,25 @@ public static final void initFromHost(String host, String tag, int reportInterva
logger.info(
"init metrics with host({}), tag({}), interval({})", host, tag, reportIntervalSec);
+ MetricsManager.perfCounterType = perfCounterType;
MetricsManager.host = host;
MetricsManager.tag = tag;
MetricsManager.reportIntervalSecs = reportIntervalSec;
- metrics = new MetricsPool(host, tag, reportIntervalSec);
- reporter = new MetricsReporter(reportIntervalSec, metrics);
- reporter.start();
+ metrics = new MetricsPool(host, tag, reportIntervalSec, perfCounterType);
+ metrics.start();
started = true;
}
}
- public static final void detectHostAndInit(String tag, int reportIntervalSec) {
- initFromHost(Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec);
+ public static void detectHostAndInit(String tag, int reportIntervalSec, String perfCounterType) {
+ initFromHost(
+ Tools.getLocalHostAddress().getHostName(), tag, reportIntervalSec, perfCounterType);
}
- public static final void finish() {
+ public static void finish() {
synchronized (logger) {
if (started) {
- reporter.stop();
+ metrics.stop();
started = false;
}
}
@@ -61,8 +65,8 @@ public static final void finish() {
private static String host;
private static String tag;
private static int reportIntervalSecs;
-
+ private static String perfCounterType;
private static MetricsPool metrics;
- private static MetricsReporter reporter;
+
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsManager.class);
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
index 8ecb5508..905fd58d 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/MetricsPool.java
@@ -3,27 +3,44 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.metrics;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import com.xiaomi.infra.pegasus.tools.Tools;
-import java.util.Map;
-import java.util.SortedMap;
-import org.json.JSONException;
-import org.json.JSONObject;
/** Created by weijiesun on 18-3-9. */
public final class MetricsPool {
- private final String defaultTags;
+  private final MetricRegistry registry = new MetricRegistry();
+  public static PegasusMonitor pegasusMonitor;
+
+  /**
+   * Creates the pool together with the monitor that publishes its metrics.
+   *
+   * @param host endpoint name handed to the falcon collector
+   * @param tags default tags attached to the reported metrics
+   * @param reportStepSec falcon report interval in seconds
+   * @param metricType monitoring backend, {@code "falcon"} or {@code "prometheus"}
+   * @throws IllegalArgumentException if {@code metricType} is neither supported value
+   */
+  public MetricsPool(String host, String tags, int reportStepSec, String metricType) {
+    // Constant-first equals: a null metricType falls through to the explicit error below.
+    if ("falcon".equals(metricType)) {
+      pegasusMonitor =
+          new FalconReporter(
+              reportStepSec, new FalconCollector(host, tags, reportStepSec, registry));
+    } else if ("prometheus".equals(metricType)) {
+      pegasusMonitor = new PrometheusCollector(tags, registry);
+    } else {
+      throw new IllegalArgumentException("unsupported perf counter type: " + metricType);
+    }
+  }
- public MetricsPool(String host, String tags, int reportStepSec) {
- theMetric = new FalconMetric();
- theMetric.endpoint = host;
- theMetric.step = reportStepSec;
- theMetric.tags = tags;
- defaultTags = tags;
+  /** Starts the current {@link #pegasusMonitor}. */
+  public void start() {
+    pegasusMonitor.start();
+  }
+
+  /** Stops the current {@link #pegasusMonitor}. */
+  public void stop() {
+    pegasusMonitor.stop();
}
public void setMeter(String counterName, long count) {
@@ -34,72 +37,7 @@ public void setHistorgram(String counterName, long value) {
registry.histogram(counterName).update(value);
}
- public void genJsonsFromMeter(String name, Meter meter, StringBuilder output)
- throws JSONException {
- theMetric.counterType = "GAUGE";
-
- theMetric.metric = name + ".cps-1sec";
- theMetric.tags = getTableTag(name);
- theMetric.value = meter.getMeanRate();
- oneMetricToJson(theMetric, output);
- }
-
- public void genJsonsFromHistogram(String name, Histogram hist, StringBuilder output)
- throws JSONException {
- theMetric.counterType = "GAUGE";
- Snapshot s = hist.getSnapshot();
-
- theMetric.metric = name + ".p99";
- theMetric.tags = getTableTag(name);
- theMetric.value = s.get99thPercentile();
- oneMetricToJson(theMetric, output);
- output.append(',');
-
- theMetric.metric = name + ".p999";
- theMetric.tags = getTableTag(name);
- theMetric.value = s.get999thPercentile();
- oneMetricToJson(theMetric, output);
- }
-
- public static void oneMetricToJson(FalconMetric metric, StringBuilder output)
- throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put("endpoint", metric.endpoint);
- obj.put("metric", metric.metric);
- obj.put("timestamp", metric.timestamp);
- obj.put("step", metric.step);
- obj.put("value", metric.value);
- obj.put("counterType", metric.counterType);
- obj.put("tags", metric.tags);
- output.append(obj.toString());
- }
-
- public String metricsToJson() throws JSONException {
- theMetric.timestamp = Tools.unixEpochMills() / 1000;
-
- StringBuilder builder = new StringBuilder();
- builder.append('[');
- SortedMap meters = registry.getMeters();
- for (Map.Entry entry : meters.entrySet()) {
- genJsonsFromMeter(entry.getKey(), entry.getValue(), builder);
- builder.append(',');
- }
-
- for (Map.Entry entry : registry.getHistograms().entrySet()) {
- genJsonsFromHistogram(entry.getKey(), entry.getValue(), builder);
- builder.append(',');
- }
-
- if (builder.charAt(builder.length() - 1) == ',') {
- builder.deleteCharAt(builder.length() - 1);
- }
-
- builder.append("]");
-
- return builder.toString();
- }
-
- private String getTableTag(String counterName) {
+ public static String getTableTag(String counterName, String defaultTags) {
if (defaultTags.contains("table=")) {
return defaultTags;
}
@@ -112,16 +50,11 @@ private String getTableTag(String counterName) {
return defaultTags;
}
- static final class FalconMetric {
- public String endpoint; // metric host
- public String metric; // metric name
- public long timestamp; // report time in unix seconds
- public int step; // report interval in seconds;
- public double value; // metric value
- public String counterType; // GAUGE or COUNTER
- public String tags; // metrics description
+ public static String getMetricName(String name, String suffix) {
+ String[] result = name.split("@");
+ if (result.length >= 2) {
+ return result[0] + suffix;
+ }
+ return name + suffix;
}
-
- private FalconMetric theMetric;
- private final MetricRegistry registry = new MetricRegistry();
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java
new file mode 100644
index 00000000..3c8a66a1
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PegasusMonitor.java
@@ -0,0 +1,11 @@
+package com.xiaomi.infra.pegasus.metrics;
+
+/** Lifecycle contract shared by the metric monitors (falcon reporter, prometheus collector). */
+public interface PegasusMonitor {
+
+  /** Starts the monitor. */
+  void start();
+
+  /** Stops the monitor. */
+  void stop();
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java
new file mode 100644
index 00000000..3a9d5b1d
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/metrics/PrometheusCollector.java
@@ -0,0 +1,162 @@
+package com.xiaomi.infra.pegasus.metrics;
+
+import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getMetricName;
+import static com.xiaomi.infra.pegasus.metrics.MetricsPool.getTableTag;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.google.common.collect.ImmutableList;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.GaugeMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+/**
+ * Publishes the meters and histograms of a {@link MetricRegistry} as prometheus gauges and serves
+ * them from an embedded {@link HTTPServer}.
+ */
+public class PrometheusCollector extends Collector implements PegasusMonitor {
+  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(PrometheusCollector.class);
+
+  private static final int PORT = 9091;
+
+  private final String defaultTags;
+  private final MetricRegistry registry;
+
+  // Parsed label maps keyed by the raw tag string. collect() is called back by the exporter,
+  // presumably on its own HTTP threads, so the cache must tolerate concurrent access.
+  private final Map<String, Map<String, String>> tableLabels = new ConcurrentHashMap<>();
+
+  // Non-null only between a successful start() and the following stop().
+  private HTTPServer server;
+
+  /**
+   * @param defaultTags comma-separated {@code key=value} tags attached to every metric
+   * @param registry registry whose meters and histograms are exported
+   */
+  public PrometheusCollector(String defaultTags, MetricRegistry registry) {
+    this.defaultTags = defaultTags;
+    this.registry = registry;
+  }
+
+  /** Builds a fresh list of gauge families from the current registry content. */
+  @Override
+  public List<MetricFamilySamples> collect() {
+    final ImmutableList.Builder<MetricFamilySamples> metricFamilySamplesBuilder =
+        ImmutableList.builder();
+    updateMetric(metricFamilySamplesBuilder);
+    return metricFamilySamplesBuilder.build();
+  }
+
+  /**
+   * Adds one gauge family per meter and two per histogram (p99 and p999) to {@code builder}.
+   *
+   * @param builder receiver of the generated gauge families
+   */
+  public void updateMetric(final ImmutableList.Builder<MetricFamilySamples> builder) {
+    // NOTE(review): counters of different tables share a metric name, so several families with
+    // the same name can be emitted -- confirm the scraper accepts that.
+    for (Map.Entry<String, Meter> entry : registry.getMeters().entrySet()) {
+      updateQPSMetric(entry.getKey(), entry, builder);
+    }
+
+    for (Map.Entry<String, Histogram> entry : registry.getHistograms().entrySet()) {
+      updateLatencyMetric(entry.getKey(), entry, builder);
+    }
+  }
+
+  private void updateQPSMetric(
+      String name,
+      Map.Entry<String, Meter> meter,
+      final ImmutableList.Builder<MetricFamilySamples> builder) {
+    Map<String, String> labels = getLabel(getTableTag(name, defaultTags));
+    GaugeMetricFamily labeledGauge =
+        new GaugeMetricFamily(
+            getMetricName(name, ""), "pegasus operation qps", new ArrayList<>(labels.keySet()));
+    labeledGauge.addMetric(new ArrayList<>(labels.values()), meter.getValue().getMeanRate());
+    builder.add(labeledGauge);
+  }
+
+  private void updateLatencyMetric(
+      String name,
+      Map.Entry<String, Histogram> meter,
+      final ImmutableList.Builder<MetricFamilySamples> builder) {
+    Map<String, String> labels = getLabel(getTableTag(name, defaultTags));
+    Snapshot snapshot = meter.getValue().getSnapshot();
+    updateLatencyMetric(getMetricName(name, "_p99"), snapshot.get99thPercentile(), labels, builder);
+    updateLatencyMetric(
+        getMetricName(name, "_p999"), snapshot.get999thPercentile(), labels, builder);
+  }
+
+  private void updateLatencyMetric(
+      String key,
+      double value,
+      Map<String, String> labels,
+      final ImmutableList.Builder<MetricFamilySamples> builder) {
+    GaugeMetricFamily labeledGauge =
+        new GaugeMetricFamily(key, "pegasus operation latency", new ArrayList<>(labels.keySet()));
+    labeledGauge.addMetric(new ArrayList<>(labels.values()), value);
+    builder.add(labeledGauge);
+  }
+
+  /** Returns the cached label map for {@code labels}, parsing the tag string on first use. */
+  private Map<String, String> getLabel(String labels) {
+    return tableLabels.computeIfAbsent(labels, PrometheusCollector::parseLabels);
+  }
+
+  /**
+   * Parses comma-separated {@code key=value} pairs. Pairs without a key or without {@code =} (an
+   * empty tag string yields one) are skipped instead of failing the whole scrape.
+   */
+  private static Map<String, String> parseLabels(String labels) {
+    Map<String, String> labelMap = new HashMap<>();
+    for (String label : labels.split(",")) {
+      String[] labelPair = label.split("=", 2);
+      if (labelPair.length != 2 || labelPair[0].trim().isEmpty()) {
+        if (!label.trim().isEmpty()) {
+          logger.warn("ignore malformed perf counter tag \"{}\" in \"{}\"", label, labels);
+        }
+        continue;
+      }
+      labelMap.put(labelPair[0].trim(), labelPair[1].trim());
+    }
+    return labelMap;
+  }
+
+  /** Opens the exporter http server on {@link #PORT} and registers this collector. */
+  @Override
+  public void start() {
+    if (server != null) {
+      return;
+    }
+    try {
+      server = new HTTPServer(PORT);
+    } catch (IOException e) {
+      logger.warn("start the prometheus collector server failed, port = {}: ", PORT, e);
+      return;
+    }
+    register();
+    logger.debug("start the prometheus collector server, port = {}", PORT);
+  }
+
+  /** Unregisters this collector and shuts the exporter http server down, releasing the port. */
+  @Override
+  public void stop() {
+    if (server == null) {
+      return;
+    }
+    CollectorRegistry.defaultRegistry.unregister(this);
+    server.stop();
+    server = null;
+    logger.debug("stop the prometheus collector server, port = {}", PORT);
+  }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
index 9be285ea..61959d2d 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java
@@ -56,23 +56,23 @@ public String getQPSCounter() {
break;
}
- // pegasus.client.put.succ.qps
+ // pegasus_client_put_succ_qps
return new StringBuilder()
- .append("pegasus.client.")
+ .append("pegasus_client_")
.append(name())
- .append(".")
+ .append("_")
.append(mark)
- .append(".qps@")
+ .append("_qps@")
.append(tableName)
.toString();
}
public String getLatencyCounter() {
- // pegasus.client.put.latency
+ // pegasus_client_put_latency
return new StringBuilder()
- .append("pegasus.client.")
+ .append("pegasus_client_")
.append(name())
- .append(".latency@")
+ .append("_latency@")
.append(tableName)
.toString();
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
index c3a6fb46..64cd67dd 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ClusterOptions.java
@@ -27,6 +27,9 @@ public class ClusterOptions {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60";
+ public static final String PEGASUS_PUSH_COUNTER_TYPE_KEY = "push_counter_type";
+ public static final String PEGASUS_PUSH_COUNTER_TYPE_DEF = "falcon";
+
public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout";
public static final String PEGASUS_META_QUERY_TIMEOUT_DEF = "5000";
@@ -47,6 +50,7 @@ public static String[] allKeys() {
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String perfCounterTags;
+ private final String perfCounterType;
private final int pushCounterIntervalSecs;
private final int metaQueryTimeout;
@@ -78,6 +82,10 @@ public int metaQueryTimeout() {
return this.metaQueryTimeout;
}
+ public String pushCounterType() {
+ return this.perfCounterType;
+ }
+
public static ClusterOptions create(Properties config) {
int operationTimeout =
Integer.parseInt(
@@ -105,6 +113,9 @@ public static ClusterOptions create(Properties config) {
Integer.parseInt(
config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));
+ String perfCounterType = config.getProperty(PEGASUS_PUSH_COUNTER_TYPE_KEY, PEGASUS_PUSH_COUNTER_TYPE_DEF);
+ if (!perfCounterType.equals("falcon") && !perfCounterType.equals("prometheus")) // not `assert`: it is a no-op without -ea
+ throw new IllegalArgumentException("unsupported " + PEGASUS_PUSH_COUNTER_TYPE_KEY + ": " + perfCounterType);
int metaQueryTimeout =
Integer.parseInt(
config.getProperty(PEGASUS_META_QUERY_TIMEOUT_KEY, PEGASUS_META_QUERY_TIMEOUT_DEF));
@@ -115,12 +126,13 @@ public static ClusterOptions create(Properties config) {
asyncWorkers,
enablePerfCounter,
perfCounterTags,
+ perfCounterType,
pushIntervalSecs,
metaQueryTimeout);
}
public static ClusterOptions forTest(String[] metaList) {
- return new ClusterOptions(1000, metaList, 1, false, null, 60, 1000);
+ return new ClusterOptions(1000, metaList, 1, false, null, PEGASUS_PUSH_COUNTER_TYPE_DEF, 60, 1000);
}
private ClusterOptions(
@@ -129,6 +141,7 @@ private ClusterOptions(
int asyncWorkers,
boolean enablePerfCounter,
String perfCounterTags,
+ String perfCounterType,
int pushCounterIntervalSecs,
int metaQueryTimeout) {
this.operationTimeout = operationTimeout;
@@ -136,6 +149,7 @@ private ClusterOptions(
this.asyncWorkers = asyncWorkers;
this.enablePerfCounter = enablePerfCounter;
this.perfCounterTags = perfCounterTags;
+ this.perfCounterType = perfCounterType;
this.pushCounterIntervalSecs = pushCounterIntervalSecs;
this.metaQueryTimeout = metaQueryTimeout;
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java
index 3c1c6963..1e25d85d 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java
@@ -47,7 +47,8 @@ public ClusterManager(ClusterOptions opts) throws IllegalArgumentException {
setTimeout(opts.operationTimeout());
this.enableCounter = opts.enablePerfCounter();
if (enableCounter) {
- MetricsManager.detectHostAndInit(opts.perfCounterTags(), opts.pushCounterIntervalSecs());
+ MetricsManager.detectHostAndInit(
+ opts.perfCounterTags(), opts.pushCounterIntervalSecs(), opts.pushCounterType());
}
replicaSessions = new ConcurrentHashMap();
diff --git a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
index 88c0676f..c687ead2 100644
--- a/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
+++ b/src/test/java/com/xiaomi/infra/pegasus/metrics/MetricsPoolTest.java
@@ -6,11 +6,15 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import java.util.Arrays;
+import java.util.List;
import junit.framework.Assert;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
/** Created by weijiesun on 18-3-9. */
public class MetricsPoolTest {
@@ -23,27 +27,25 @@ public void before() {
public void genJsonsFromMeter() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- MetricsPool pool = new MetricsPool(host, tags, 20);
+ FalconCollector falconCollector = new FalconCollector(host, tags, 20, r);
Meter m = r.meter("TestName");
m.mark(1);
m.mark(1);
StringBuilder builder = new StringBuilder();
- pool.genJsonsFromMeter("TestName", m, builder);
+ falconCollector.genJsonsFromMeter("TestName", m, builder);
JSONArray array = new JSONArray("[" + builder.toString() + "]");
Assert.assertEquals(1, array.length());
- String[] metrics = {
- "TestName.cps-1sec", "TestName.cps-1min", "TestName.cps-5min", "TestName.cps-15min"
- };
+ String metric = "TestName";
for (int i = 0; i < array.length(); ++i) {
JSONObject j = array.getJSONObject(i);
Assert.assertEquals(tags, j.getString("tags"));
- Assert.assertEquals(metrics[i], j.getString("metric"));
+ Assert.assertEquals(metric, j.getString("metric"));
Assert.assertEquals("GAUGE", j.getString("counterType"));
Assert.assertEquals(20, j.getInt("step"));
Assert.assertEquals(host, j.getString("endpoint"));
@@ -54,17 +56,17 @@ public void genJsonsFromMeter() throws Exception {
public void genJsonFromHistogram() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- MetricsPool pool = new MetricsPool(host, tags, 20);
+ FalconCollector falconCollector = new FalconCollector(host, tags, 20, r);
Histogram h = r.histogram("TestHist");
for (int i = 0; i < 1000000; ++i) h.update((long) i);
StringBuilder builder = new StringBuilder();
- pool.genJsonsFromHistogram("TestHist", h, builder);
+ falconCollector.genJsonsFromHistogram("TestHist", h, builder);
JSONArray array = new JSONArray("[" + builder.toString() + "]");
Assert.assertEquals(2, array.length());
- String[] metrics = {"TestHist.p99", "TestHist.p999"};
+ String[] metrics = {"TestHist_p99", "TestHist_p999"};
for (int i = 0; i < array.length(); ++i) {
JSONObject j = array.getJSONObject(i);
@@ -79,7 +81,7 @@ public void genJsonFromHistogram() throws Exception {
@Test
public void oneMetricToJson() throws Exception {
- MetricsPool.FalconMetric metric = new MetricsPool.FalconMetric();
+ FalconCollector.FalconMetric metric = new FalconCollector.FalconMetric();
metric.endpoint = "1.2.3.4";
metric.metric = "simple_metric";
metric.timestamp = 12343455L;
@@ -89,7 +91,7 @@ public void oneMetricToJson() throws Exception {
metric.tags = "cluster=onebox,app=new";
StringBuilder builder = new StringBuilder();
- MetricsPool.oneMetricToJson(metric, builder);
+ FalconCollector.oneMetricToJson(metric, builder);
JSONObject obj = new JSONObject(builder.toString());
Assert.assertEquals(metric.endpoint, obj.getString("endpoint"));
@@ -102,7 +104,7 @@ public void oneMetricToJson() throws Exception {
builder.setLength(0);
metric.tags = "";
- MetricsPool.oneMetricToJson(metric, builder);
+ FalconCollector.oneMetricToJson(metric, builder);
obj = new JSONObject(builder.toString());
Assert.assertEquals(metric.endpoint, obj.getString("endpoint"));
Assert.assertEquals(metric.metric, obj.getString("metric"));
@@ -117,22 +119,22 @@ public void oneMetricToJson() throws Exception {
public void metricsToJson() throws Exception {
String host = "simple-test-host.bj";
String tags = "what=you,like=another";
- MetricsPool pool = new MetricsPool(host, tags, 20);
+ MetricsPool pool = new MetricsPool(host, tags, 20, "falcon");
pool.setMeter("aaa@temp", 1);
- pool.setMeter("aaa", 2);
for (int i = 0; i < 10000; ++i) {
pool.setHistorgram("ccc", i);
- pool.setHistorgram("ccc@temp", i);
}
- JSONArray array = new JSONArray(pool.metricsToJson());
- Assert.assertEquals(6, array.length());
+ JSONArray array =
+ new JSONArray(
+ ((FalconReporter) MetricsPool.pegasusMonitor).falconCollector.metricsToJson());
+ Assert.assertEquals(3, array.length());
for (int i = 0; i < array.length(); ++i) {
JSONObject j = array.getJSONObject(i);
- if (j.getString("metric").contains("@")) {
+ if (j.getString("metric").contains("aaa")) {
Assert.assertEquals(tags + ",table=temp", j.getString("tags"));
} else {
Assert.assertEquals(tags, j.getString("tags"));
@@ -143,5 +145,32 @@ public void metricsToJson() throws Exception {
}
}
+ @Test
+ public void testPrometheus() {
+ // Registers one meter and one histogram for table "temp", then inspects the first family.
+ String tags = "what=you,like=another";
+ PrometheusCollector prometheusCollector = new PrometheusCollector(tags, r);
+
+ Meter m = r.meter("TestQPSName@temp");
+ for (int i = 0; i < 100; ++i) m.mark(1);
+
+ Histogram h = r.histogram("testLatency@temp");
+ for (int i = 0; i < 10000; ++i) h.update((long) i);
+
+ // The element type is required: with a raw List, get(0) would be an Object.
+ List<MetricFamilySamples> metricFamilySamples = prometheusCollector.collect();
+ Assertions.assertEquals(3, metricFamilySamples.size());
+
+ MetricFamilySamples QPSMetric = metricFamilySamples.get(0);
+ Assertions.assertEquals("TestQPSName", QPSMetric.name);
+ // JUnit takes (expected, actual); keeping that order makes failure messages read correctly.
+ Assertions.assertArrayEquals(
+ Arrays.asList("what", "like", "table").toArray(),
+ QPSMetric.samples.get(0).labelNames.toArray());
+ Assertions.assertArrayEquals(
+ Arrays.asList("you", "another", "temp").toArray(),
+ QPSMetric.samples.get(0).labelValues.toArray());
+ Assertions.assertTrue(QPSMetric.samples.get(0).value != 0);
+ }
+
MetricRegistry r;
}