Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Connection prematurely closed #144

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicBoolean;

import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;

/**
Expand All @@ -59,16 +57,10 @@ final class MessageDuplexCodec extends ChannelDuplexHandler {

private final ConnectionContext context;

private final AtomicBoolean closing;

private final RequestQueue requestQueue;

private final ServerMessageDecoder decoder = new ServerMessageDecoder();

MessageDuplexCodec(ConnectionContext context, AtomicBoolean closing, RequestQueue requestQueue) {
MessageDuplexCodec(ConnectionContext context) {
this.context = requireNonNull(context, "context must not be null");
this.closing = requireNonNull(closing, "closing must not be null");
this.requestQueue = requireNonNull(requestQueue, "requestQueue must not be null");
}

@Override
Expand Down Expand Up @@ -129,14 +121,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
@Override
public void channelInactive(ChannelHandlerContext ctx) {
decoder.dispose();
requestQueue.dispose();

// Server has closed the connection without us wanting to close it
// Typically happens if we send data asynchronously (i.e. previous command didn't complete).
if (closing.compareAndSet(false, true)) {
logger.warn("Connection has been closed by peer");
}

ctx.fireChannelInactive();
}

Expand Down
80 changes: 55 additions & 25 deletions src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -62,6 +62,17 @@ final class ReactorNettyClient implements Client {

private static final boolean INFO_ENABLED = logger.isInfoEnabled();

private static final int ST_CONNECTED = 0;

private static final int ST_CLOSING = 1;

private static final int ST_CLOSED = 2;

private static final AtomicIntegerFieldUpdater<ReactorNettyClient> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state");

private volatile int state = ST_CONNECTED;

private final Connection connection;

private final ConnectionContext context;
Expand All @@ -73,8 +84,6 @@ final class ReactorNettyClient implements Client {

private final RequestQueue requestQueue = new RequestQueue();

private final AtomicBoolean closing = new AtomicBoolean();

ReactorNettyClient(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
requireNonNull(connection, "connection must not be null");
requireNonNull(context, "context must not be null");
Expand All @@ -88,7 +97,7 @@ final class ReactorNettyClient implements Client {
// Note: encoder/decoder should before reactor bridge.
connection.addHandlerLast(EnvelopeSlicer.NAME, new EnvelopeSlicer())
.addHandlerLast(MessageDuplexCodec.NAME,
new MessageDuplexCodec(context, this.closing, this.requestQueue));
new MessageDuplexCodec(context));

if (ssl.getSslMode().startSsl()) {
connection.addHandlerFirst(SslBridgeHandler.NAME, new SslBridgeHandler(context, ssl));
Expand Down Expand Up @@ -191,24 +200,36 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {

@Override
public Mono<Void> close() {
return Mono.<Mono<Void>>create(sink -> {
if (!closing.compareAndSet(false, true)) {
// client is closing or closed
sink.success();
return;
}

requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
return Mono
.<Mono<Void>>create(sink -> {
if (state == ST_CLOSED) {
logger.debug("Close request ignored (connection already closed)");
sink.success();
return;
}

if (result != Sinks.EmitResult.OK) {
logger.error("Exit message sending failed due to {}, force closing", result);
}
})));
}).flatMap(Function.identity()).onErrorResume(e -> {
logger.error("Exit message sending failed, force closing", e);
return Mono.empty();
}).then(forceClose());
logger.debug("Close request accepted");

requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);

if (result != Sinks.EmitResult.OK) {
logger.error("Exit message sending failed due to {}, force closing", result);
} else {
if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
logger.debug("Exit message sent");
} else {
logger.debug("Exit message sent (duplicated / connection already closed)");
}
}
})));
})
.flatMap(Function.identity())
.onErrorResume(e -> {
logger.error("Exit message sending failed, force closing", e);
return Mono.empty();
})
.then(forceClose());
}

@Override
Expand All @@ -223,7 +244,7 @@ public ByteBufAllocator getByteBufAllocator() {

@Override
public boolean isConnected() {
return !closing.get() && connection.channel().isOpen();
return state < ST_CLOSED && connection.channel().isOpen();
}

@Override
Expand All @@ -239,7 +260,7 @@ public void loginSuccess() {
@Override
public String toString() {
return String.format("ReactorNettyClient(%s){connectionId=%d}",
this.closing.get() ? "closing or closed" : "activating", context.getConnectionId());
isConnected() ? "activating" : "clsoing or closed", context.getConnectionId());
}

private void emitNextRequest(ClientMessage request) {
Expand Down Expand Up @@ -275,10 +296,19 @@ private void drainError(R2dbcException e) {
}

private void handleClose() {
if (this.closing.compareAndSet(false, true)) {
logger.warn("Connection has been closed by peer");
final int oldState = state;
if (oldState == ST_CLOSED) {
logger.debug("Connection already closed");
return;
}

STATE_UPDATER.set(this, ST_CLOSED);

if (oldState != ST_CLOSING) {
logger.debug("Connection has been closed by peer");
drainError(ClientExceptions.unexpectedClosed());
} else {
logger.debug("Connection has been closed");
drainError(ClientExceptions.expectedClosed());
}
}
Expand Down