From 94848992ea1b113d1ddb930b1eea6184dd35b70e Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 27 May 2026 15:26:47 +0000 Subject: [PATCH 1/2] feat!(jetstream): add JetstreamMessage with ACK API Breaking: ConsumerBatch and ConsumerStream now yield JetstreamMessage instead of raw ServerMessage. Add JetstreamMessage wrapper around ServerMessage that provides acknowledgment operations: ack, nak, nak_with_delay, term, term_with_reason, progress, and is_acked. The client handle is an Option that gets dropped after a terminal acknowledgment, releasing the Arc refcount immediately. --- .../jetstream/commands/consumer_batch.rs | 14 +- .../jetstream/commands/consumer_stream.rs | 5 +- .../src/client/jetstream/commands/message.rs | 255 ++++++++++++++++++ .../src/client/jetstream/commands/mod.rs | 2 + watermelon/src/client/jetstream/mod.rs | 5 +- watermelon/src/client/mod.rs | 4 +- watermelon/src/lib.rs | 5 +- 7 files changed, 279 insertions(+), 11 deletions(-) create mode 100644 watermelon/src/client/jetstream/commands/message.rs diff --git a/watermelon/src/client/jetstream/commands/consumer_batch.rs b/watermelon/src/client/jetstream/commands/consumer_batch.rs index 1a6e2fd..ef5c64c 100644 --- a/watermelon/src/client/jetstream/commands/consumer_batch.rs +++ b/watermelon/src/client/jetstream/commands/consumer_batch.rs @@ -11,14 +11,20 @@ use tokio::time::{Sleep, sleep}; use watermelon_proto::{ServerMessage, StatusCode, error::ServerError}; use crate::{ - client::{Consumer, JetstreamClient, JetstreamError}, + client::{Client, Consumer, JetstreamClient, JetstreamError}, subscription::Subscription, }; +use super::message::JetstreamMessage; + pin_project! { /// A consumer batch request /// /// Obtained from [`JetstreamClient::consumer_batch`]. + /// + /// Yields [`JetstreamMessage`] items that support acknowledgment + /// via [`ack`](JetstreamMessage::ack), [`nak`](JetstreamMessage::nak), + /// [`term`](JetstreamMessage::term), and [`progress`](JetstreamMessage::progress). #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct ConsumerBatch { @@ -26,6 +32,7 @@ pin_project! { #[pin] timeout: Sleep, pending_msgs: usize, + client: Client, } } @@ -85,13 +92,14 @@ impl ConsumerBatch { subscription, timeout, pending_msgs: max_msgs, + client: client.client, }) } } } impl Stream for ConsumerBatch { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); @@ -112,7 +120,7 @@ impl Stream for ConsumerBatch { None | Some(StatusCode::OK) => { *this.pending_msgs -= 1; - Poll::Ready(Some(Ok(msg))) + Poll::Ready(Some(Ok(JetstreamMessage::new(msg, this.client.clone())))) } Some(StatusCode::IDLE_HEARTBEAT) => { cx.waker().wake_by_ref(); diff --git a/watermelon/src/client/jetstream/commands/consumer_stream.rs b/watermelon/src/client/jetstream/commands/consumer_stream.rs index 1ad3d19..9464c40 100644 --- a/watermelon/src/client/jetstream/commands/consumer_stream.rs +++ b/watermelon/src/client/jetstream/commands/consumer_stream.rs @@ -7,14 +7,13 @@ use std::{ use futures_core::{FusedStream, Stream}; use pin_project_lite::pin_project; -use watermelon_proto::ServerMessage; use crate::{ client::{Consumer, JetstreamClient, JetstreamError}, util::BoxFuture, }; -use super::{ConsumerBatch, consumer_batch::ConsumerBatchError}; +use super::{ConsumerBatch, consumer_batch::ConsumerBatchError, message::JetstreamMessage}; pin_project! { /// A consumer stream of batch requests @@ -78,7 +77,7 @@ impl ConsumerStream { } impl Stream for ConsumerStream { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); diff --git a/watermelon/src/client/jetstream/commands/message.rs b/watermelon/src/client/jetstream/commands/message.rs new file mode 100644 index 0000000..334cb1f --- /dev/null +++ b/watermelon/src/client/jetstream/commands/message.rs @@ -0,0 +1,255 @@ +use std::{ + fmt::{Debug, Display}, + time::Duration, +}; + +use bytes::Bytes; +use watermelon_proto::{ServerMessage, Subject}; + +use crate::client::{Client, ClientClosedError}; + +/// An error encountered while acknowledging a `JetStream` message. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum JetstreamMessageAckError { + /// The client has been closed. + #[error("client closed")] + ClientClosed(#[source] ClientClosedError), + /// The message has no reply subject to acknowledge on. + #[error("message has no reply subject")] + NoReplySubject, + /// The message was already acknowledged (ack/nak/term). + /// Progress (`+WPI`) does not trigger this. + #[error("message already acknowledged")] + AlreadyAcknowledged, +} + +/// A `JetStream` message that supports acknowledgment operations. +/// +/// Wraps a [`ServerMessage`] and provides methods for ACK, NAK, TERM, +/// and progress indicator. Obtained from [`ConsumerBatch`](super::ConsumerBatch) +/// and [`ConsumerStream`](super::ConsumerStream). +/// +/// The [`message`](JetstreamMessage::message) field is public, matching the +/// [`ServerMessage`] pattern — access `subject`, `reply_subject`, `headers`, +/// and `payload` directly on `msg.message.base`. +pub struct JetstreamMessage { + /// The underlying server message. Access `.message.base.subject`, `.message.base.headers`, + /// `.message.base.payload`, `.message.base.reply_subject`, `.message.base.status_code`, + /// `.message.base.subscription_id`. + pub message: ServerMessage, + /// Client handle, dropped after a terminal acknowledgment. + client: Option, + /// Whether the message has been acknowledged (ack, nak, or term). + /// Progress (`+WPI`) does NOT set this flag, as it can be sent multiple times. + acknowledged: bool, +} + +impl JetstreamMessage { + /// Construct a new [`JetstreamMessage`] from a raw server message. + #[must_use] + pub(crate) fn new(message: ServerMessage, client: Client) -> Self { + Self { + message, + client: Some(client), + acknowledged: false, + } + } + + /// Acknowledge the message has been processed. + /// + /// After calling this, the server will not redeliver the message. + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject, was already + /// acknowledged, or the client is closed. + pub async fn ack(&mut self) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self.take_reply_subject()?; + self.ensure_not_acked()?; + + let client = self.take_client()?; + + client + .publish(reply_subject) + .payload(Bytes::from_static(b"+ACK")) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + self.mark_acknowledged(); + Ok(()) + } + + /// Negative acknowledge — request redelivery. + /// + /// The message will be redelivered after the consumer's `ack_wait` period. + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject, was already + /// acknowledged, or the client is closed. + pub async fn nak(&mut self) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self.take_reply_subject()?; + self.ensure_not_acked()?; + + let client = self.take_client()?; + + client + .publish(reply_subject) + .payload(Bytes::from_static(b"-NAK")) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + self.mark_acknowledged(); + Ok(()) + } + + /// Negative acknowledge with a delay before redelivery. + /// + /// The message will be redelivered after the specified delay. + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject, was already + /// acknowledged, or the client is closed. + pub async fn nak_with_delay( + &mut self, + delay: Duration, + ) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self.take_reply_subject()?; + self.ensure_not_acked()?; + + let payload = format!("-NAK {{\"delay\": {}}}", delay.as_nanos()).into_bytes(); + + let client = self.take_client()?; + + client + .publish(reply_subject) + .payload(Bytes::from(payload)) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + self.mark_acknowledged(); + Ok(()) + } + + /// Send a progress indicator — extend the redelivery deadline. + /// + /// Resets the redelivery timer without acknowledging or rejecting the message. + /// Can be called multiple times, unlike [`ack`](Self::ack), [`nak`](Self::nak), + /// and [`term`](Self::term). + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject or the client is closed. + pub async fn progress(&self) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self + .message + .base + .reply_subject + .as_ref() + .ok_or(JetstreamMessageAckError::NoReplySubject)? + .clone(); + + let client = self + .client + .as_ref() + .ok_or(JetstreamMessageAckError::NoReplySubject)?; + + client + .publish(reply_subject) + .payload(Bytes::from_static(b"+WPI")) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + Ok(()) + } + + /// Terminate the message — stop all redelivery attempts. + /// + /// The message will never be redelivered, regardless of the `max_deliver` setting. + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject, was already + /// acknowledged, or the client is closed. + pub async fn term(&mut self) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self.take_reply_subject()?; + self.ensure_not_acked()?; + + let client = self.take_client()?; + + client + .publish(reply_subject) + .payload(Bytes::from_static(b"+TERM")) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + self.mark_acknowledged(); + Ok(()) + } + + /// Terminate the message with a reason. + /// + /// The reason will be included in the `JetStream` advisory event sent by the server. + /// + /// # Errors + /// + /// Returns an error if the message has no reply subject, was already + /// acknowledged, or the client is closed. + pub async fn term_with_reason( + &mut self, + reason: impl Display, + ) -> Result<(), JetstreamMessageAckError> { + let reply_subject = self.take_reply_subject()?; + self.ensure_not_acked()?; + + let payload = format!("+TERM {reason}").into_bytes(); + + let client = self.take_client()?; + + client + .publish(reply_subject) + .payload(Bytes::from(payload)) + .await + .map_err(JetstreamMessageAckError::ClientClosed)?; + self.mark_acknowledged(); + Ok(()) + } + + /// Check if the message has been acknowledged (ack/nak/term). + /// Progress (`+WPI`) does not set this flag. + #[must_use] + pub fn is_acked(&self) -> bool { + self.acknowledged + } + + fn take_reply_subject(&mut self) -> Result { + self.message + .base + .reply_subject + .take() + .ok_or(JetstreamMessageAckError::NoReplySubject) + } + + fn take_client(&mut self) -> Result { + self.client + .take() + .ok_or(JetstreamMessageAckError::NoReplySubject) + } + + fn ensure_not_acked(&self) -> Result<(), JetstreamMessageAckError> { + if self.acknowledged { + return Err(JetstreamMessageAckError::AlreadyAcknowledged); + } + Ok(()) + } + + fn mark_acknowledged(&mut self) { + self.acknowledged = true; + } +} + +impl Debug for JetstreamMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JetstreamMessage") + .field("message", &self.message) + .field("acknowledged", &self.acknowledged) + .finish_non_exhaustive() + } +} diff --git a/watermelon/src/client/jetstream/commands/mod.rs b/watermelon/src/client/jetstream/commands/mod.rs index cecf3bc..86bfa45 100644 --- a/watermelon/src/client/jetstream/commands/mod.rs +++ b/watermelon/src/client/jetstream/commands/mod.rs @@ -1,9 +1,11 @@ pub use self::consumer_batch::ConsumerBatch; pub use self::consumer_list::Consumers; pub use self::consumer_stream::{ConsumerStream, ConsumerStreamError}; +pub use self::message::{JetstreamMessage, JetstreamMessageAckError}; pub use self::stream_list::Streams; mod consumer_batch; mod consumer_list; mod consumer_stream; +mod message; mod stream_list; diff --git a/watermelon/src/client/jetstream/mod.rs b/watermelon/src/client/jetstream/mod.rs index 98f4846..d89d987 100644 --- a/watermelon/src/client/jetstream/mod.rs +++ b/watermelon/src/client/jetstream/mod.rs @@ -8,7 +8,10 @@ use serde_json::json; use watermelon_proto::StatusCode; use watermelon_proto::{Subject, error::SubjectValidateError}; -pub use self::commands::{ConsumerBatch, ConsumerStream, ConsumerStreamError, Consumers, Streams}; +pub use self::commands::{ + ConsumerBatch, ConsumerStream, ConsumerStreamError, Consumers, JetstreamMessage, + JetstreamMessageAckError, Streams, +}; pub use self::resources::{ AckPolicy, Compression, Consumer, ConsumerConfig, ConsumerDurability, ConsumerSpecificConfig, ConsumerStorage, DeliverPolicy, DiscardPolicy, ReplayPolicy, RetentionPolicy, Storage, Stream, diff --git a/watermelon/src/client/mod.rs b/watermelon/src/client/mod.rs index 796d72b..adfc083 100644 --- a/watermelon/src/client/mod.rs +++ b/watermelon/src/client/mod.rs @@ -28,8 +28,8 @@ pub use self::jetstream::{ AckPolicy, Compression, Consumer, ConsumerBatch, ConsumerConfig, ConsumerDurability, ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, ConsumerStreamError, Consumers, DeliverPolicy, DiscardPolicy, JetstreamApiError, JetstreamClient, JetstreamError, - JetstreamErrorCode, ReplayPolicy, RetentionPolicy, Storage, Stream, StreamConfig, StreamState, - Streams, + JetstreamErrorCode, JetstreamMessage, JetstreamMessageAckError, ReplayPolicy, RetentionPolicy, + Storage, Stream, StreamConfig, StreamState, Streams, }; pub use self::quick_info::QuickInfo; pub(crate) use self::quick_info::RawQuickInfo; diff --git a/watermelon/src/lib.rs b/watermelon/src/lib.rs index 0e2c046..5165541 100644 --- a/watermelon/src/lib.rs +++ b/watermelon/src/lib.rs @@ -55,8 +55,8 @@ pub mod jetstream { pub use crate::client::{ AckPolicy, Compression, Consumer, ConsumerBatch, ConsumerConfig, ConsumerDurability, ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, Consumers, DeliverPolicy, - DiscardPolicy, JetstreamClient, ReplayPolicy, RetentionPolicy, Storage, Stream, - StreamConfig, StreamState, Streams, + DiscardPolicy, JetstreamClient, JetstreamMessage, JetstreamMessageAckError, ReplayPolicy, + RetentionPolicy, Storage, Stream, StreamConfig, StreamState, Streams, }; pub mod error { @@ -64,6 +64,7 @@ pub mod jetstream { pub use crate::client::{ ConsumerStreamError, JetstreamApiError, JetstreamError, JetstreamErrorCode, + JetstreamMessageAckError, }; } } From 08ae8010e050a1ddc9c6f097c55afd97c404ed86 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 27 May 2026 15:29:59 +0000 Subject: [PATCH 2/2] feat!(jetstream): add JetStream publish API Add JetstreamClient::publish and ::publish_owned methods with a builder pattern for JetStream-specific publish options: stream targeting, deduplication (message_id), sequence expectations, and TTL. Returns PubAck on success with stream name and sequence number, or JetstreamPublishError for known failure cases (no stream, stream full, etc). --- .../src/client/jetstream/commands/mod.rs | 6 + .../src/client/jetstream/commands/publish.rs | 473 ++++++++++++++++++ watermelon/src/client/jetstream/mod.rs | 25 +- watermelon/src/client/mod.rs | 12 +- watermelon/src/lib.rs | 12 +- 5 files changed, 516 insertions(+), 12 deletions(-) create mode 100644 watermelon/src/client/jetstream/commands/publish.rs diff --git a/watermelon/src/client/jetstream/commands/mod.rs b/watermelon/src/client/jetstream/commands/mod.rs index 86bfa45..98b0d6a 100644 --- a/watermelon/src/client/jetstream/commands/mod.rs +++ b/watermelon/src/client/jetstream/commands/mod.rs @@ -2,10 +2,16 @@ pub use self::consumer_batch::ConsumerBatch; pub use self::consumer_list::Consumers; pub use self::consumer_stream::{ConsumerStream, ConsumerStreamError}; pub use self::message::{JetstreamMessage, JetstreamMessageAckError}; +pub use self::publish::{ + ClientJetstreamPublish, DoClientJetstreamPublish, DoOwnedClientJetstreamPublish, + JetstreamPublish, JetstreamPublishBuilder, JetstreamPublishError, OwnedClientJetstreamPublish, + PubAck, +}; pub use self::stream_list::Streams; mod consumer_batch; mod consumer_list; mod consumer_stream; mod message; +mod publish; mod stream_list; diff --git a/watermelon/src/client/jetstream/commands/publish.rs b/watermelon/src/client/jetstream/commands/publish.rs new file mode 100644 index 0000000..fe75da6 --- /dev/null +++ b/watermelon/src/client/jetstream/commands/publish.rs @@ -0,0 +1,473 @@ +use std::{ + fmt::{self, Debug}, + future::IntoFuture, +}; + +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use watermelon_proto::{ + StatusCode, Subject, + headers::{HeaderMap, HeaderName, HeaderValue}, +}; + +use crate::{ + client::{ClientClosedError, JetstreamClient, JetstreamError}, + util::BoxFuture, +}; + +/// Error returned when a `JetStream` publish does not succeed. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum JetstreamPublishError { + /// `JetStream` is not enabled for this account. + #[error("jetstream not enabled for this account")] + JetStreamNotEnabled, + /// No stream matches the subject. + #[error("no stream matches the subject")] + NoStreamMatches, + /// The stream is full. + #[error("stream is full")] + StreamFull, + /// Messages are being discarded from the stream. + #[error("messages are being discarded")] + MessagesDiscarded, + /// Other `JetStream` publish error. + #[error("jetstream publish error: {0}")] + Other(String), +} + +/// Acknowledgment of a successful `JetStream` publish. +#[derive(Debug, Deserialize)] +pub struct PubAck { + /// The stream the message was published to. + pub stream: String, + /// The sequence number of the message in the stream. + pub sequence: u64, + /// The domain (if applicable). + #[serde(default)] + pub domain: Option, + /// The publish was a duplicate; this is the original sequence. + #[serde(default)] + pub duplicate: Option, +} + +/// A publishable `JetStream` message. +#[derive(Debug)] +pub struct JetstreamPublish { + subject: Subject, + payload: Bytes, + stream: Option, + expected_stream: Option, + expected_last_stream_sequence: Option, + expected_last_subject_sequence: Option, + expected_last_message_id: Option, + message_id: Option, + ttl: Option, +} + +/// A constructor for a publishable `JetStream` message. +/// +/// Obtained from [`JetstreamPublish::builder`]. +#[derive(Debug)] +pub struct JetstreamPublishBuilder { + publish: JetstreamPublish, +} + +/// A constructor for a `JetStream` publishable message to be sent using the given client. +/// +/// Obtained from [`JetstreamClient::publish`]. +pub struct ClientJetstreamPublish<'a> { + client: &'a JetstreamClient, + publish: JetstreamPublish, +} + +/// A `JetStream` publishable message ready to be published to the given client. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct DoClientJetstreamPublish<'a> { + client: &'a JetstreamClient, + publish: JetstreamPublish, +} + +/// A constructor for a `JetStream` publishable message to be sent using the given owned client. +/// +/// Obtained from [`JetstreamClient::publish_owned`]. +pub struct OwnedClientJetstreamPublish { + client: JetstreamClient, + publish: JetstreamPublish, +} + +/// A `JetStream` publishable message ready to be published to the given owned client. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct DoOwnedClientJetstreamPublish { + client: JetstreamClient, + publish: JetstreamPublish, +} + +macro_rules! jetstream_publish_builder { + ($payload_t:ty) => { + /// Set the stream to publish to. + #[must_use] + pub fn stream(mut self, stream: &str) -> Self { + self.publish_mut().stream = Some(stream.to_owned()); + self + } + + /// Expect the message to be in this stream, fail otherwise. + #[must_use] + pub fn expected_stream(mut self, stream: &str) -> Self { + self.publish_mut().expected_stream = Some(stream.to_owned()); + self + } + + /// Expect the last stream sequence to match this value. + #[must_use] + pub fn expected_last_stream_sequence(mut self, sequence: u64) -> Self { + self.publish_mut().expected_last_stream_sequence = Some(sequence); + self + } + + /// Expect the last subject sequence to match this value. + #[must_use] + pub fn expected_last_subject_sequence(mut self, sequence: u64) -> Self { + self.publish_mut().expected_last_subject_sequence = Some(sequence); + self + } + + /// Expect the last message ID to match this value. + #[must_use] + pub fn expected_last_message_id(mut self, id: &str) -> Self { + self.publish_mut().expected_last_message_id = Some(id.to_owned()); + self + } + + /// Set a message ID for deduplication. + #[must_use] + pub fn message_id(mut self, id: &str) -> Self { + self.publish_mut().message_id = Some(id.to_owned()); + self + } + + /// Set a TTL in seconds after which to discard the message. + #[must_use] + pub fn ttl(mut self, seconds: u32) -> Self { + self.publish_mut().ttl = Some(seconds); + self + } + + /// Serialize `payload` to JSON and use it as the payload. + /// + /// # Errors + /// + /// Returns an error if the serializer fails. + pub fn payload_json( + self, + payload: &T, + ) -> Result<$payload_t, serde_json::Error> { + let payload = serde_json::to_vec(payload)?; + Ok(self.payload(Bytes::from(payload))) + } + }; +} + +impl JetstreamPublish { + /// Build a new [`JetstreamPublish`]. + #[must_use] + pub fn builder(subject: Subject) -> JetstreamPublishBuilder { + JetstreamPublishBuilder::subject(subject) + } + + /// Publish this message to [`JetstreamClient`]. + pub fn client(self, client: &JetstreamClient) -> DoClientJetstreamPublish<'_> { + DoClientJetstreamPublish { + client, + publish: self, + } + } + + /// Publish this message to [`JetstreamClient`], taking ownership of it. + pub fn client_owned(self, client: JetstreamClient) -> DoOwnedClientJetstreamPublish { + DoOwnedClientJetstreamPublish { + client, + publish: self, + } + } +} + +impl JetstreamPublishBuilder { + #[must_use] + pub fn subject(subject: Subject) -> Self { + Self { + publish: JetstreamPublish { + subject, + payload: Bytes::new(), + stream: None, + expected_stream: None, + expected_last_stream_sequence: None, + expected_last_subject_sequence: None, + expected_last_message_id: None, + message_id: None, + ttl: None, + }, + } + } + + jetstream_publish_builder!(JetstreamPublish); + + #[must_use] + pub fn payload(mut self, payload: Bytes) -> JetstreamPublish { + self.publish.payload = payload; + self.publish + } + + fn publish_mut(&mut self) -> &mut JetstreamPublish { + &mut self.publish + } +} + +impl<'a> ClientJetstreamPublish<'a> { + pub(crate) fn build(client: &'a JetstreamClient, subject: Subject) -> Self { + Self { + client, + publish: JetstreamPublishBuilder::subject(subject).publish, + } + } + + jetstream_publish_builder!(DoClientJetstreamPublish<'a>); + + pub fn payload(mut self, payload: Bytes) -> DoClientJetstreamPublish<'a> { + self.publish.payload = payload; + self.publish.client(self.client) + } + + /// Convert this into [`OwnedClientJetstreamPublish`]. + #[must_use] + pub fn to_owned(self) -> OwnedClientJetstreamPublish { + OwnedClientJetstreamPublish { + client: self.client.clone(), + publish: self.publish, + } + } + + fn publish_mut(&mut self) -> &mut JetstreamPublish { + &mut self.publish + } +} + +impl OwnedClientJetstreamPublish { + pub(crate) fn build(client: JetstreamClient, subject: Subject) -> Self { + Self { + client, + publish: JetstreamPublishBuilder::subject(subject).publish, + } + } + + jetstream_publish_builder!(DoOwnedClientJetstreamPublish); + + pub fn payload(mut self, payload: Bytes) -> DoOwnedClientJetstreamPublish { + self.publish.payload = payload; + self.publish.client_owned(self.client) + } + + fn publish_mut(&mut self) -> &mut JetstreamPublish { + &mut self.publish + } +} + +impl DoClientJetstreamPublish<'_> { + /// Publish this message and await the [`PubAck`]. + /// + /// # Errors + /// + /// Returns an error if the client is closed or the server returns an error. + pub async fn publish(self) -> Result { + do_publish(self.client, self.publish).await + } +} + +impl<'a> IntoFuture for DoClientJetstreamPublish<'a> { + type Output = Result; + type IntoFuture = BoxFuture<'a, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { do_publish(self.client, self.publish).await }) + } +} + +impl DoOwnedClientJetstreamPublish { + /// Publish this message and await the [`PubAck`]. + /// + /// # Errors + /// + /// Returns an error if the client is closed or the server returns an error. + pub async fn publish(self) -> Result { + do_publish(&self.client, self.publish).await + } +} + +impl IntoFuture for DoOwnedClientJetstreamPublish { + type Output = Result; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { do_publish(&self.client, self.publish).await }) + } +} + +/// Internal helper used by both the builder types and `JetstreamClient::publish`. +pub(crate) async fn do_publish( + client: &JetstreamClient, + jetstream_publish: JetstreamPublish, +) -> Result { + let JetstreamPublish { + subject, + payload, + stream, + expected_stream, + expected_last_stream_sequence, + expected_last_subject_sequence, + expected_last_message_id, + message_id, + ttl, + } = jetstream_publish; + + let headers = build_headers( + stream.as_deref(), + expected_stream.as_deref(), + expected_last_stream_sequence, + expected_last_subject_sequence, + expected_last_message_id.as_deref(), + message_id.as_deref(), + ttl, + ); + + // Use the core client's request API — it handles reply subjects, + // multiplexed subscriptions, and timeouts for us. + let response_fut = client + .client() + .request(subject) + .headers(headers) + .payload(payload) + .await + .map_err(JetstreamError::ClientClosed)?; + + let response = response_fut + .await + .map_err(|_| JetstreamError::ClientClosed(ClientClosedError))?; + + // Check for status codes + if let Some(status) = response.status_code { + if status == StatusCode::NO_RESPONDERS { + return Err(JetstreamError::PublishStatus( + crate::client::jetstream::JetstreamPublishError::JetStreamNotEnabled, + )); + } + let status_u16 = u16::from(status); + return Err(match status_u16 { + 503 => JetstreamError::PublishStatus( + crate::client::jetstream::JetstreamPublishError::JetStreamNotEnabled, + ), + 409 => JetstreamError::PublishStatus( + crate::client::jetstream::JetstreamPublishError::NoStreamMatches, + ), + _ => { + let detail = String::from_utf8_lossy(&response.base.payload).to_string(); + JetstreamError::PublishStatus( + crate::client::jetstream::JetstreamPublishError::Other(detail), + ) + } + }); + } + + let pub_ack = + serde_json::from_slice::(&response.base.payload).map_err(JetstreamError::Json)?; + Ok(pub_ack) +} + +pub(crate) fn build_headers( + stream: Option<&str>, + expected_stream: Option<&str>, + expected_last_stream_sequence: Option, + expected_last_subject_sequence: Option, + expected_last_message_id: Option<&str>, + message_id: Option<&str>, + ttl: Option, +) -> HeaderMap { + let mut headers = HeaderMap::new(); + + if let Some(s) = stream { + headers.insert( + HeaderName::from_static("Nats-Stream"), + HeaderValue::from_dangerous_value(s.into()), + ); + } + if let Some(s) = expected_stream { + headers.insert( + HeaderName::from_static("Nats-Expected-Stream"), + HeaderValue::from_dangerous_value(s.into()), + ); + } + if let Some(seq) = expected_last_stream_sequence { + headers.insert( + HeaderName::from_static("Nats-Expected-Last-Sequence"), + HeaderValue::from_dangerous_value(seq.to_string().into()), + ); + } + if let Some(seq) = expected_last_subject_sequence { + headers.insert( + HeaderName::from_static("Nats-Expected-Last-Subject-Sequence"), + HeaderValue::from_dangerous_value(seq.to_string().into()), + ); + } + if let Some(id) = expected_last_message_id { + headers.insert( + HeaderName::from_static("Nats-Expected-Last-Message-Id"), + HeaderValue::from_dangerous_value(id.into()), + ); + } + if let Some(id) = message_id { + headers.insert( + HeaderName::from_static("Nats-Message-Id"), + HeaderValue::from_dangerous_value(id.into()), + ); + } + if let Some(t) = ttl { + headers.insert( + HeaderName::from_static("Nats-TTL"), + HeaderValue::from_dangerous_value(t.to_string().into()), + ); + } + + headers +} + +impl Debug for ClientJetstreamPublish<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientJetstreamPublish") + .field("publish", &self.publish) + .finish_non_exhaustive() + } +} + +impl Debug for DoClientJetstreamPublish<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DoClientJetstreamPublish") + .field("publish", &self.publish) + .finish_non_exhaustive() + } +} + +impl Debug for OwnedClientJetstreamPublish { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedClientJetstreamPublish") + .field("publish", &self.publish) + .finish_non_exhaustive() + } +} + +impl Debug for DoOwnedClientJetstreamPublish { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DoOwnedClientJetstreamPublish") + .field("publish", &self.publish) + .finish_non_exhaustive() + } +} diff --git a/watermelon/src/client/jetstream/mod.rs b/watermelon/src/client/jetstream/mod.rs index d89d987..90f12cb 100644 --- a/watermelon/src/client/jetstream/mod.rs +++ b/watermelon/src/client/jetstream/mod.rs @@ -9,8 +9,10 @@ use watermelon_proto::StatusCode; use watermelon_proto::{Subject, error::SubjectValidateError}; pub use self::commands::{ - ConsumerBatch, ConsumerStream, ConsumerStreamError, Consumers, JetstreamMessage, - JetstreamMessageAckError, Streams, + ClientJetstreamPublish, ConsumerBatch, ConsumerStream, ConsumerStreamError, Consumers, + DoClientJetstreamPublish, DoOwnedClientJetstreamPublish, JetstreamMessage, + JetstreamMessageAckError, JetstreamPublish, JetstreamPublishBuilder, JetstreamPublishError, + OwnedClientJetstreamPublish, PubAck, Streams, }; pub use self::resources::{ AckPolicy, Compression, Consumer, ConsumerConfig, ConsumerDurability, ConsumerSpecificConfig, @@ -67,6 +69,8 @@ pub enum JetstreamError { Json(#[source] serde_json::Error), #[error("bad response code")] Api(#[source] JetstreamApiError), + #[error("publish error")] + PublishStatus(#[source] JetstreamPublishError), } impl JetstreamClient { @@ -277,6 +281,23 @@ impl JetstreamClient { .response_timeout(self.request_timeout) } + /// Start building a `JetStream` publish operation. + /// + /// Returns a builder that allows setting JetStream-specific headers + /// like stream targeting, deduplication, and sequence expectations. + #[must_use] + pub fn publish(&self, subject: Subject) -> ClientJetstreamPublish<'_> { + ClientJetstreamPublish::build(self, subject) + } + + /// Start building a `JetStream` publish operation, taking ownership of the client. + /// + /// When possible consider using [`JetstreamClient::publish`] instead. + #[must_use] + pub fn publish_owned(self, subject: Subject) -> OwnedClientJetstreamPublish { + OwnedClientJetstreamPublish::build(self, subject) + } + /// Get a reference to the inner NATS Core client #[must_use] pub fn client(&self) -> &Client { diff --git a/watermelon/src/client/mod.rs b/watermelon/src/client/mod.rs index adfc083..a1ef29f 100644 --- a/watermelon/src/client/mod.rs +++ b/watermelon/src/client/mod.rs @@ -25,11 +25,13 @@ pub use self::commands::{ RequestBuilder, ResponseError, ResponseFut, }; pub use self::jetstream::{ - AckPolicy, Compression, Consumer, ConsumerBatch, ConsumerConfig, ConsumerDurability, - ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, ConsumerStreamError, Consumers, - DeliverPolicy, DiscardPolicy, JetstreamApiError, JetstreamClient, JetstreamError, - JetstreamErrorCode, JetstreamMessage, JetstreamMessageAckError, ReplayPolicy, RetentionPolicy, - Storage, Stream, StreamConfig, StreamState, Streams, + AckPolicy, ClientJetstreamPublish, Compression, Consumer, ConsumerBatch, ConsumerConfig, + ConsumerDurability, ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, + ConsumerStreamError, Consumers, DeliverPolicy, DiscardPolicy, DoClientJetstreamPublish, + DoOwnedClientJetstreamPublish, JetstreamApiError, JetstreamClient, JetstreamError, + JetstreamErrorCode, JetstreamMessage, JetstreamMessageAckError, JetstreamPublish, + JetstreamPublishBuilder, JetstreamPublishError, OwnedClientJetstreamPublish, PubAck, + ReplayPolicy, RetentionPolicy, Storage, Stream, StreamConfig, StreamState, Streams, }; pub use self::quick_info::QuickInfo; pub(crate) use self::quick_info::RawQuickInfo; diff --git a/watermelon/src/lib.rs b/watermelon/src/lib.rs index 5165541..4aa3549 100644 --- a/watermelon/src/lib.rs +++ b/watermelon/src/lib.rs @@ -53,10 +53,12 @@ pub mod jetstream { //! Relies on NATS Core to communicate with the NATS server pub use crate::client::{ - AckPolicy, Compression, Consumer, ConsumerBatch, ConsumerConfig, ConsumerDurability, - ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, Consumers, DeliverPolicy, - DiscardPolicy, JetstreamClient, JetstreamMessage, JetstreamMessageAckError, ReplayPolicy, - RetentionPolicy, Storage, Stream, StreamConfig, StreamState, Streams, + AckPolicy, ClientJetstreamPublish, Compression, Consumer, ConsumerBatch, ConsumerConfig, + ConsumerDurability, ConsumerSpecificConfig, ConsumerStorage, ConsumerStream, Consumers, + DeliverPolicy, DiscardPolicy, DoClientJetstreamPublish, DoOwnedClientJetstreamPublish, + JetstreamClient, JetstreamMessage, JetstreamMessageAckError, JetstreamPublish, + JetstreamPublishBuilder, JetstreamPublishError, OwnedClientJetstreamPublish, PubAck, + ReplayPolicy, RetentionPolicy, Storage, Stream, StreamConfig, StreamState, Streams, }; pub mod error { @@ -64,7 +66,7 @@ pub mod jetstream { pub use crate::client::{ ConsumerStreamError, JetstreamApiError, JetstreamError, JetstreamErrorCode, - JetstreamMessageAckError, + JetstreamMessageAckError, JetstreamPublishError, }; } }