Skip to content

Commit 28dd842

Browse files
committed
Fix bug in MercurySender
1 parent 6881875 commit 28dd842

1 file changed

Lines changed: 10 additions & 3 deletions

File tree

core/src/mercury/sender.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,21 @@ pub struct MercurySender {
66
mercury: MercuryManager,
77
uri: String,
88
pending: VecDeque<MercuryFuture<MercuryResponse>>,
9+
buffered_future: Option<MercuryFuture<MercuryResponse>>,
910
}
1011

1112
impl MercurySender {
12-
// TODO: pub(super) when stable
1313
pub(crate) fn new(mercury: MercuryManager, uri: String) -> MercurySender {
1414
MercurySender {
1515
mercury,
1616
uri,
1717
pending: VecDeque::new(),
18+
buffered_future: None,
1819
}
1920
}
2021

2122
pub fn is_flushed(&self) -> bool {
22-
self.pending.is_empty()
23+
self.buffered_future.is_none() && self.pending.is_empty()
2324
}
2425

2526
pub fn send(&mut self, item: Vec<u8>) {
@@ -28,8 +29,13 @@ impl MercurySender {
2829
}
2930

3031
pub async fn flush(&mut self) -> Result<(), MercuryError> {
31-
for fut in self.pending.drain(..) {
32+
if self.buffered_future.is_none() {
33+
self.buffered_future = self.pending.pop_front();
34+
}
35+
36+
while let Some(fut) = self.buffered_future.as_mut() {
3237
fut.await?;
38+
self.buffered_future = self.pending.pop_front();
3339
}
3440
Ok(())
3541
}
@@ -41,6 +47,7 @@ impl Clone for MercurySender {
4147
mercury: self.mercury.clone(),
4248
uri: self.uri.clone(),
4349
pending: VecDeque::new(),
50+
buffered_future: None,
4451
}
4552
}
4653
}

0 commit comments

Comments
 (0)