Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion contracts/stream_contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,11 @@ impl StreamContract {
token_client.transfer(&contract_address, &sender, &refunded_amount);
}

// Mark stream as inactive
// Mark stream as inactive and clear any pause state
stream.is_active = false;
stream.status = StreamStatus::Cancelled;
stream.paused = false;
stream.paused_at = None;
stream.last_update_time = now;

let recipient = stream.recipient.clone();
Expand Down Expand Up @@ -588,6 +590,12 @@ impl StreamContract {
let mut stream = load_stream(&env, stream_id)?;
Self::validate_stream_ownership(&stream, &sender)?;

// Reject if the stream is not in Paused status — this covers streams
// that were cancelled while paused (is_active=false, paused=true).
if stream.status != StreamStatus::Paused {
return Err(StreamError::StreamInactive);
}

if !stream.paused {
return Err(StreamError::StreamInactive);
}
Expand Down Expand Up @@ -624,6 +632,20 @@ impl StreamContract {

// ─── Read-only Queries ────────────────────────────────────────────────────

/// Returns the total number of streams ever created (monotonically increasing).
///
/// This is the global stream ID counter, not the count of currently active
/// streams. It equals the highest stream ID that has been assigned, making
/// it useful for cursor-based or offset pagination without a full DB scan.
///
/// Returns `0` on a freshly-deployed contract where no stream has been created.
pub fn stream_count(env: Env) -> u64 {
env.storage()
.instance()
.get(&crate::types::DataKey::StreamCounter)
.unwrap_or(0)
}

/// Returns the stream record for `stream_id`, or `None` if it does not exist.
pub fn get_stream(env: Env, stream_id: u64) -> Option<Stream> {
try_load_stream(&env, stream_id)
Expand Down
171 changes: 171 additions & 0 deletions contracts/stream_contract/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2406,3 +2406,174 @@ fn test_resume_stream_emits_event() {
assert_eq!(payload.sender, sender);
assert_eq!(payload.new_end_time, 1150);
}

// ─── #421 stream_count ────────────────────────────────────────────────────────

#[test]
fn test_stream_count_returns_zero_on_fresh_contract() {
// A freshly deployed contract with no streams must return 0.
let env = Env::default();
env.mock_all_auths();
let client = create_contract(&env);
assert_eq!(client.stream_count(), 0);
}

#[test]
fn test_stream_count_increments_by_one_per_create() {
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 3_000);

let client = create_contract(&env);

assert_eq!(client.stream_count(), 0);

client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100);
assert_eq!(client.stream_count(), 1);

client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100);
assert_eq!(client.stream_count(), 2);

client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100);
assert_eq!(client.stream_count(), 3);
}

#[test]
fn test_stream_count_is_not_decremented_by_cancel() {
// stream_count counts all streams ever created, not just active ones.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 1_000);

let client = create_contract(&env);
let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000);
assert_eq!(client.stream_count(), 1);

client.cancel_stream(&sender, &id);
// Cancelling must NOT decrement the counter.
assert_eq!(client.stream_count(), 1);
}

#[test]
fn test_stream_count_matches_last_stream_id() {
// The counter equals the highest stream ID that has been issued.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 5_000);

let client = create_contract(&env);
for i in 1u64..=5 {
let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100);
assert_eq!(id, i);
assert_eq!(client.stream_count(), i);
}
}

// ─── #787 resume_stream must reject cancelled-while-paused streams ────────────

#[test]
fn test_resume_after_cancel_while_paused_returns_stream_inactive() {
// Pause a stream, cancel it while paused, then attempt to resume.
// resume_stream must return StreamInactive and must NOT emit stream_resumed.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
let recipient = Address::generate(&env);
mint(&env, &token, &sender, 1_000);

let client = create_contract(&env);
// 1_000 tokens / 1_000 s = 1 token/s
let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000);

// Pause at t=100.
env.ledger().with_mut(|l| l.timestamp += 100);
client.pause_stream(&sender, &id);

// Cancel while paused at t=200.
env.ledger().with_mut(|l| l.timestamp += 100);
client.cancel_stream(&sender, &id);

// Verify the stream is correctly marked cancelled.
let s = client.get_stream(&id).unwrap();
assert!(!s.is_active);
assert_eq!(s.status, StreamStatus::Cancelled);
// cancel_stream must also clear the pause fields.
assert!(!s.paused);
assert!(s.paused_at.is_none());

// Attempting to resume must return StreamInactive.
let result = client.try_resume_stream(&sender, &id);
assert_eq!(result, Err(Ok(StreamError::StreamInactive)));

// No stream_resumed event must have been emitted.
let events = env.events().all();
let resumed_event = events.iter().find(|e| {
Symbol::try_from_val(&env, &e.1.get(0).unwrap()).unwrap()
== Symbol::new(&env, "stream_resumed")
});
assert!(
resumed_event.is_none(),
"stream_resumed must not be emitted after cancel"
);
}

#[test]
fn test_cancel_while_paused_clears_pause_fields() {
// After cancel_stream on a paused stream, paused and paused_at must be cleared.
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
let recipient = Address::generate(&env);
mint(&env, &token, &sender, 1_000);

let client = create_contract(&env);
let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000);

env.ledger().with_mut(|l| l.timestamp += 200);
client.pause_stream(&sender, &id);

// Verify pause state is set before cancel.
let before = client.get_stream(&id).unwrap();
assert!(before.paused);
assert!(before.paused_at.is_some());

client.cancel_stream(&sender, &id);

// After cancel, pause state must be cleared.
let after = client.get_stream(&id).unwrap();
assert!(!after.paused, "paused flag must be false after cancel");
assert!(
after.paused_at.is_none(),
"paused_at must be None after cancel"
);
assert_eq!(after.status, StreamStatus::Cancelled);
}

#[test]
fn test_cancel_normal_stream_also_clears_pause_fields() {
// Cancelling a non-paused stream should set paused=false and paused_at=None
// (they are already in that state, but the assignment must be idempotent).
let env = Env::default();
env.mock_all_auths();
let (token, _) = create_token(&env);
let sender = Address::generate(&env);
mint(&env, &token, &sender, 1_000);

let client = create_contract(&env);
let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000);

client.cancel_stream(&sender, &id);

let s = client.get_stream(&id).unwrap();
assert!(!s.paused);
assert!(s.paused_at.is_none());
assert_eq!(s.status, StreamStatus::Cancelled);
}
53 changes: 53 additions & 0 deletions frontend/src/lib/soroban.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,56 @@ export async function resumeStream(
nativeToScVal(params.streamId, { type: "u64" }),
]);
}

/**
* Read-only call to the contract's `stream_count` view function.
*
* Returns the total number of streams ever created (monotonically increasing
* stream ID counter). This is NOT the count of currently active streams —
* cancelled and completed streams are still counted.
*
* Returns `0n` on a freshly-deployed contract where no stream has been created.
* Does not require wallet authentication.
*/
export async function fetchStreamCount(): Promise<bigint> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const sdk: any = await import("@stellar/stellar-sdk");
const { Contract, TransactionBuilder, BASE_FEE, scValToNative, Keypair } = sdk;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const rpc: any = sdk.rpc ?? sdk.SorobanRpc;

const server = new rpc.Server(SOROBAN_RPC_URL, { allowHttp: false });
// stream_count is a read-only view — use a throwaway keypair for simulation.
const throwawayKeypair = Keypair.random();
const account = await server.getAccount(throwawayKeypair.publicKey()).catch(() => {
// If the throwaway account doesn't exist on-chain, build a minimal account object.
return { accountId: () => throwawayKeypair.publicKey(), sequenceNumber: () => "0", incrementSequenceNumber: () => {} };
});

const contract = new Contract(CONTRACT_ID);
const tx = new TransactionBuilder(account, {
fee: BASE_FEE,
networkPassphrase: NETWORK_PASSPHRASE,
})
.addOperation(contract.call("stream_count"))
.setTimeout(30)
.build();

const simResult = await server.simulateTransaction(tx);
if (rpc.Api?.isSimulationError?.(simResult) ?? simResult?.error) {
throw new SorobanCallError(`stream_count simulation failed: ${simResult.error}`, "NetworkError");
}

const rawResult = simResult?.result?.retval;
if (!rawResult) {
// Contract not yet initialized (no streams ever created).
return 0n;
}

const nativeValue = scValToNative(rawResult);
if (typeof nativeValue === "bigint") return nativeValue;
if (typeof nativeValue === "number") return BigInt(Math.trunc(nativeValue));
if (typeof nativeValue === "string") return BigInt(nativeValue);

throw new SorobanCallError("stream_count returned an unexpected value type.", "Unknown");
}
Loading