diff --git a/apps/flipcash/shared/contacts/src/main/kotlin/com/flipcash/app/contacts/ContactCoordinator.kt b/apps/flipcash/shared/contacts/src/main/kotlin/com/flipcash/app/contacts/ContactCoordinator.kt index 29fe02cae..7ca232ba2 100644 --- a/apps/flipcash/shared/contacts/src/main/kotlin/com/flipcash/app/contacts/ContactCoordinator.kt +++ b/apps/flipcash/shared/contacts/src/main/kotlin/com/flipcash/app/contacts/ContactCoordinator.kt @@ -143,9 +143,9 @@ class ContactCoordinator @Inject constructor( .launchIn(scope) cluster.filterNotNull() - .flatMapLatest { networkObserver.state } + .flatMapLatest { networkObserver.state.map { it.connected } } .distinctUntilChanged() - .filter { it.connected } + .filter { it } .onEach { trace(tag = TAG, message = "Network connected, triggering contact sync", type = TraceType.Process) launchSync() diff --git a/apps/flipcash/shared/contacts/src/test/kotlin/com/flipcash/app/contacts/ContactCoordinatorNetworkSyncTest.kt b/apps/flipcash/shared/contacts/src/test/kotlin/com/flipcash/app/contacts/ContactCoordinatorNetworkSyncTest.kt new file mode 100644 index 000000000..e5fc1b99e --- /dev/null +++ b/apps/flipcash/shared/contacts/src/test/kotlin/com/flipcash/app/contacts/ContactCoordinatorNetworkSyncTest.kt @@ -0,0 +1,81 @@ +@file:OptIn(ExperimentalCoroutinesApi::class) + +package com.flipcash.app.contacts + +import com.getcode.utils.network.ConnectionType +import com.getcode.utils.network.NetworkConnectivityListener +import com.getcode.utils.network.NetworkState +import com.getcode.utils.network.SignalStrength +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals + +class ContactCoordinatorNetworkSyncTest { + + private class FakeNetworkObserver : NetworkConnectivityListener { + val _state = MutableStateFlow(NetworkState.Default) + override val state: StateFlow = _state.asStateFlow() + override val isConnected: Boolean get() = _state.value.connected + override val type: ConnectionType get() = _state.value.type + } + + @Test + fun `signal strength changes while connected do not re-trigger sync`() = runTest { + val networkObserver = FakeNetworkObserver() + var syncCount = 0 + + val cluster = MutableStateFlow(null) + + // Mirrors the FIXED subscription from ContactCoordinator.init + val job = cluster + .filterNotNull() + .flatMapLatest { + networkObserver.state + .map { it.connected } + .distinctUntilChanged() + } + .filter { it } + .onEach { syncCount++ } + .launchIn(this) + + cluster.value = Unit + advanceUntilIdle() + + // Initial connected + networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Wifi) + advanceUntilIdle() + assertEquals(1, syncCount, "First connected event should trigger sync") + + // Rapid signal/type changes — all still connected + networkObserver._state.value = NetworkState(true, SignalStrength.Great, ConnectionType.Wifi) + networkObserver._state.value = NetworkState(true, SignalStrength.Strong, ConnectionType.Wifi) + networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Cellular) + networkObserver._state.value = NetworkState(true, SignalStrength.Poor, ConnectionType.Cellular) + networkObserver._state.value = NetworkState(true, SignalStrength.Great, ConnectionType.Wifi) + advanceUntilIdle() + + assertEquals(1, syncCount, "Signal strength / type changes should NOT re-trigger sync") + + // Disconnect → reconnect + networkObserver._state.value = NetworkState(false, SignalStrength.Unknown, ConnectionType.Unknown) + advanceUntilIdle() + networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Wifi) + advanceUntilIdle() + + assertEquals(2, syncCount, "Reconnect after disconnect should trigger sync") + + job.cancel() + } +} diff --git a/apps/flipcash/shared/tokens/src/main/kotlin/com/flipcash/app/tokens/TokenCoordinator.kt b/apps/flipcash/shared/tokens/src/main/kotlin/com/flipcash/app/tokens/TokenCoordinator.kt index 817119705..b86f2b906 100644 --- a/apps/flipcash/shared/tokens/src/main/kotlin/com/flipcash/app/tokens/TokenCoordinator.kt +++ b/apps/flipcash/shared/tokens/src/main/kotlin/com/flipcash/app/tokens/TokenCoordinator.kt @@ -151,9 +151,9 @@ class TokenCoordinator @Inject constructor( ProcessLifecycleOwner.get().lifecycle.addObserver(this) cluster.filterNotNull() - .flatMapLatest { networkObserver.state } + .flatMapLatest { networkObserver.state.map { it.connected } } .distinctUntilChanged() - .filter { it.connected } + .filter { it } .onEach { trace(tag = TAG, message = "Network connected, triggering token update", type = TraceType.Process) retryable { update() } @@ -221,15 +221,30 @@ class TokenCoordinator @Inject constructor( modifyBalance(token, amount) { current, delta -> current + delta } } + suspend fun add(mint: Mint, nativeAmount: Fiat) { + val token = getTokenMetadata(mint).getOrNull()?.token ?: return + add(token, nativeAmount.toLocalFiat(mint)) + } + suspend fun subtract(token: Token, fiat: LocalFiat) { val rate = exchange.rateToUsd(fiat.rate.currency) val amount = rate?.let { fiat.nativeAmount.convertingTo(it) } if (amount != null) { - trace(tag = TAG, message = "Subtracting ${amount.formatted()} to ${token.symbol}", type = TraceType.Process) + trace(tag = TAG, message = "Subtracting ${amount.formatted()} from ${token.symbol}", type = TraceType.Process) } modifyBalance(token, amount) { current, delta -> current - delta } } + suspend fun subtract(mint: Mint, nativeAmount: Fiat) { + val token = getTokenMetadata(mint).getOrNull()?.token ?: return + subtract(token, nativeAmount.toLocalFiat(mint)) + } + + private fun Fiat.toLocalFiat(mint: Mint): LocalFiat { + val rate = exchange.rateFor(currencyCode) ?: Rate.oneToOne + return LocalFiat.fromNativeAmount(this, rate, mint) + } + // endregion // region Public API — Token Metadata (implements TokenMetadataProvider) diff --git a/scripts/stress-test-event-stream.sh b/scripts/stress-test-event-stream.sh new file mode 100755 index 000000000..21f6ce86f --- /dev/null +++ b/scripts/stress-test-event-stream.sh @@ -0,0 +1,115 @@ +#!/bin/bash +# Event Stream Stress Test +# +# Prerequisites: adb connected to device, app installed + +set -euo pipefail + +LABEL="${1:-test}" +PKG="com.flipcash.app.android" +LOGFILE="event-stream-${LABEL}-$(date +%Y%m%d-%H%M%S).log" +TAGS="gRPC:V BIDI:V ChatCoordinator:V EventStream:V event-streaming:V EventStreamingController:V *:S" + +echo "=== Event Stream Stress Test ($LABEL) ===" +echo "Logging to: $LOGFILE" +echo "" + +# Clear logcat +adb logcat -c + +# Start logcat capture in background +adb logcat $TAGS | tee "$LOGFILE" & +LOGCAT_PID=$! +trap "kill $LOGCAT_PID 2>/dev/null; echo; echo 'Logs saved to $LOGFILE'" EXIT + +pause() { + echo "" + echo ">>> $1" + echo " Press ENTER when ready..." + read -r +} + +wait_and_log() { + local duration=$1 + local label=$2 + echo " [$label] Waiting ${duration}s..." + sleep "$duration" + echo " [$label] Done." +} + +echo "--- Test 1: Cold Start ---" +echo " Force stopping app..." +adb shell am force-stop "$PKG" +sleep 2 +echo " Launching app..." +adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER +wait_and_log 15 "cold-start" +echo " CHECK: Look for 'flipcash-stream => READY' and 'Stream activated on first response'" +echo "" + +echo "--- Test 2: Airplane Mode Toggle (network loss/recovery) ---" +echo " Enabling airplane mode..." +adb shell cmd connectivity airplane-mode enable +wait_and_log 5 "airplane-on" +echo " Disabling airplane mode..." +adb shell cmd connectivity airplane-mode disable +wait_and_log 20 "airplane-off" +echo " CHECK: Stream should reconnect. Look for 'Opening bidirectional stream' then 'Stream activated'" +echo "" + +echo "--- Test 3: Rapid Network Flapping (3 cycles) ---" +for i in 1 2 3; do + echo " Cycle $i: airplane ON..." + adb shell cmd connectivity airplane-mode enable + sleep 3 + echo " Cycle $i: airplane OFF..." + adb shell cmd connectivity airplane-mode disable + sleep 8 +done +wait_and_log 15 "flap-settle" +echo " CHECK: Should settle to one active stream, no zombie states" +echo "" + +echo "--- Test 4: Background/Foreground Cycling ---" +echo " Sending to background..." +adb shell input keyevent KEYCODE_HOME +wait_and_log 5 "background" +echo " Bringing to foreground..." +adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER +wait_and_log 10 "foreground" +echo " Repeat..." +adb shell input keyevent KEYCODE_HOME +sleep 3 +adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER +wait_and_log 10 "foreground-2" +echo " CHECK: Stream should close on background, reopen on foreground" +echo "" + +echo "--- Test 5: Long Idle (simulates carrier NAT timeout) ---" +echo " Waiting 90s with app in foreground (covers typical 30-90s NAT timeout)..." +wait_and_log 90 "long-idle" +echo " CHECK: Heartbeat should detect dead stream and reconnect if needed" +echo "" + +echo "--- Test 6: Wi-Fi Toggle (different network path) ---" +echo " Disabling Wi-Fi..." +adb shell svc wifi disable +wait_and_log 10 "wifi-off" +echo " Enabling Wi-Fi..." +adb shell svc wifi enable +wait_and_log 15 "wifi-on" +echo " CHECK: Stream reconnects on new network" +echo "" + +echo "" +echo "=== Stress test complete ===" +echo "" +echo "Review logs: less $LOGFILE" +echo "" +echo "Key things to grep for:" +echo " grep 'flipcash-stream =>' $LOGFILE # channel state changes" +echo " grep 'activateStream' $LOGFILE # when stream becomes 'connected'" +echo " grep 'Stream active:' $LOGFILE # isActive state changes" +echo " grep 'zombie\\|Giving up' $LOGFILE # failure modes (should be absent)" +echo " grep 'Event stream error' $LOGFILE # retry triggers" +echo " grep 'Pong' $LOGFILE # successful ping/pong = healthy stream" diff --git a/services/opencode/src/main/kotlin/com/getcode/opencode/controllers/AccountController.kt b/services/opencode/src/main/kotlin/com/getcode/opencode/controllers/AccountController.kt index 46786f598..5354560fb 100644 --- a/services/opencode/src/main/kotlin/com/getcode/opencode/controllers/AccountController.kt +++ b/services/opencode/src/main/kotlin/com/getcode/opencode/controllers/AccountController.kt @@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.launchIn @@ -65,12 +66,11 @@ class AccountController @Inject constructor( init { cluster.filterNotNull() - .flatMapLatest { networkObserver.state } - .map { it.connected } - .onEach { connected -> - if (connected) { - retryable { fetchAdditionalAccountInfo() } - } + .flatMapLatest { networkObserver.state.map { it.connected } } + .distinctUntilChanged() + .filter { it } + .onEach { + retryable { fetchAdditionalAccountInfo() } }.launchIn(scope) } diff --git a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt index cd8b68e40..21502e226 100644 --- a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt +++ b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt @@ -6,6 +6,7 @@ import io.grpc.Status import io.grpc.StatusException import io.grpc.StatusRuntimeException import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedSendChannelException import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch @@ -129,7 +130,7 @@ fun openBidirectionalStream( ) collectionJob.cancel() requestChannel.close() - if (isRetryable(e)) { + if (isRetryable(e) || e is ClosedSendChannelException) { onReconnectAttempt?.invoke(attempt, e) continue } diff --git a/services/opencode/src/test/kotlin/com/getcode/opencode/internal/bidi/OpenBidirectionalStreamTest.kt b/services/opencode/src/test/kotlin/com/getcode/opencode/internal/bidi/OpenBidirectionalStreamTest.kt new file mode 100644 index 000000000..d29efe8a6 --- /dev/null +++ b/services/opencode/src/test/kotlin/com/getcode/opencode/internal/bidi/OpenBidirectionalStreamTest.kt @@ -0,0 +1,92 @@ +@file:OptIn(ExperimentalCoroutinesApi::class) + +package com.getcode.opencode.internal.bidi + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class OpenBidirectionalStreamTest { + + /** + * ClosedSendChannelException thrown during the initial send should be + * treated as retryable. We throw on every attempt and set maxReconnectAttempts=2 + * so the loop exhausts retries and the coroutine terminates cleanly. + * + * Without the fix, the first ClosedSendChannelException would call onError + * and return immediately (attemptCount == 1). With the fix, it retries + * until maxReconnectAttempts is exceeded (attemptCount == 3). + */ + @Test + fun `ClosedSendChannelException on initial send triggers retry`() = runTest { + var attemptCount = 0 + val errors = mutableListOf() + + val streamRef = BidirectionalStreamReference(this, "test-stream") + streamRef.retain() + + openBidirectionalStream>( + streamRef = streamRef, + apiCall = { _ -> + attemptCount++ + flow { } + }, + initialRequest = { + // Always throw — we're testing that the retry loop continues + throw ClosedSendChannelException("Channel was closed") + }, + responseHandler = { _: String, _: (String) -> Unit -> }, + onError = { errors.add(it) }, + maxReconnectAttempts = 2, + reconnectDelayMs = 0, + ) + + advanceUntilIdle() + + // With fix: retries until max attempts exceeded (3 attempts for maxReconnectAttempts=2) + // Without fix: gives up on first attempt (attemptCount == 1) + assertTrue( + attemptCount > 1, + "Should have retried after ClosedSendChannelException, but only $attemptCount attempt(s)" + ) + // The terminal error should be the max-attempts IllegalStateException, not ClosedSendChannelException + assertTrue( + errors.none { it is ClosedSendChannelException }, + "ClosedSendChannelException should not be reported as a terminal error" + ) + + streamRef.destroy() + } + + @Test + fun `non-retryable exceptions on initial send still propagate to onError`() = runTest { + val errors = mutableListOf() + + val streamRef = BidirectionalStreamReference(this, "test-stream") + streamRef.retain() + + openBidirectionalStream>( + streamRef = streamRef, + apiCall = { _ -> + throw IllegalArgumentException("Bad request format") + }, + initialRequest = { "hello" }, + responseHandler = { _: String, _: (String) -> Unit -> }, + onError = { errors.add(it) }, + maxReconnectAttempts = 3, + reconnectDelayMs = 0, + ) + + advanceUntilIdle() + + assertEquals(1, errors.size, "Non-retryable error should be reported exactly once") + assertTrue(errors[0] is IllegalArgumentException) + + streamRef.destroy() + } +}