Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
set client_timeout in header (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan authored and huangwei5 committed Jan 4, 2019
1 parent 092e142 commit 1fd1239
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ public client_operator(gpid gpid, String tableName) {
this.rpc_error = new error_code();
}

public final byte[] prepare_thrift_header(int body_length) {
public final byte[] prepare_thrift_header(int body_length, int client_timeout) {
header.body_length = body_length;
header.header_length = ThriftHeader.HEADER_LENGTH;
header.client_timeout = client_timeout;
header.thread_hash = Tools.dsn_gpid_to_thread_hash(header.app_id, header.partition_index);
header.partition_hash = 0;
return header.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static class RequestEntry {
public com.xiaomi.infra.pegasus.operator.client_operator op;
public Runnable callback;
public ScheduledFuture timeoutTask;
public long timeoutMs;
}

public enum ConnState {
Expand Down Expand Up @@ -113,6 +114,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi
//the timer task is scheduled.
pendingResponse.put(new Integer(entry.sequenceId), entry);
entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds);
entry.timeoutMs = timeoutInMilliseconds;

// We store the connection_state & netty channel in a struct so that they can fetch and update in atomic.
// Moreover, we can avoid the lock protection when we want to get the netty channel for send message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ public ThriftFrameEncoder() {

@Override
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ReplicaSession.RequestEntry entry, boolean preferDirect) throws Exception {
return preferDirect?ctx.alloc().ioBuffer(256):ctx.alloc().heapBuffer(256);
return preferDirect ? ctx.alloc().ioBuffer(256) : ctx.alloc().heapBuffer(256);
}

@Override
protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, ByteBuf out) throws Exception{
protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, ByteBuf out) throws Exception {
int initIndex = out.writerIndex();

// write the Memory buffer
out.writerIndex(initIndex + ThriftHeader.HEADER_LENGTH);
TBinaryProtocol protocol = new TBinaryProtocol(new TByteBufTransport(out));
e.op.send_data(protocol, e.sequenceId);
out.setBytes(initIndex, e.op.prepare_thrift_header(out.readableBytes() - ThriftHeader.HEADER_LENGTH));
out.setBytes(initIndex, e.op.prepare_thrift_header(
out.readableBytes() - ThriftHeader.HEADER_LENGTH, (int) e.timeoutMs));
}

@Override
Expand Down

0 comments on commit 1fd1239

Please sign in to comment.