diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java index 145112896..a467ae2ec 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java @@ -42,8 +42,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import reactor.core.publisher.Flux; -import java.util.concurrent.atomic.AtomicBoolean; - import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull; /** @@ -59,16 +57,10 @@ final class MessageDuplexCodec extends ChannelDuplexHandler { private final ConnectionContext context; - private final AtomicBoolean closing; - - private final RequestQueue requestQueue; - private final ServerMessageDecoder decoder = new ServerMessageDecoder(); - MessageDuplexCodec(ConnectionContext context, AtomicBoolean closing, RequestQueue requestQueue) { + MessageDuplexCodec(ConnectionContext context) { this.context = requireNonNull(context, "context must not be null"); - this.closing = requireNonNull(closing, "closing must not be null"); - this.requestQueue = requireNonNull(requestQueue, "requestQueue must not be null"); } @Override @@ -129,14 +121,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void channelInactive(ChannelHandlerContext ctx) { decoder.dispose(); - requestQueue.dispose(); - - // Server has closed the connection without us wanting to close it - // Typically happens if we send data asynchronously (i.e. previous command didn't complete). - if (closing.compareAndSet(false, true)) { - logger.warn("Connection has been closed by peer"); - } - ctx.fireChannelInactive(); } diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java index cb7e7a28d..71f220f73 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java @@ -44,7 +44,7 @@ import reactor.util.context.Context; import reactor.util.context.ContextView; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiConsumer; import java.util.function.Function; @@ -62,6 +62,19 @@ final class ReactorNettyClient implements Client { private static final boolean INFO_ENABLED = logger.isInfoEnabled(); + private static final int ST_CONNECTED = 0; + + private static final int ST_CLOSE_REQUESTED = 1; + + private static final int ST_CLOSING = 2; + + private static final int ST_CLOSED = 3; + + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state"); + + private volatile int state = ST_CONNECTED; + private final Connection connection; private final ConnectionContext context; @@ -73,8 +86,6 @@ final class ReactorNettyClient implements Client { private final RequestQueue requestQueue = new RequestQueue(); - private final AtomicBoolean closing = new AtomicBoolean(); - ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) { requireNonNull(connection, "connection must not be null"); requireNonNull(context, "context must not be null"); @@ -88,7 +99,7 @@ final class ReactorNettyClient implements Client { // Note: encoder/decoder should before reactor bridge. connection.addHandlerLast(EnvelopeSlicer.NAME, new EnvelopeSlicer()) .addHandlerLast(MessageDuplexCodec.NAME, - new MessageDuplexCodec(context, this.closing, this.requestQueue)); + new MessageDuplexCodec(context)); if (ssl.getSslMode().startSsl()) { connection.addHandlerFirst(SslBridgeHandler.NAME, new SslBridgeHandler(context, ssl)); @@ -192,23 +203,38 @@ public Flux exchange(FluxExchangeable exchangeable) { @Override public Mono close() { return Mono.>create(sink -> { - if (!closing.compareAndSet(false, true)) { - // client is closing or closed - sink.success(); - return; - } - - requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> { - Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE); - - if (result != Sinks.EmitResult.OK) { - logger.error("Exit message sending failed due to {}, force closing", result); - } - }))); - }).flatMap(Function.identity()).onErrorResume(e -> { - logger.error("Exit message sending failed, force closing", e); - return Mono.empty(); - }).then(forceClose()); + if (state == ST_CLOSED) { + logger.debug("Close request ignored (connection already closed)"); + sink.success(); + return; + } + + if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSE_REQUESTED)) { + logger.debug("Close request accepted"); + } else { + logger.debug("Close request accepted (duplicated)"); + } + + requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> { + Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE); + + if (result != Sinks.EmitResult.OK) { + logger.error("Exit message sending failed due to {}, force closing", result); + } else { + if (STATE_UPDATER.compareAndSet(this, ST_CLOSE_REQUESTED, ST_CLOSING)) { + logger.debug("Exit message sent"); + } else { + logger.debug("Exit message sent (duplicated / connection already closed)"); + } + } + }))); + }) + .flatMap(Function.identity()) + .onErrorResume(e -> { + logger.error("Exit message sending failed, force closing", e); + return Mono.empty(); + }) + .then(forceClose()); } @Override @@ -223,7 +249,7 @@ public ByteBufAllocator getByteBufAllocator() { @Override public boolean isConnected() { - return !closing.get() && connection.channel().isOpen(); + return state < ST_CLOSED && connection.channel().isOpen(); } @Override @@ -239,7 +265,7 @@ public void loginSuccess() { @Override public String toString() { return String.format("ReactorNettyClient(%s){connectionId=%d}", - this.closing.get() ? "closing or closed" : "activating", context.getConnectionId()); + isConnected() ? "activating" : "clsoing or closed", context.getConnectionId()); } private void emitNextRequest(ClientMessage request) { @@ -275,10 +301,19 @@ private void drainError(R2dbcException e) { } private void handleClose() { - if (this.closing.compareAndSet(false, true)) { - logger.warn("Connection has been closed by peer"); + final int oldState = state; + if (oldState == ST_CLOSED) { + logger.debug("Connection already closed"); + return; + } + + STATE_UPDATER.set(this, ST_CLOSED); + + if (oldState != ST_CLOSING) { + logger.debug("Connection has been closed by peer"); drainError(ClientExceptions.unexpectedClosed()); } else { + logger.debug("Connection has been closed"); drainError(ClientExceptions.expectedClosed()); } }