Skip to content

Commit

Permalink
Continue work instrumenting components
Browse files Browse the repository at this point in the history
  • Loading branch information
ar committed Nov 16, 2023
1 parent 887bd8c commit 9119309
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 48 deletions.
21 changes: 21 additions & 0 deletions jpos/src/main/java/org/jpos/iso/BaseChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.jpos.iso;

import io.micrometer.core.instrument.Counter;
import jdk.jfr.Event;
import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
Expand Down Expand Up @@ -105,6 +106,9 @@ public abstract class BaseChannel extends Observable
private int nextHostPort = 0;
private boolean roundRobin = false;

private Counter msgOutCounter;
private Counter msgInCounter;

private final Map<Class<? extends Exception>, List<ExceptionHandler>> exceptionHandlers = new HashMap<>();

/**
Expand Down Expand Up @@ -257,6 +261,18 @@ public int[] getCounters() {
public boolean isConnected() {
return socket != null && usable;
}

public void setCounters(Counter msgInCounter, Counter msgOutCounter) {
this.msgInCounter = msgInCounter;
this.msgOutCounter = msgOutCounter;
}
public Counter getMsgInCounter() {
return msgInCounter;
}
public Counter getMsgOutCounter() {
return msgOutCounter;
}

/**
* setup I/O Streams from socket
* @param socket a Socket (client or server)
Expand Down Expand Up @@ -635,6 +651,8 @@ public void send (ISOMsg m)
serverOutLock.unlock();
}
cnt[TX]++;
if (msgOutCounter != null)
msgOutCounter.increment();
setChanged();
notifyObservers(m);
jfr.setDetail(m.toString());
Expand Down Expand Up @@ -801,6 +819,9 @@ else if (len > 0 && len <= getMaxPacketLength()) {
m = applyIncomingFilters (m, header, b, evt);
m.setDirection(ISOMsg.INCOMING);
cnt[RX]++;
if (msgInCounter != null) {
msgInCounter.increment();
}
setChanged();
notifyObservers(m);
} catch (ISOException e) {
Expand Down
2 changes: 2 additions & 0 deletions jpos/src/main/java/org/jpos/iso/ISOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ public void run() {
if (!checkPermission (socket, ev))
return;
realm = realm + "/" + socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
if (clientSideChannel instanceof BaseChannel bc)
baseChannel.setCounters(bc.getMsgInCounter(), bc.getMsgOutCounter());
}
try {
while (true) try {
Expand Down
4 changes: 4 additions & 0 deletions jpos/src/main/java/org/jpos/metrics/MeterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public static Gauge gauge(MeterRegistry registry, MeterInfo meterInfo, Tags tags
.register(registry));
}

public static void remove (MeterRegistry registry, Meter meter) {
registry.getMeters().remove(meter);
}

@SuppressWarnings("unchecked")
private static <T extends Meter> T createMeter(MeterRegistry registry, MeterInfo meterInfo, Tags tags, Callable<T> creator) {
try {
Expand Down
14 changes: 13 additions & 1 deletion jpos/src/main/java/org/jpos/metrics/MeterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@
public enum MeterInfo {
TM_ACTIVE_SESSIONS("jpos.tm.active.sessions", "TransactionManager activeSessions"),
TM_OPERATION("jpos.tm.op", "TransactionManager operation"),
TM_COUNTER("jpos.tm.cnt", "TransactionManager counter");
TM_COUNTER("jpos.tm.cnt", "TransactionManager counter"),
ISOSERVER_CONNECTION_COUNT("jpos.server.connections", "Incoming active connections"),
ISOCHANNEL_CONNECTION_COUNT("jpos.channel.connections", "Outgoing active connections"),
ISOMSG_OUT ("jpos.isomsg.out", "Transmitted messages"),
ISOMSG_IN ("jpos.isomsg.in", "Received messages"),
CHANNEL_ACTIVE_CONNECTIONS("jpos.channel.connections", "Active outgoing connections"),
MUX_STATUS("jpos.mux.status", "MUX Status"),
MUX_TX("jpos.mux.tx", "MUX tx counter"),
MUX_RX("jpos.mux.rx", "MUX rx counter"),
MUX_EXPIRED("jpos.mux.expired.cnt", "MUX expired counter"),
MUX_RESPONSE_TIME("jpos.mux.timer", "MUX response timer"),
CHANNEL_STATUS("jpos.channel.status", "Channel status");

final String id;
final String description;

Expand Down
73 changes: 49 additions & 24 deletions jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

package org.jpos.q2.iso;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.core.handlers.exception.ExceptionHandlerAware;
import org.jpos.core.handlers.exception.ExceptionHandlerConfigAware;
import org.jpos.iso.*;
import org.jpos.metrics.MeterFactory;
import org.jpos.metrics.MeterInfo;
import org.jpos.q2.QBeanSupport;
import org.jpos.q2.QFactory;
import org.jpos.space.Space;
Expand All @@ -38,6 +44,9 @@
import java.io.PrintStream;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author Alejandro Revilla
Expand All @@ -62,24 +71,30 @@ public class ChannelAdaptor
private Thread sender;
private final Object disconnectLock = Boolean.TRUE;

private ExecutorService executor;

private Gauge connectionsGauge;

private Counter msgOutCounter;
private Counter msgInCounter;

public ChannelAdaptor () {
super ();
resetCounters();
}

public void initService() throws ConfigurationException {
initSpaceAndQueues();
NameRegistrar.register (getName(), this);
executor = Executors.newVirtualThreadPerTaskExecutor();
}
public void startService () {
try {
channel = initChannel ();
sender = new Thread(new Sender());
sender.start();
executor.submit(new Sender());
if (!writeOnly) { // fixes #426 && jPOS-20
receiver = new Thread(new Receiver());
receiver.start();
executor.submit (new Receiver());
}
initMeters();
} catch (Exception e) {
getLog().warn ("error starting service", e);
}
Expand All @@ -90,31 +105,18 @@ public void stopService () {
if (channel != null)
disconnect();
if (waitForWorkersOnStop) {
waitForSenderToExit();
executor.awaitTermination(5L, TimeUnit.SECONDS);
if (!writeOnly) {
sp.out(ready, new Date());
waitForReceiverToExit();
sp.put(ready, new Date());
}
}
sender = null;
receiver = null;
removeMeters();
} catch (Exception e) {
getLog().warn ("error disconnecting from remote host", e);
}
}
private void waitForSenderToExit() {
join(sender);
}
private void waitForReceiverToExit() {
join(receiver);
SpaceUtil.wipe(sp, ready);
}
private void join(Thread thread) {
try {
if (thread != null)
thread.join();
} catch (InterruptedException ignored) { }
}
public void destroyService () {
NameRegistrar.unregister (getName ());
NameRegistrar.unregister ("channel." + getName ());
Expand Down Expand Up @@ -295,8 +297,9 @@ public void run () {
if (!running())
break;
Object o = sp.in (in, delay);
if (o instanceof ISOMsg) {
channel.send ((ISOMsg) o);
if (o instanceof ISOMsg m) {
channel.send (m);
msgOutCounter.increment();
tx++;
}
else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) {
Expand Down Expand Up @@ -328,9 +331,11 @@ public void run () {
while (running()) {
try {
Object r = sp.rd (ready, 5000L);
if (r == null)
if (r == null) {
continue;
}
ISOMsg m = channel.receive ();
msgInCounter.increment();
rx++;
lastTxn = System.currentTimeMillis();
if (timeout > 0)
Expand Down Expand Up @@ -476,4 +481,24 @@ protected void append (StringBuffer sb, String name, int value) {
sb.append (name);
sb.append (value);
}
private void initMeters() {
var tags = Tags.of("name", getName(), "type", "client");
var registry = getServer().getMeterRegistry();
connectionsGauge =
MeterFactory.gauge
(registry, MeterInfo.ISOCHANNEL_CONNECTION_COUNT,
tags,
BaseUnits.THREADS,
() -> isConnected() ? 1 : 0
);

msgInCounter = MeterFactory.counter(registry, MeterInfo.ISOMSG_IN, tags);
msgOutCounter = MeterFactory.counter(registry, MeterInfo.ISOMSG_OUT, tags);
}
private void removeMeters() {
var registry = getServer().getMeterRegistry();
registry.remove(connectionsGauge);
registry.remove(msgInCounter);
registry.remove(msgOutCounter);
}
}
65 changes: 43 additions & 22 deletions jpos/src/main/java/org/jpos/q2/iso/QServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@

package org.jpos.q2.iso;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import org.jdom2.Element;
import org.jpos.core.ConfigurationException;
import org.jpos.core.Environment;
import org.jpos.iso.ISOChannel;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.ISORequestListener;
import org.jpos.iso.ISOServer;
import org.jpos.iso.ISOServerEventListener;
import org.jpos.iso.ISOServerSocketFactory;
import org.jpos.iso.ISOSource;
import org.jpos.iso.ServerChannel;
import org.jpos.iso.*;
import org.jpos.metrics.MeterFactory;
import org.jpos.metrics.MeterInfo;
import org.jpos.q2.QBeanSupport;
import org.jpos.q2.QFactory;
import org.jpos.space.LocalSpace;
Expand Down Expand Up @@ -66,6 +64,11 @@ public class QServer
private String outQueue;
private String sendMethod;
AtomicInteger msgn = new AtomicInteger();

private Gauge connectionsGauge;
private Counter msgOutCounter;
private Counter msgInCounter;

public QServer () {
super ();
}
Expand Down Expand Up @@ -104,12 +107,13 @@ private void initServer ()
}

server = new ISOServer (port, (ServerChannel) channel, maxSessions);
initMeters(); // meters need 'server' to be initialized
server.setLogger (log.getLogger(), getName() + ".server");
server.setName (getName ());
if (socketFactoryString != null) {
ISOServerSocketFactory sFac = (ISOServerSocketFactory) getFactory().newInstance(socketFactoryString);
if (sFac != null && sFac instanceof LogSource) {
((LogSource) sFac).setLogger(log.getLogger(),getName() + ".socket-factory");
ISOServerSocketFactory sFac = getFactory().newInstance(socketFactoryString);
if (sFac instanceof LogSource ls) {
ls.setLogger(log.getLogger(),getName() + ".socket-factory");
}
server.setSocketFactory(sFac);
}
Expand All @@ -125,10 +129,9 @@ private void initIn() {
inQueue = Environment.get(persist.getChildTextTrim("in"));
if (inQueue != null) {
/*
* We have an 'in' queue to monitor for messages we will
* send out through server in our (SpaceListener)notify(Object, Object) method.
* We have an 'in' queue to monitor for messages to be
* sent out through server in our (SpaceListener)notify(Object, Object) method.
*/

sp.addListener(inQueue, this);
}
}
Expand All @@ -138,7 +141,7 @@ private void initOut() {
if (outQueue != null) {
/*
* We have an 'out' queue to send any messages to that are received
* by the our requestListener(this).
* by our requestListener(this).
*
* Note, if additional ISORequestListeners are registered with the server after
* this point, then they won't see anything as our process(ISOSource, ISOMsg)
Expand All @@ -159,14 +162,11 @@ public void startService () {
}
}
private void initWhoToSendTo() {

Element persist = getPersist();
sendMethod = persist.getChildText("send-request");
if (sendMethod==null) {
sendMethod="LAST";
}


}

@Override
Expand All @@ -175,6 +175,7 @@ public void stopService () {
server.shutdown ();
sp.removeListener(inQueue, this);
}
removeMeters();
}
@Override
public void destroyService () {
Expand Down Expand Up @@ -394,7 +395,7 @@ else if ("RR".equals(sendMethod)) {
}

/*
* This method will be invoke through the ISORequestListener interface, *if*
* This method will be invoked through the ISORequestListener interface, *if*
* this QServer has an 'out' queue to handle.
*/
@Override
Expand All @@ -403,6 +404,26 @@ public boolean process(ISOSource source, ISOMsg m) {
return true;
}

private void initMeters() {
var tags = Tags.of("name", getName(), "type", "server");
var registry = getServer().getMeterRegistry();
connectionsGauge =
MeterFactory.gauge
(registry, MeterInfo.ISOSERVER_CONNECTION_COUNT,
tags,
BaseUnits.THREADS,
server::getConnectionCount
);
msgInCounter = MeterFactory.counter(registry, MeterInfo.ISOMSG_IN, tags);
msgOutCounter = MeterFactory.counter(registry, MeterInfo.ISOMSG_OUT, tags);
if (channel instanceof BaseChannel baseChannel) {
baseChannel.setCounters(msgInCounter, msgOutCounter);
}
}
private void removeMeters() {
var registry = getServer().getMeterRegistry();
registry.remove(connectionsGauge);
registry.remove(msgInCounter);
registry.remove(msgOutCounter);
}
}


2 changes: 1 addition & 1 deletion jpos/src/main/java/org/jpos/q2/qbean/SystemMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void dumpMeters(PrintStream p, String indent) {
.sorted(Comparator.comparing(meter -> meter.getId().getName()))
.forEach(meter -> {
String prefix = indent + (isFirst.getAndSet(false) ? " meters: " : " ");
p.printf("%s%s%s%n", prefix, meter.getId().getName(), meter.measure());
p.printf("%s%s %s %s%n", prefix, meter.getId().getName(), meter.getId().getTags(), meter.measure());
});
}

Expand Down

0 comments on commit 9119309

Please sign in to comment.