Skip to content

Commit

Permalink
Merge pull request #471 from hatoo/refactor-client3
Browse files Browse the repository at this point in the history
refactor client
  • Loading branch information
hatoo authored Apr 20, 2024
2 parents eb483f0 + 4a73e00 commit 68ff8db
Showing 1 changed file with 110 additions and 64 deletions.
174 changes: 110 additions & 64 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,52 @@ async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp
Ok((connection_time, client_state))
}

async fn work_http2_once(
client: &Client,
client_state: &mut ClientStateHttp2,
report_tx: &flume::Sender<Result<RequestResult, ClientError>>,
connection_time: ConnectionTime,
start_latency_correction: Option<Instant>,
) -> (bool, bool) {
let mut res = client.work_http2(client_state).await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
if let Some(start_latency_correction) = start_latency_correction {
set_start_latency_correction(&mut res, start_latency_correction);
}
report_tx.send_async(res).await.unwrap();
(is_cancel, is_reconnect)
}

async fn work_http2_or_acquire(
client: &Client,
client_state: &mut ClientStateHttp2,
report_tx: &flume::Sender<Result<RequestResult, ClientError>>,
connection_time: ConnectionTime,
start_latency_correction: Option<Instant>,
semaphore: &tokio::sync::Semaphore,
) -> (bool, bool) {
tokio::select! {
mut res =
client.work_http2(client_state) => {
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
if let Some(start_latency_correction) = start_latency_correction {
set_start_latency_correction(&mut res, start_latency_correction);
}
report_tx.send_async(res).await.unwrap();
(is_cancel ,is_reconnect )

}
_ = semaphore.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
(true, false)
}
}
}

fn set_connection_time<E>(res: &mut Result<RequestResult, E>, connection_time: ConnectionTime) {
if let Ok(res) = res {
res.connection_time = Some(connection_time);
Expand Down Expand Up @@ -828,12 +874,15 @@ pub async fn work(
tokio::spawn(async move {
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks
{
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

if is_cancel || is_reconnect {
return is_cancel;
}
Expand Down Expand Up @@ -971,12 +1020,15 @@ pub async fn work_with_qps(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

if is_cancel || is_reconnect {
return is_cancel;
}
Expand Down Expand Up @@ -1114,13 +1166,15 @@ pub async fn work_with_qps_latency_correction(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
set_start_latency_correction(&mut res, start);
report_tx.send_async(res).await.unwrap();
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
)
.await;

if is_cancel || is_reconnect {
return is_cancel;
}
Expand Down Expand Up @@ -1230,21 +1284,19 @@ pub async fn work_until(
tokio::spawn(async move {
// This is where HTTP2 loops to make all the requests for a given client and worker
loop {
tokio::select! {
mut res =
client.work_http2(&mut client_state) => {
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
break is_cancel;
}
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
break true;
}
let (is_cancel, is_reconnect) =
work_http2_or_acquire(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
&s,
)
.await;

if is_cancel || is_reconnect {
break is_cancel;
}
}
})
Expand Down Expand Up @@ -1397,20 +1449,18 @@ pub async fn work_until_with_qps(
let s = s.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
tokio::select! {
mut res =
client.work_http2(&mut client_state) => {
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
return is_cancel;
}
}
_ = s.acquire() => {
return true;
}
let (is_cancel, is_reconnect) =
work_http2_or_acquire(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
&s,
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
Expand Down Expand Up @@ -1574,22 +1624,18 @@ pub async fn work_until_with_qps_latency_correction(
let s = s.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
tokio::select! {
mut res =
client.work_http2(&mut client_state) => {
set_start_latency_correction(&mut res, start);
set_connection_time(&mut res, connection_time);
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
return is_cancel;
}
}
_ = s.acquire() => {
return true;
}

let (is_cancel, is_reconnect) =
work_http2_or_acquire(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
&s,
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
Expand Down

0 comments on commit 68ff8db

Please sign in to comment.