Skip to content

Commit

Permalink
use structured logging for Connect/Disconnect messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ar committed Jul 16, 2024
1 parent 3aa032a commit d964b5a
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 40 deletions.
64 changes: 32 additions & 32 deletions jpos/src/main/java/org/jpos/iso/BaseChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.jpos.iso.ISOFilter.VetoException;
import org.jpos.iso.header.BaseHeader;
import org.jpos.jfr.ChannelEvent;
import org.jpos.log.evt.Connect;
import org.jpos.log.evt.Disconnect;
import org.jpos.util.*;

import javax.net.ssl.SSLSocket;
Expand Down Expand Up @@ -107,15 +109,17 @@ public abstract class BaseChannel extends Observable

private Counter msgOutCounter;
private Counter msgInCounter;
private final UUID uuid;

private final Map<Class<? extends Exception>, List<ExceptionHandler>> exceptionHandlers = new HashMap<>();

/**
* constructor shared by server and client
* ISOChannels (which have different signatures)
*/
public BaseChannel () {
super();
uuid = UUID.randomUUID();
cnt = new int[SIZEOF_CNT];
name = "";
incomingFilters = new ArrayList();
Expand Down Expand Up @@ -357,21 +361,25 @@ protected Socket newSocket (String[] hosts, int[] ports, LogEvent evt)
int ii = nextHostPort++ % hosts.length;
h = hosts[ii];
p = ports[ii];
evt.addMessage ("Try " + i + " " + h+":" + p);
s = newSocket (h, p);
evt.addMessage (" Connected to %s:%d (%d)".formatted(
s.getInetAddress().getHostAddress(),
s.getPort(),
s.getLocalPort())
evt.addMessage(new Connect(
s.getInetAddress().getHostAddress(),
s.getPort(),
s.getLocalPort(), null)
);
break;
} catch (IOException e) {
evt.addMessage (" " + e.getMessage());
lastIOException = e;
evt.addMessage(
new Connect(h, p, 0, "%s:%d %s (%s)".formatted(
h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1)
)
)
);
}
}
if (s == null)
throw new IOException ("%s:%d %s (%s)".formatted(h, p, lastIOException, Caller.info(1)));
throw new IOException ("%s:%d %s (%s)".formatted(h, p, Caller.shortClassName(lastIOException.getClass().getName()), Caller.info(1)));
return s;
}
/**
Expand Down Expand Up @@ -434,24 +442,22 @@ public int getSoLingerSeconds() {
public void connect () throws IOException {
ChannelEvent jfr = new ChannelEvent.Connect();
jfr.begin();
LogEvent evt = new LogEvent (this, "connect");
LogEvent evt = new LogEvent (this, "connect").withTraceId(uuid);
try {
socket = newSocket (hosts, ports, evt);
if (getHost() != null)
jfr.setDetail("%s:%d".formatted(getHost(), getPort()));
connect(socket);
jfr.append("%d".formatted(socket.getLocalPort()));
evt.withTraceId(getSocketUUID());
applyTimeout();
Logger.log (evt);
} catch (IOException e) {
jfr = new ChannelEvent.ConnectionException(jfr.getDetail());
jfr.begin();
jfr.append (e.getMessage());
evt.addMessage (jfr.getDetail());
evt.addMessage(e);
Logger.log (evt);
throw e;
} finally {
Logger.log (evt);
jfr.commit();
}
}
Expand Down Expand Up @@ -627,7 +633,7 @@ public void send (ISOMsg m)
{
ChannelEvent jfr = new ChannelEvent.Send();
jfr.begin();
LogEvent evt = new LogEvent (this, "send");
LogEvent evt = new LogEvent (this, "send").withTraceId(getSocketUUID());
try {
if (!isConnected())
throw new IOException ("unconnected ISOChannel");
Expand Down Expand Up @@ -766,7 +772,7 @@ public ISOMsg receive() throws IOException, ISOException {

byte[] b=null;
byte[] header=null;
LogEvent evt = new LogEvent (this, "receive");
LogEvent evt = new LogEvent (this, "receive").withTraceId(getSocketUUID());
ISOMsg m = createMsg (); // call createMsg instead of createISOMsg for
// backward compatibility
m.setSource (this);
Expand All @@ -780,7 +786,6 @@ public ISOMsg receive() throws IOException, ISOException {
if (expectKeepAlive) {
while (len == 0) {
//If zero length, this is a keep alive msg
Logger.log(new LogEvent(this, "receive", "Zero length keep alive message received"));
len = getMessageLength();
}
}
Expand Down Expand Up @@ -834,23 +839,12 @@ else if (len > 0 && len <= getMaxPacketLength()) {
evt.addMessage (ISOUtil.hexdump (b));
}
throw e;
} catch (EOFException e) {
closeSocket();
evt.addMessage ("<peer-disconnect/>");
throw e;
} catch (SocketException e) {
closeSocket();
if (usable)
evt.addMessage ("<peer-disconnect>" + e.getMessage() + "</peer-disconnect>");
throw e;
} catch (InterruptedIOException e) {
closeSocket();
evt.addMessage ("<io-timeout/>");
throw e;
} catch (IOException e) {
} catch (IOException e) {
evt.addMessage (
new Disconnect(socket.getInetAddress().getHostAddress(), socket.getPort(), socket.getLocalPort(),
"%s (%s)".formatted(Caller.shortClassName(e.getClass().getName()), Caller.info()), e.getMessage())
);
closeSocket();
if (usable)
evt.addMessage (e);
throw e;
} catch (Exception e) {
closeSocket();
Expand Down Expand Up @@ -1223,4 +1217,10 @@ public Object clone(){
throw new InternalError();
}
}

private UUID getSocketUUID() {
return socket != null ?
new UUID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits() ^ socket.hashCode()) :
uuid;
}
}
4 changes: 3 additions & 1 deletion jpos/src/main/java/org/jpos/log/AuditLogEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
@JsonSubTypes.Type(value = ThrowableAuditLogEvent.class, name = "throwable"),
@JsonSubTypes.Type(value = License.class, name = "license"),
@JsonSubTypes.Type(value = SysInfo.class, name = "sysinfo"),
@JsonSubTypes.Type(value = Connect.class, name = "connect"),
@JsonSubTypes.Type(value = Disconnect.class, name = "disconnect")
})

public sealed interface AuditLogEvent permits Deploy, DeployActivity, License, LogMessage, Shutdown, Start, Stop, SysInfo, ThrowableAuditLogEvent, UnDeploy { }
public sealed interface AuditLogEvent permits Connect, Deploy, DeployActivity, Disconnect, License, LogMessage, Shutdown, Start, Stop, SysInfo, ThrowableAuditLogEvent, UnDeploy { }
45 changes: 45 additions & 0 deletions jpos/src/main/java/org/jpos/log/evt/Connect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* jPOS Project [http://jpos.org]
* Copyright (C) 2000-2023 jPOS Software SRL
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.jpos.log.evt;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.jpos.log.AuditLogEvent;

public record Connect(String host, int remotePort, int localPort, @JsonInclude(JsonInclude.Include.NON_NULL) String error) implements AuditLogEvent {
public Connect(String host, int remotePort, int localPort, String error) {
this.host = host;
this.remotePort = remotePort;
this.localPort = localPort;
this.error = error;
}

public Connect(String host, int remotePort, int localPort) {
this(host, remotePort, localPort, null);
}

@Override
public String toString() {
return "Connect{" +
"host='" + host + '\'' +
", remotePort=" + remotePort +
", localPort=" + localPort +
(error != null ? ", error='" + error + '\'' : "") +
'}';
}
}
29 changes: 29 additions & 0 deletions jpos/src/main/java/org/jpos/log/evt/Disconnect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* jPOS Project [http://jpos.org]
* Copyright (C) 2000-2010 Alejandro P. Revilla
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.jpos.log.evt;

import com.fasterxml.jackson.annotation.JsonInclude;
import org.jpos.log.AuditLogEvent;

public record Disconnect(
String host,
int remotePort,
int localPort,
@JsonInclude(JsonInclude.Include.NON_NULL) String exception,
@JsonInclude(JsonInclude.Include.NON_NULL) String message) implements AuditLogEvent { }
14 changes: 7 additions & 7 deletions jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,15 @@ else if (keepAlive && channel.isConnected() && channel instanceof BaseChannel) {
((BaseChannel)channel).sendKeepAlive();
}
} catch (ISOFilter.VetoException e) {
getLog().warn ("channel-sender-"+in, e.getMessage ());
// getLog().warn ("channel-sender-"+in, e.getMessage ());
} catch (ISOException e) {
getLog().warn ("channel-sender-"+in, e.getMessage ());
// getLog().warn ("channel-sender-"+in, e.getMessage ());
if (!ignoreISOExceptions) {
disconnect ();
}
ISOUtil.sleep (1000); // slow down on errors
} catch (Exception e) {
getLog().warn ("channel-sender-"+in, e.getMessage ());
// getLog().warn ("channel-sender-"+in, e.getMessage ());
disconnect ();
ISOUtil.sleep (1000);
}
Expand Down Expand Up @@ -343,10 +343,10 @@ public void run () {
else
sp.out (out, m);
} catch (ISOFilter.VetoException e) {
getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage());
// getLog().warn ("channel-receiver-"+out+"-veto-exception", e.getMessage());
} catch (ISOException e) {
if (running()) {
getLog().warn ("channel-receiver-"+out, e);
// getLog().warn ("channel-receiver-"+out, e);
if (!ignoreISOExceptions) {
sp.out (reconnect, Boolean.TRUE, delay);
disconnect ();
Expand All @@ -356,15 +356,15 @@ public void run () {
}
} catch (SocketTimeoutException | EOFException e) {
if (running()) {
getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting");
// getLog().warn ("channel-receiver-"+out, "Read timeout / EOF - reconnecting");
sp.out (reconnect, Boolean.TRUE, delay);
disconnect ();
sp.out (in, Boolean.TRUE); // wake-up Sender
ISOUtil.sleep(1000);
}
} catch (Exception e) {
if (running()) {
getLog().warn ("channel-receiver-"+out, e);
// getLog().warn ("channel-receiver-"+out, e);
sp.out (reconnect, Boolean.TRUE, delay);
disconnect ();
sp.out (in, Boolean.TRUE); // wake-up Sender
Expand Down

0 comments on commit d964b5a

Please sign in to comment.