Skip to content

Commit

Permalink
Fix Connection prematurely closed
Browse files Browse the repository at this point in the history
Motivation:
The existing `ReactorNettyClient#close()` method might not handle requests asynchronously, potentially resulting in premature connection closure.

Modification:
Make sure that ReactorNettyClient#close() handles requests asynchronously.

Result:
Resolves #139
  • Loading branch information
jchrys committed Sep 16, 2023
1 parent 173779b commit 38c1fca
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicBoolean;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;

/**
Expand All @@ -59,16 +57,10 @@ final class MessageDuplexCodec extends ChannelDuplexHandler {

private final ConnectionContext context;

private final AtomicBoolean closing;

private final RequestQueue requestQueue;

private final ServerMessageDecoder decoder = new ServerMessageDecoder();

MessageDuplexCodec(ConnectionContext context, AtomicBoolean closing, RequestQueue requestQueue) {
MessageDuplexCodec(ConnectionContext context) {
this.context = requireNonNull(context, "context must not be null");
this.closing = requireNonNull(closing, "closing must not be null");
this.requestQueue = requireNonNull(requestQueue, "requestQueue must not be null");
}

@Override
Expand Down Expand Up @@ -129,14 +121,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
@Override
public void channelInactive(ChannelHandlerContext ctx) {
decoder.dispose();
requestQueue.dispose();

// Server has closed the connection without us wanting to close it
// Typically happens if we send data asynchronously (i.e. previous command didn't complete).
if (closing.compareAndSet(false, true)) {
logger.warn("Connection has been closed by peer");
}

ctx.fireChannelInactive();
}

Expand Down
85 changes: 60 additions & 25 deletions src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -62,6 +62,19 @@ final class ReactorNettyClient implements Client {

private static final boolean INFO_ENABLED = logger.isInfoEnabled();

private static final int ST_CONNECTED = 0;

private static final int ST_CLOSE_REQUESTED = 1;

private static final int ST_CLOSING = 2;

private static final int ST_CLOSED = 3;

private static final AtomicIntegerFieldUpdater<ReactorNettyClient> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state");

private volatile int state = ST_CONNECTED;

private final Connection connection;

private final ConnectionContext context;
Expand All @@ -73,8 +86,6 @@ final class ReactorNettyClient implements Client {

private final RequestQueue requestQueue = new RequestQueue();

private final AtomicBoolean closing = new AtomicBoolean();

ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
requireNonNull(connection, "connection must not be null");
requireNonNull(context, "context must not be null");
Expand All @@ -88,7 +99,7 @@ final class ReactorNettyClient implements Client {
// Note: encoder/decoder should before reactor bridge.
connection.addHandlerLast(EnvelopeSlicer.NAME, new EnvelopeSlicer())
.addHandlerLast(MessageDuplexCodec.NAME,
new MessageDuplexCodec(context, this.closing, this.requestQueue));
new MessageDuplexCodec(context));

if (ssl.getSslMode().startSsl()) {
connection.addHandlerFirst(SslBridgeHandler.NAME, new SslBridgeHandler(context, ssl));
Expand Down Expand Up @@ -192,23 +203,38 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
@Override
public Mono<Void> close() {
return Mono.<Mono<Void>>create(sink -> {
if (!closing.compareAndSet(false, true)) {
// client is closing or closed
sink.success();
return;
}

requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);

if (result != Sinks.EmitResult.OK) {
logger.error("Exit message sending failed due to {}, force closing", result);
}
})));
}).flatMap(Function.identity()).onErrorResume(e -> {
logger.error("Exit message sending failed, force closing", e);
return Mono.empty();
}).then(forceClose());
if (state == ST_CLOSED) {
logger.debug("Close request ignored (connection already closed)");
sink.success();
return;
}

if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSE_REQUESTED)) {
logger.debug("Close request accepted");
} else {
logger.debug("Close request accepted (duplicated)");
}

requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);

if (result != Sinks.EmitResult.OK) {
logger.error("Exit message sending failed due to {}, force closing", result);
} else {
if (STATE_UPDATER.compareAndSet(this, ST_CLOSE_REQUESTED, ST_CLOSING)) {
logger.debug("Exit message sent");
} else {
logger.debug("Exit message sent (duplicated / connection already closed)");
}
}
})));
})
.flatMap(Function.identity())
.onErrorResume(e -> {
logger.error("Exit message sending failed, force closing", e);
return Mono.empty();
})
.then(forceClose());
}

@Override
Expand All @@ -223,7 +249,7 @@ public ByteBufAllocator getByteBufAllocator() {

@Override
public boolean isConnected() {
return !closing.get() && connection.channel().isOpen();
return state < ST_CLOSED && connection.channel().isOpen();
}

@Override
Expand All @@ -239,7 +265,7 @@ public void loginSuccess() {
@Override
public String toString() {
return String.format("ReactorNettyClient(%s){connectionId=%d}",
this.closing.get() ? "closing or closed" : "activating", context.getConnectionId());
isConnected() ? "activating" : "clsoing or closed", context.getConnectionId());
}

private void emitNextRequest(ClientMessage request) {
Expand Down Expand Up @@ -275,10 +301,19 @@ private void drainError(R2dbcException e) {
}

private void handleClose() {
if (this.closing.compareAndSet(false, true)) {
logger.warn("Connection has been closed by peer");
final int oldState = state;
if (oldState == ST_CLOSED) {
logger.debug("Connection already closed");
return;
}

STATE_UPDATER.set(this, ST_CLOSED);

if (oldState != ST_CLOSING) {
logger.debug("Connection has been closed by peer");
drainError(ClientExceptions.unexpectedClosed());
} else {
logger.debug("Connection has been closed");
drainError(ClientExceptions.expectedClosed());
}
}
Expand Down

0 comments on commit 38c1fca

Please sign in to comment.