Skip to content

Commit

Permalink
Forward error to waiter before sinking
Browse files Browse the repository at this point in the history
This ensures that an active waiter will be notified of a
connection failure before we resort to sending it to the sink.

Contributes to djc#141.
  • Loading branch information
couchand committed Mar 20, 2023
1 parent 03f95c9 commit c9be82c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
7 changes: 4 additions & 3 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
while let Some(result) = stream.next().await {
match result {
Ok(()) => {}
Err(e) => this.inner.statics.error_sink.sink(e),
Err(e) => this.inner.forward_error(e),
}
}
});
Expand Down Expand Up @@ -125,7 +125,7 @@ where
match self.inner.manager.is_valid(&mut conn).await {
Ok(()) => return Ok(conn),
Err(e) => {
self.inner.statics.error_sink.sink(e);
self.inner.forward_error(e);
conn.drop_invalid();
continue;
}
Expand All @@ -140,7 +140,8 @@ where
};

match timeout(self.inner.statics.connection_timeout, rx).await {
Ok(Ok(mut guard)) => Ok(make_pooled_conn(self, guard.extract())),
Ok(Ok(Ok(mut guard))) => Ok(make_pooled_conn(self, guard.extract())),
Ok(Ok(Err(e))) => Err(RunError::User(e)),
_ => Err(RunError::TimedOut),
}
}
Expand Down
21 changes: 17 additions & 4 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ where
internals: Mutex::new(PoolInternals::default()),
}
}

pub(crate) fn forward_error(&self, mut err: M::Error) {
let mut locked = self.internals.lock();
while let Some(waiter) = locked.waiters.pop_front() {
match waiter.send(Err(err)) {
Ok(_) => return,
Err(Err(e)) => err = e,
Err(Ok(_)) => unreachable!(),
}
}
self.statics.error_sink.sink(err);
}
}

/// The pool data that must be protected by a lock.
Expand All @@ -38,7 +50,7 @@ pub(crate) struct PoolInternals<M>
where
M: ManageConnection,
{
waiters: VecDeque<oneshot::Sender<InternalsGuard<M>>>,
waiters: VecDeque<oneshot::Sender<Result<InternalsGuard<M>, M::Error>>>,
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
Expand Down Expand Up @@ -71,11 +83,12 @@ where
let mut guard = InternalsGuard::new(conn, pool);
while let Some(waiter) = self.waiters.pop_front() {
// This connection is no longer idle, send it back out
match waiter.send(guard) {
match waiter.send(Ok(guard)) {
Ok(()) => return,
Err(g) => {
Err(Ok(g)) => {
guard = g;
}
Err(Err(_)) => unreachable!(),
}
}

Expand Down Expand Up @@ -107,7 +120,7 @@ where

pub(crate) fn push_waiter(
&mut self,
waiter: oneshot::Sender<InternalsGuard<M>>,
waiter: oneshot::Sender<Result<InternalsGuard<M>, M::Error>>,
config: &Builder<M>,
) -> ApprovalIter {
self.waiters.push_back(waiter);
Expand Down
12 changes: 12 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,18 @@ async fn test_lazy_initialization_failure() {
assert_eq!(res.unwrap_err(), RunError::TimedOut);
}

#[tokio::test]
async fn test_lazy_initialization_failure_no_retry() {
let manager = NthConnectionFailManager::<FakeConnection>::new(0);
let pool = Pool::builder()
.connection_timeout(Duration::from_secs(1))
.retry_connection(false)
.build_unchecked(manager);

let res = pool.get().await;
assert_eq!(res.unwrap_err(), RunError::User(Error));
}

#[tokio::test]
async fn test_get_timeout() {
let pool = Pool::builder()
Expand Down

0 comments on commit c9be82c

Please sign in to comment.