From 411c4cbb18ca4fadcd44f59a2fe031bf13048d30 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 1 Oct 2024 17:08:17 +0200 Subject: [PATCH] TCPConnectionNonBlocking: create queue in constructor (https://issues.redhat.com/browse/JGRP-2837) --- src/org/jgroups/blocks/cs/TcpBaseServer.java | 2 +- .../blocks/cs/TcpConnectionNonBlocking.java | 20 +++++++++---------- src/org/jgroups/blocks/cs/TcpServer.java | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/org/jgroups/blocks/cs/TcpBaseServer.java b/src/org/jgroups/blocks/cs/TcpBaseServer.java index b8a5245bac..5c54a6030c 100644 --- a/src/org/jgroups/blocks/cs/TcpBaseServer.java +++ b/src/org/jgroups/blocks/cs/TcpBaseServer.java @@ -22,7 +22,7 @@ protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) { @Override protected TcpConnection createConnection(Address dest) throws Exception { - return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue) + return non_blocking_sends? new TcpConnectionNonBlocking(dest, this, max_send_queue) : new TcpConnection(dest, this); } diff --git a/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java index f7ebe79d7d..43f82ee2f9 100644 --- a/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java +++ b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java @@ -25,30 +25,31 @@ * @since 5.3.3 */ public class TcpConnectionNonBlocking extends TcpConnection { - protected BlockingQueue queue; - protected int max_size=128; - protected volatile Sender sender; - protected final LongAdder dropped_msgs=new LongAdder(); + protected final BlockingQueue queue; + protected int max_size=128; + protected volatile Sender sender; + protected final LongAdder dropped_msgs=new LongAdder(); - public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception { + public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server, int max_size) throws Exception { super(peer_addr, server); + this.max_size=max_size; + queue=new ArrayBlockingQueue<>(max_size); } - public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception { + public TcpConnectionNonBlocking(Socket s, TcpServer server, int max_size) throws Exception { super(s, server); + this.max_size=max_size; + queue=new ArrayBlockingQueue<>(max_size); } - public int maxSize() {return max_size;} - public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;} public long droppedMessages() {return dropped_msgs.sum();} public int queueSize() {return queue != null? queue.size() : 0;} @Override public void start() { super.start(); - queue=new ArrayBlockingQueue<>(max_size); if(sender != null) sender.stop(); sender=new Sender(server.factory).start(); @@ -64,7 +65,6 @@ public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception { @Override public void send(byte[] data, int offset, int length) throws Exception { - // to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to // serialize messages to and - before the data is sent by the sender thread - the buffer might be changed! // This is similar to what NioConnection does on a partial write. If the send was synchronous (like in diff --git a/src/org/jgroups/blocks/cs/TcpServer.java b/src/org/jgroups/blocks/cs/TcpServer.java index ff2e30af78..047a14792a 100644 --- a/src/org/jgroups/blocks/cs/TcpServer.java +++ b/src/org/jgroups/blocks/cs/TcpServer.java @@ -115,7 +115,7 @@ public void run() { protected void handleAccept(final Socket client_sock) throws Exception { TcpConnection conn=null; try { - conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue) + conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this, max_send_queue) : new TcpConnection(client_sock, TcpServer.this); Address peer_addr=conn.peerAddress();