From b91d37df066524515306b33bb3413f3a705d45b0 Mon Sep 17 00:00:00 2001 From: Alejandro Revilla Date: Mon, 11 Sep 2023 19:12:36 -0300 Subject: [PATCH] add metrics and profiling --- .../java/org/jpos/metrics/MeterFactory.java | 2 - .../jpos/transaction/TransactionManager.java | 188 ++++++++++++++---- 2 files changed, 148 insertions(+), 42 deletions(-) diff --git a/jpos/src/main/java/org/jpos/metrics/MeterFactory.java b/jpos/src/main/java/org/jpos/metrics/MeterFactory.java index 0837b65b0b..af87abd25e 100644 --- a/jpos/src/main/java/org/jpos/metrics/MeterFactory.java +++ b/jpos/src/main/java/org/jpos/metrics/MeterFactory.java @@ -23,9 +23,7 @@ import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.distribution.Histogram; import io.micrometer.core.instrument.search.Search; -import org.jpos.core.metrics.MeterInfo; import java.time.Duration; import java.util.concurrent.Callable; diff --git a/jpos/src/main/java/org/jpos/transaction/TransactionManager.java b/jpos/src/main/java/org/jpos/transaction/TransactionManager.java index 6ba0ac6be7..fe2ded4f09 100644 --- a/jpos/src/main/java/org/jpos/transaction/TransactionManager.java +++ b/jpos/src/main/java/org/jpos/transaction/TransactionManager.java @@ -18,12 +18,20 @@ package org.jpos.transaction; +import io.micrometer.core.instrument.*; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.binder.BaseUnits; +import io.micrometer.core.instrument.config.MeterFilter; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import org.HdrHistogram.AtomicHistogram; import org.jdom2.Element; import org.jpos.core.Configuration; import org.jpos.core.ConfigurationException; +import org.jpos.metrics.MeterInfo; import org.jpos.function.TriConsumer; import org.jpos.function.TriFunction; +import org.jpos.jfr.TMEvent; +import org.jpos.metrics.MeterFactory; import org.jpos.q2.QBeanSupport; import org.jpos.q2.QFactory; import org.jpos.space.*; @@ -31,8 +39,10 @@ import java.io.PrintStream; import java.io.Serializable; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.time.Instant; @@ -40,6 +50,7 @@ import java.time.ZoneId; import org.jpos.iso.ISOUtil; +import org.jpos.util.Metrics; import static org.jpos.transaction.ContextConstants.TIMESTAMP; @@ -66,7 +77,7 @@ public class TransactionManager private static final ThreadLocal tlContext = new ThreadLocal<>(); private static final ThreadLocal tlId = new ThreadLocal<>(); private Metrics metrics; - private static Map params = new HashMap<>(); + private Map params = new HashMap<>(); private long globalMaxTime; private Space sp; @@ -98,6 +109,11 @@ public class TransactionManager private Runnable retryTask = null; private TPS tps; private ExecutorService executor; + private final List meters = new ArrayList<>(); + private static AtomicBoolean filtersAdded = new AtomicBoolean(); + + private Gauge activeSessionsGauge; + private Counter transactionsCounter; @Override public void initService () throws ConfigurationException { @@ -116,10 +132,29 @@ public void initService () throws ConfigurationException { initStatusListeners (getPersist()); executor = Executors.newThreadPerTaskExecutor( Thread.ofVirtual() - .allowSetThreadLocals(true) .inheritInheritableThreadLocals(false) .name(getName()) .factory()); + + if (!filtersAdded.getAndSet(true)) { + getServer().getMeterRegistry().config().meterFilter(new MeterFilter() { + @Override + public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) { + if (id.getName().equals(MeterInfo.TM_OPERATION.id())) { + return DistributionStatisticConfig.builder().serviceLevelObjectives( + Duration.ofMillis(10).toNanos(), + Duration.ofMillis(100).toNanos(), + Duration.ofMillis(500).toNanos(), + Duration.ofMillis(1000).toNanos(), + Duration.ofMillis(5000).toNanos(), + Duration.ofMillis(15000).toNanos()) + .build() + .merge(config); + } + return config; + } + }); + } } @Override @@ -133,7 +168,7 @@ public void startService () throws Exception { checkRetryTask(); if (iisp != isp) { - Thread.ofVirtual().unstarted( + Thread.ofPlatform().unstarted( new InputQueueMonitor() ).start(); } @@ -147,7 +182,7 @@ public void stopService () { for (Object o=iisp.inp(queue); o != null; o=iisp.inp(queue)) isp.out(queue, o); // push back to replicated space - + meters.forEach(getServer().getMeterRegistry()::remove); tps.stop(); for (Destroyable destroyable : destroyables) { try { @@ -206,8 +241,8 @@ public void run () { private void runTransaction (Serializable context, int session) { long id = 0; - List members = null; - Iterator iter = null; + List members; + Iterator iter; boolean abort; LogEvent evt; Profiler prof; @@ -217,6 +252,9 @@ private void runTransaction (Serializable context, int session) { evt = null; thread.setName (getName() + "-" + session + ":idle"); int action = -1; + id = nextId (); + TMEvent tme = new TMEvent(getName(), id); + tme.begin(); try { setThreadLocal(id, context); if (hasStatusListeners) @@ -225,7 +263,6 @@ private void runTransaction (Serializable context, int session) { Chronometer chronometer = new Chronometer(getStart(context)); abort = false; - id = nextId (); members = new ArrayList<> (); iter = getParticipants (DEFAULT_GROUP).iterator(); if (debug) { @@ -307,6 +344,7 @@ private void runTransaction (Serializable context, int session) { getLog().error(t); } } + tme.commit(); } } @@ -325,9 +363,7 @@ public long getInTransit () { } @Override - public void setConfiguration (Configuration cfg) - throws ConfigurationException - { + public void setConfiguration (Configuration cfg) throws ConfigurationException { super.setConfiguration (cfg); debug = cfg.getBoolean ("debug", true); debugContext = cfg.getBoolean ("debug-context", debug); @@ -356,6 +392,15 @@ public void setConfiguration (Configuration cfg) if (profiler) metrics = new Metrics(new AtomicHistogram(cfg.getLong("metrics-highest-trackable-value", 60000), 2)); abortOnMisconfiguredGroups = cfg.getBoolean("abort-on-misconfigured-groups"); + + // Configure meters + try { + activeSessionsGauge = MeterFactory.gauge + (getServer().getMeterRegistry(), MeterInfo.TM_ACTIVE_SESSIONS, Tags.of("name", getName()), BaseUnits.THREADS, activeSessions::get + ); + } catch (Exception e) { + throw new ConfigurationException (e); + } } public void addListener (TransactionStatusListener l) { synchronized (statusListeners) { @@ -420,6 +465,8 @@ public void dump (PrintStream ps, String indent) { (int session, long id, Serializable context, List members, boolean recover, LogEvent evt, Profiler prof) { for (TransactionParticipant p :members) { + var jfr = new TMEvent.Commit("%s:%s".formatted(getName(), p.getClass().getName()), id); + jfr.begin(); ParticipantParams pp = getParams(p); if (recover && p instanceof ContextRecovery cr) { context = recover (cr, id, context, pp, true); @@ -436,6 +483,7 @@ session, TransactionStatusEvent.State.COMMITING, id, getName(p), context if (prof != null) prof.checkPoint (" commit: " + getName(p)); } + jfr.commit(); } } protected void abort @@ -473,6 +521,7 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context } catch (Throwable t) { getLog().warn ("PREPARE-FOR-ABORT: " + Long.toString (id), t); } finally { + getParams(p).timers.prepareForAbortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); if (metrics != null) metrics.record(getName(p) + "-prepare-for-abort", c.elapsed()); } @@ -488,8 +537,10 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context } catch (Throwable t) { getLog().warn ("PREPARE: " + Long.toString (id), t); } finally { - if (metrics != null) + getParams(p).timers.prepareTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); + if (metrics != null) { metrics.record(getName(p) + "-prepare", c.elapsed()); + } } return ABORTED; } @@ -502,9 +553,11 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context p.commit(id, context); } catch (Throwable t) { getLog().warn ("COMMIT: " + Long.toString (id), t); + } finally { + getParams(p).timers.commitTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); + if (metrics != null) + metrics.record(getName(p) + "-commit", c.elapsed()); } - if (metrics != null) - metrics.record(getName(p) + "-commit", c.elapsed()); } protected void abort (TransactionParticipant p, long id, Serializable context) @@ -515,9 +568,11 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context p.abort(id, context); } catch (Throwable t) { getLog().warn ("ABORT: " + id, t); + } finally { + getParams(p).timers.abortTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); + if (metrics != null) + metrics.record(getName(p) + "-abort", c.elapsed()); } - if (metrics != null) - metrics.record(getName(p) + "-abort", c.elapsed()); } protected int prepare (int session, long id, Serializable context, List members, Iterator iter, boolean abort, LogEvent evt, Profiler prof, Chronometer chronometer) @@ -532,6 +587,7 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context return ABORTED; } TransactionParticipant p = iter.next(); + ParticipantParams pp = getParams(p); if (!abort && pp.maxTime > 0 && chronometer.elapsed() > pp.maxTime) { abort = true; @@ -539,7 +595,10 @@ session, TransactionStatusEvent.State.ABORTING, id, getName(p), context evt.addMessage(" forcedAbort: " + getName(p) + " elapsed=" + chronometer.elapsed()); } + TMEvent jfr; if (abort) { + jfr = new TMEvent.PrepareForAbort("%s:%s".formatted(getName(), p.getClass().getName()), id); + jfr.begin(); if (hasStatusListeners) notifyStatusListeners ( session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), context @@ -558,6 +617,8 @@ session, TransactionStatusEvent.State.PREPARING_FOR_ABORT, id, getName(p), conte session, TransactionStatusEvent.State.PREPARING, id, getName(p), context ); + jfr = new TMEvent.Prepare("%s:%s".formatted(getName(), p.getClass().getName()), id); + jfr.begin(); chronometer.lap(); action = prepareOrAbort (p, id, context, pp, this::prepare); @@ -586,6 +647,7 @@ session, TransactionStatusEvent.State.PREPARING, id, getName(p), context if ((action & READONLY) == 0) { Chronometer c = new Chronometer(); snapshot (id, context); + getParams(p).timers.snapshotTimer.record (c.elapsed(), TimeUnit.MILLISECONDS); if (metrics != null) metrics.record(getName(p) + "-snapshot", c.elapsed()); } @@ -628,6 +690,7 @@ session, TransactionStatusEvent.State.PREPARING, id, getName(p), context iter = participants.iterator(); } } + jfr.commit(); } return abort ? retry ? RETRY : ABORTED : PREPARED; } @@ -694,7 +757,7 @@ protected List initGroup (Element e) } return group; } - public TransactionParticipant createParticipant (Element e) + public TransactionParticipant createParticipant (Element e) throws ConfigurationException { QFactory factory = getFactory(); @@ -705,20 +768,22 @@ public TransactionParticipant createParticipant (Element e) QFactory.invoke (participant, "setTransactionManager", this, TransactionManager.class); factory.setConfiguration (participant, e); String realm = QFactory.getAttributeValue(e, "realm"); - if (realm != null && realm.trim().length() > 0) - realm = ":" + realm; - else - realm = ""; - - params.put(participant, new ParticipantParams( - Caller.shortClassName(participant.getClass().getName())+realm, - getLong (e, "timeout", 0L), - getLong (e, "max-time", globalMaxTime), - getSet(e.getChild("requires")), - getSet(e.getChild("provides")), - getSet(e.getChild("optional")) - ) - ); + + try { + String participantShortName = Caller.shortClassName(participant.getClass().getName()); + params.put(participant, new ParticipantParams( + participantShortName + (realm != null && !realm.isEmpty() ? ":" + realm : ""), + getLong (e, "timeout", 0L), + getLong (e, "max-time", globalMaxTime), + getSet(e.getChild("requires")), + getSet(e.getChild("provides")), + getSet(e.getChild("optional")), + getOrCreateTimers(participant) + ) + ); + } catch (Exception ex) { + throw new ConfigurationException (ex); + } if (participant instanceof Destroyable) { destroyables.add((Destroyable) participant); } @@ -801,6 +866,9 @@ protected void snapshot (long id, Serializable context) { snapshot (id, context, null); } protected void snapshot (long id, Serializable context, Integer status) { + var jfr = new TMEvent.Snapshot(getName()+":"+status, id); + jfr.begin(); + String contextKey = getKey (CONTEXT, id); synchronized (psp) { commitOff (psp); @@ -814,6 +882,7 @@ protected void snapshot (long id, Serializable context, Integer status) { } commitOn (psp); } + jfr.commit(); } protected void setState (long id, Integer state) { String stateKey = getKey (STATE, id); @@ -888,7 +957,7 @@ protected void recover (int session, long id) { protected synchronized void checkRetryTask () { if (retryTask == null) { retryTask = new RetryTask(); - Thread.ofVirtual().start(retryTask).setDaemon(true); + Thread.ofVirtual().start(retryTask); } } @@ -1037,7 +1106,8 @@ private String getName(TransactionParticipant p) { private ParticipantParams getParams (TransactionParticipant p) { return Optional.ofNullable(params.get(p)).orElse( - new ParticipantParams(p.getClass().getName(), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet()) + new ParticipantParams(p.getClass().getName(), 0L, 0L, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), + getOrCreateTimers(p)) ); } @@ -1098,11 +1168,21 @@ private record ParticipantParams ( long maxTime, Set requires, Set provides, - Set optional) { + Set optional, + Timers timers + ) + { public boolean isConstrained() { return !requires.isEmpty() || !optional.isEmpty(); } } + private record Timers ( + io.micrometer.core.instrument.Timer prepareTimer, + io.micrometer.core.instrument.Timer prepareForAbortTimer, + io.micrometer.core.instrument.Timer commitTimer, + io.micrometer.core.instrument.Timer abortTimer, + io.micrometer.core.instrument.Timer snapshotTimer) + { } private Set getSet (Element e) { return e != null ? new HashSet<>(Arrays.asList(ISOUtil.commaDecode(e.getTextTrim()))) : Collections.emptySet(); @@ -1110,6 +1190,7 @@ private Set getSet (Element e) { private int prepareOrAbort (TransactionParticipant p, long id, Serializable context, ParticipantParams pp, TriFunction preparationFunction) { int action; + if (context instanceof Context ctx && pp.isConstrained()) { if (!ctx.hasKeys(pp.requires.toArray())) { ctx.log ("missing.requires: '%s'".formatted(ctx.keysNotPresent(pp.requires.toArray()))); @@ -1123,7 +1204,10 @@ private int prepareOrAbort (TransactionParticipant p, long id, Serializable cont action = preparationFunction.apply(p, id, context); } if ((action & PAUSE) == PAUSE) { + var jfrp = new TMEvent.Pause(getName(), id); + jfrp.begin(); action = pauseAndWait(context, action); + jfrp.commit(); } return action; } @@ -1139,13 +1223,37 @@ private void commitOrAbort (TransactionParticipant p, long id, Serializable cont } private Serializable recover (ContextRecovery p, long id, Serializable context, ParticipantParams pp, boolean commit) { - if (context instanceof Context ctx && pp.isConstrained()) { - Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); - Serializable s = p.recover (id, c, commit); - return (s instanceof Context rc) ? - rc.clone (pp.provides.toArray()) : s; - } else { - return p.recover (id, context, commit); + var jfr = new TMEvent.Recover("%s:%s".formatted(getName(), p.getClass().getName()), id); + jfr.begin(); + try { + if (context instanceof Context ctx && pp.isConstrained()) { + Context c = ctx.clone(pp.requires.toArray(), pp.optional.toArray()); + Serializable s = p.recover (id, c, commit); + return (s instanceof Context rc) ? + rc.clone (pp.provides.toArray()) : s; + } else { + return p.recover (id, context, commit); + } + } finally { + jfr.commit(); } } + + private Timers getOrCreateTimers(TransactionParticipant p) { + var mr = getServer().getMeterRegistry(); + String participantShortName = Caller.shortClassName(p.getClass().getName()); + var tags = Tags.of("name", getName(), "participant", participantShortName); + if (p instanceof LogSource ls) { + String realm = ls.getRealm(); + if ((realm != null) && !realm.isEmpty()) + tags = tags.and("realm", realm.trim()); + } + return new Timers( + MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare")), + MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "prepare-for-abort")), + MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "commit")), + MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "abort")), + MeterFactory.timer(mr, MeterInfo.TM_OPERATION, tags.and("phase", "snapshot")) + ); + } }