Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions watermelon/src/client/jetstream/commands/consumer_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,28 @@ use tokio::time::{Sleep, sleep};
use watermelon_proto::{ServerMessage, StatusCode, error::ServerError};

use crate::{
client::{Consumer, JetstreamClient, JetstreamError},
client::{Client, Consumer, JetstreamClient, JetstreamError},
subscription::Subscription,
};

use super::message::JetstreamMessage;

pin_project! {
/// A consumer batch request
///
/// Obtained from [`JetstreamClient::consumer_batch`].
///
/// Yields [`JetstreamMessage`] items that support acknowledgment
/// via [`ack`](JetstreamMessage::ack), [`nak`](JetstreamMessage::nak),
/// [`term`](JetstreamMessage::term), and [`progress`](JetstreamMessage::progress).
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ConsumerBatch {
subscription: Subscription,
#[pin]
timeout: Sleep,
pending_msgs: usize,
client: Client,
}
}

Expand Down Expand Up @@ -85,13 +92,14 @@ impl ConsumerBatch {
subscription,
timeout,
pending_msgs: max_msgs,
client: client.client,
})
}
}
}

impl Stream for ConsumerBatch {
type Item = Result<ServerMessage, ConsumerBatchError>;
type Item = Result<JetstreamMessage, ConsumerBatchError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -112,7 +120,7 @@ impl Stream for ConsumerBatch {
None | Some(StatusCode::OK) => {
*this.pending_msgs -= 1;

Poll::Ready(Some(Ok(msg)))
Poll::Ready(Some(Ok(JetstreamMessage::new(msg, this.client.clone()))))
}
Some(StatusCode::IDLE_HEARTBEAT) => {
cx.waker().wake_by_ref();
Expand Down
5 changes: 2 additions & 3 deletions watermelon/src/client/jetstream/commands/consumer_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use std::{

use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;
use watermelon_proto::ServerMessage;

use crate::{
client::{Consumer, JetstreamClient, JetstreamError},
util::BoxFuture,
};

use super::{ConsumerBatch, consumer_batch::ConsumerBatchError};
use super::{ConsumerBatch, consumer_batch::ConsumerBatchError, message::JetstreamMessage};

pin_project! {
/// A consumer stream of batch requests
Expand Down Expand Up @@ -78,7 +77,7 @@ impl ConsumerStream {
}

impl Stream for ConsumerStream {
type Item = Result<ServerMessage, ConsumerStreamError>;
type Item = Result<JetstreamMessage, ConsumerStreamError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
Expand Down
255 changes: 255 additions & 0 deletions watermelon/src/client/jetstream/commands/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
use std::{
fmt::{Debug, Display},
time::Duration,
};

use bytes::Bytes;
use watermelon_proto::{ServerMessage, Subject};

use crate::client::{Client, ClientClosedError};

/// An error encountered while acknowledging a `JetStream` message.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum JetstreamMessageAckError {
/// The client has been closed.
#[error("client closed")]
ClientClosed(#[source] ClientClosedError),
/// The message has no reply subject to acknowledge on.
#[error("message has no reply subject")]
NoReplySubject,
/// The message was already acknowledged (ack/nak/term).
/// Progress (`+WPI`) does not trigger this.
#[error("message already acknowledged")]
AlreadyAcknowledged,
}

/// A `JetStream` message that supports acknowledgment operations.
///
/// Wraps a [`ServerMessage`] and provides methods for ACK, NAK, TERM,
/// and progress indicator. Obtained from [`ConsumerBatch`](super::ConsumerBatch)
/// and [`ConsumerStream`](super::ConsumerStream).
///
/// The [`message`](JetstreamMessage::message) field is public, matching the
/// [`ServerMessage`] pattern — access `subject`, `reply_subject`, `headers`,
/// and `payload` directly on `msg.message.base`.
pub struct JetstreamMessage {
/// The underlying server message. Access `.message.base.subject`, `.message.base.headers`,
/// `.message.base.payload`, `.message.base.reply_subject`, `.message.base.status_code`,
/// `.message.base.subscription_id`.
pub message: ServerMessage,
/// Client handle, dropped after a terminal acknowledgment.
client: Option<Client>,
/// Whether the message has been acknowledged (ack, nak, or term).
/// Progress (`+WPI`) does NOT set this flag, as it can be sent multiple times.
acknowledged: bool,
}

impl JetstreamMessage {
/// Construct a new [`JetstreamMessage`] from a raw server message.
#[must_use]
pub(crate) fn new(message: ServerMessage, client: Client) -> Self {
Self {
message,
client: Some(client),
acknowledged: false,
}
}

/// Acknowledge the message has been processed.
///
/// After calling this, the server will not redeliver the message.
///
/// # Errors
///
/// Returns an error if the message has no reply subject, was already
/// acknowledged, or the client is closed.
pub async fn ack(&mut self) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self.take_reply_subject()?;
self.ensure_not_acked()?;

let client = self.take_client()?;

client
.publish(reply_subject)
.payload(Bytes::from_static(b"+ACK"))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
self.mark_acknowledged();
Ok(())
}

/// Negative acknowledge — request redelivery.
///
/// The message will be redelivered after the consumer's `ack_wait` period.
///
/// # Errors
///
/// Returns an error if the message has no reply subject, was already
/// acknowledged, or the client is closed.
pub async fn nak(&mut self) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self.take_reply_subject()?;
self.ensure_not_acked()?;

let client = self.take_client()?;

client
.publish(reply_subject)
.payload(Bytes::from_static(b"-NAK"))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
self.mark_acknowledged();
Ok(())
}

/// Negative acknowledge with a delay before redelivery.
///
/// The message will be redelivered after the specified delay.
///
/// # Errors
///
/// Returns an error if the message has no reply subject, was already
/// acknowledged, or the client is closed.
pub async fn nak_with_delay(
&mut self,
delay: Duration,
) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self.take_reply_subject()?;
self.ensure_not_acked()?;

let payload = format!("-NAK {{\"delay\": {}}}", delay.as_nanos()).into_bytes();

let client = self.take_client()?;

client
.publish(reply_subject)
.payload(Bytes::from(payload))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
self.mark_acknowledged();
Ok(())
}

/// Send a progress indicator — extend the redelivery deadline.
///
/// Resets the redelivery timer without acknowledging or rejecting the message.
/// Can be called multiple times, unlike [`ack`](Self::ack), [`nak`](Self::nak),
/// and [`term`](Self::term).
///
/// # Errors
///
/// Returns an error if the message has no reply subject or the client is closed.
pub async fn progress(&self) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self
.message
.base
.reply_subject
.as_ref()
.ok_or(JetstreamMessageAckError::NoReplySubject)?
.clone();

let client = self
.client
.as_ref()
.ok_or(JetstreamMessageAckError::NoReplySubject)?;

client
.publish(reply_subject)
.payload(Bytes::from_static(b"+WPI"))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
Ok(())
}

/// Terminate the message — stop all redelivery attempts.
///
/// The message will never be redelivered, regardless of the `max_deliver` setting.
///
/// # Errors
///
/// Returns an error if the message has no reply subject, was already
/// acknowledged, or the client is closed.
pub async fn term(&mut self) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self.take_reply_subject()?;
self.ensure_not_acked()?;

let client = self.take_client()?;

client
.publish(reply_subject)
.payload(Bytes::from_static(b"+TERM"))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
self.mark_acknowledged();
Ok(())
}

/// Terminate the message with a reason.
///
/// The reason will be included in the `JetStream` advisory event sent by the server.
///
/// # Errors
///
/// Returns an error if the message has no reply subject, was already
/// acknowledged, or the client is closed.
pub async fn term_with_reason(
&mut self,
reason: impl Display,
) -> Result<(), JetstreamMessageAckError> {
let reply_subject = self.take_reply_subject()?;
self.ensure_not_acked()?;

let payload = format!("+TERM {reason}").into_bytes();

let client = self.take_client()?;

client
.publish(reply_subject)
.payload(Bytes::from(payload))
.await
.map_err(JetstreamMessageAckError::ClientClosed)?;
self.mark_acknowledged();
Ok(())
}

/// Check if the message has been acknowledged (ack/nak/term).
/// Progress (`+WPI`) does not set this flag.
#[must_use]
pub fn is_acked(&self) -> bool {
self.acknowledged
}

fn take_reply_subject(&mut self) -> Result<Subject, JetstreamMessageAckError> {
self.message
.base
.reply_subject
.take()
.ok_or(JetstreamMessageAckError::NoReplySubject)
}

fn take_client(&mut self) -> Result<Client, JetstreamMessageAckError> {
self.client
.take()
.ok_or(JetstreamMessageAckError::NoReplySubject)
}

fn ensure_not_acked(&self) -> Result<(), JetstreamMessageAckError> {
if self.acknowledged {
return Err(JetstreamMessageAckError::AlreadyAcknowledged);
}
Ok(())
}

fn mark_acknowledged(&mut self) {
self.acknowledged = true;
}
}

impl Debug for JetstreamMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JetstreamMessage")
.field("message", &self.message)
.field("acknowledged", &self.acknowledged)
.finish_non_exhaustive()
}
}
8 changes: 8 additions & 0 deletions watermelon/src/client/jetstream/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
pub use self::consumer_batch::ConsumerBatch;
pub use self::consumer_list::Consumers;
pub use self::consumer_stream::{ConsumerStream, ConsumerStreamError};
pub use self::message::{JetstreamMessage, JetstreamMessageAckError};
pub use self::publish::{
ClientJetstreamPublish, DoClientJetstreamPublish, DoOwnedClientJetstreamPublish,
JetstreamPublish, JetstreamPublishBuilder, JetstreamPublishError, OwnedClientJetstreamPublish,
PubAck,
};
pub use self::stream_list::Streams;

mod consumer_batch;
mod consumer_list;
mod consumer_stream;
mod message;
mod publish;
mod stream_list;
Loading