Skip to content

Commit

Permalink
TCPConnectionNonBlocking: create queue in constructor (https://issues…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 1, 2024
1 parent 0154970 commit eee37b8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/TcpBaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {

@Override
protected TcpConnection createConnection(Address dest) throws Exception {
return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue)
return non_blocking_sends? new TcpConnectionNonBlocking(dest, this, max_send_queue)
: new TcpConnection(dest, this);
}

Expand Down
20 changes: 10 additions & 10 deletions src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@
* @since 5.3.3
*/
public class TcpConnectionNonBlocking extends TcpConnection {
protected BlockingQueue<ByteArray> queue;
protected int max_size=128;
protected volatile Sender sender;
protected final LongAdder dropped_msgs=new LongAdder();
protected final BlockingQueue<ByteArray> queue;
protected int max_size=128;
protected volatile Sender sender;
protected final LongAdder dropped_msgs=new LongAdder();


public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception {
public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server, int max_size) throws Exception {
super(peer_addr, server);
this.max_size=max_size;
queue=new ArrayBlockingQueue<>(max_size);
}

public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception {
public TcpConnectionNonBlocking(Socket s, TcpServer server, int max_size) throws Exception {
super(s, server);
this.max_size=max_size;
queue=new ArrayBlockingQueue<>(max_size);
}


public int maxSize() {return max_size;}
public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;}
public long droppedMessages() {return dropped_msgs.sum();}
public int queueSize() {return queue != null? queue.size() : 0;}


@Override public void start() {
super.start();
queue=new ArrayBlockingQueue<>(max_size);
if(sender != null)
sender.stop();
sender=new Sender(server.factory).start();
Expand All @@ -64,7 +65,6 @@ public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception {

@Override
public void send(byte[] data, int offset, int length) throws Exception {

// to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to
// serialize messages to and - before the data is sent by the sender thread - the buffer might be changed!
// This is similar to what NioConnection does on a partial write. If the send was synchronous (like in
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
protected void handleAccept(final Socket client_sock) throws Exception {
TcpConnection conn=null;
try {
conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue)
conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this, max_send_queue)
: new TcpConnection(client_sock, TcpServer.this);

Address peer_addr=conn.peerAddress();
Expand Down

0 comments on commit eee37b8

Please sign in to comment.