Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ class ContactCoordinator @Inject constructor(
.launchIn(scope)

cluster.filterNotNull()
.flatMapLatest { networkObserver.state }
.flatMapLatest { networkObserver.state.map { it.connected } }
.distinctUntilChanged()
.filter { it.connected }
.filter { it }
.onEach {
trace(tag = TAG, message = "Network connected, triggering contact sync", type = TraceType.Process)
launchSync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
@file:OptIn(ExperimentalCoroutinesApi::class)

package com.flipcash.app.contacts

import com.getcode.utils.network.ConnectionType
import com.getcode.utils.network.NetworkConnectivityListener
import com.getcode.utils.network.NetworkState
import com.getcode.utils.network.SignalStrength
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.junit.Test
import kotlin.test.assertEquals

class ContactCoordinatorNetworkSyncTest {

private class FakeNetworkObserver : NetworkConnectivityListener {
val _state = MutableStateFlow(NetworkState.Default)
override val state: StateFlow<NetworkState> = _state.asStateFlow()
override val isConnected: Boolean get() = _state.value.connected
override val type: ConnectionType get() = _state.value.type
}

@Test
fun `signal strength changes while connected do not re-trigger sync`() = runTest {
val networkObserver = FakeNetworkObserver()
var syncCount = 0

val cluster = MutableStateFlow<Unit?>(null)

// Mirrors the FIXED subscription from ContactCoordinator.init
val job = cluster
.filterNotNull()
.flatMapLatest {
networkObserver.state
.map { it.connected }
.distinctUntilChanged()
}
.filter { it }
.onEach { syncCount++ }
.launchIn(this)

cluster.value = Unit
advanceUntilIdle()

// Initial connected
networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Wifi)
advanceUntilIdle()
assertEquals(1, syncCount, "First connected event should trigger sync")

// Rapid signal/type changes — all still connected
networkObserver._state.value = NetworkState(true, SignalStrength.Great, ConnectionType.Wifi)
networkObserver._state.value = NetworkState(true, SignalStrength.Strong, ConnectionType.Wifi)
networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Cellular)
networkObserver._state.value = NetworkState(true, SignalStrength.Poor, ConnectionType.Cellular)
networkObserver._state.value = NetworkState(true, SignalStrength.Great, ConnectionType.Wifi)
advanceUntilIdle()

assertEquals(1, syncCount, "Signal strength / type changes should NOT re-trigger sync")

// Disconnect → reconnect
networkObserver._state.value = NetworkState(false, SignalStrength.Unknown, ConnectionType.Unknown)
advanceUntilIdle()
networkObserver._state.value = NetworkState(true, SignalStrength.Good, ConnectionType.Wifi)
advanceUntilIdle()

assertEquals(2, syncCount, "Reconnect after disconnect should trigger sync")

job.cancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ class TokenCoordinator @Inject constructor(
ProcessLifecycleOwner.get().lifecycle.addObserver(this)

cluster.filterNotNull()
.flatMapLatest { networkObserver.state }
.flatMapLatest { networkObserver.state.map { it.connected } }
.distinctUntilChanged()
.filter { it.connected }
.filter { it }
.onEach {
trace(tag = TAG, message = "Network connected, triggering token update", type = TraceType.Process)
retryable { update() }
Expand Down Expand Up @@ -221,15 +221,30 @@ class TokenCoordinator @Inject constructor(
modifyBalance(token, amount) { current, delta -> current + delta }
}

suspend fun add(mint: Mint, nativeAmount: Fiat) {
val token = getTokenMetadata(mint).getOrNull()?.token ?: return
add(token, nativeAmount.toLocalFiat(mint))
}

suspend fun subtract(token: Token, fiat: LocalFiat) {
val rate = exchange.rateToUsd(fiat.rate.currency)
val amount = rate?.let { fiat.nativeAmount.convertingTo(it) }
if (amount != null) {
trace(tag = TAG, message = "Subtracting ${amount.formatted()} to ${token.symbol}", type = TraceType.Process)
trace(tag = TAG, message = "Subtracting ${amount.formatted()} from ${token.symbol}", type = TraceType.Process)
}
modifyBalance(token, amount) { current, delta -> current - delta }
}

suspend fun subtract(mint: Mint, nativeAmount: Fiat) {
val token = getTokenMetadata(mint).getOrNull()?.token ?: return
subtract(token, nativeAmount.toLocalFiat(mint))
}

private fun Fiat.toLocalFiat(mint: Mint): LocalFiat {
val rate = exchange.rateFor(currencyCode) ?: Rate.oneToOne
return LocalFiat.fromNativeAmount(this, rate, mint)
}

// endregion

// region Public API — Token Metadata (implements TokenMetadataProvider)
Expand Down
115 changes: 115 additions & 0 deletions scripts/stress-test-event-stream.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/bin/bash
# Event Stream Stress Test
#
# Prerequisites: adb connected to device, app installed

set -euo pipefail

LABEL="${1:-test}"
PKG="com.flipcash.app.android"
LOGFILE="event-stream-${LABEL}-$(date +%Y%m%d-%H%M%S).log"
TAGS="gRPC:V BIDI:V ChatCoordinator:V EventStream:V event-streaming:V EventStreamingController:V *:S"

echo "=== Event Stream Stress Test ($LABEL) ==="
echo "Logging to: $LOGFILE"
echo ""

# Clear logcat
adb logcat -c

# Start logcat capture in background
adb logcat $TAGS | tee "$LOGFILE" &
LOGCAT_PID=$!
trap "kill $LOGCAT_PID 2>/dev/null; echo; echo 'Logs saved to $LOGFILE'" EXIT

pause() {
echo ""
echo ">>> $1"
echo " Press ENTER when ready..."
read -r
}

wait_and_log() {
local duration=$1
local label=$2
echo " [$label] Waiting ${duration}s..."
sleep "$duration"
echo " [$label] Done."
}

echo "--- Test 1: Cold Start ---"
echo " Force stopping app..."
adb shell am force-stop "$PKG"
sleep 2
echo " Launching app..."
adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER
wait_and_log 15 "cold-start"
echo " CHECK: Look for 'flipcash-stream => READY' and 'Stream activated on first response'"
echo ""

echo "--- Test 2: Airplane Mode Toggle (network loss/recovery) ---"
echo " Enabling airplane mode..."
adb shell cmd connectivity airplane-mode enable
wait_and_log 5 "airplane-on"
echo " Disabling airplane mode..."
adb shell cmd connectivity airplane-mode disable
wait_and_log 20 "airplane-off"
echo " CHECK: Stream should reconnect. Look for 'Opening bidirectional stream' then 'Stream activated'"
echo ""

echo "--- Test 3: Rapid Network Flapping (3 cycles) ---"
for i in 1 2 3; do
echo " Cycle $i: airplane ON..."
adb shell cmd connectivity airplane-mode enable
sleep 3
echo " Cycle $i: airplane OFF..."
adb shell cmd connectivity airplane-mode disable
sleep 8
done
wait_and_log 15 "flap-settle"
echo " CHECK: Should settle to one active stream, no zombie states"
echo ""

echo "--- Test 4: Background/Foreground Cycling ---"
echo " Sending to background..."
adb shell input keyevent KEYCODE_HOME
wait_and_log 5 "background"
echo " Bringing to foreground..."
adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER
wait_and_log 10 "foreground"
echo " Repeat..."
adb shell input keyevent KEYCODE_HOME
sleep 3
adb shell am start -n "$PKG/.MainActivity" -a android.intent.action.MAIN -c android.intent.category.LAUNCHER
wait_and_log 10 "foreground-2"
echo " CHECK: Stream should close on background, reopen on foreground"
echo ""

echo "--- Test 5: Long Idle (simulates carrier NAT timeout) ---"
echo " Waiting 90s with app in foreground (covers typical 30-90s NAT timeout)..."
wait_and_log 90 "long-idle"
echo " CHECK: Heartbeat should detect dead stream and reconnect if needed"
echo ""

echo "--- Test 6: Wi-Fi Toggle (different network path) ---"
echo " Disabling Wi-Fi..."
adb shell svc wifi disable
wait_and_log 10 "wifi-off"
echo " Enabling Wi-Fi..."
adb shell svc wifi enable
wait_and_log 15 "wifi-on"
echo " CHECK: Stream reconnects on new network"
echo ""

echo ""
echo "=== Stress test complete ==="
echo ""
echo "Review logs: less $LOGFILE"
echo ""
echo "Key things to grep for:"
echo " grep 'flipcash-stream =>' $LOGFILE # channel state changes"
echo " grep 'activateStream' $LOGFILE # when stream becomes 'connected'"
echo " grep 'Stream active:' $LOGFILE # isActive state changes"
echo " grep 'zombie\\|Giving up' $LOGFILE # failure modes (should be absent)"
echo " grep 'Event stream error' $LOGFILE # retry triggers"
echo " grep 'Pong' $LOGFILE # successful ping/pong = healthy stream"
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
Expand Down Expand Up @@ -65,12 +66,11 @@ class AccountController @Inject constructor(

init {
cluster.filterNotNull()
.flatMapLatest { networkObserver.state }
.map { it.connected }
.onEach { connected ->
if (connected) {
retryable { fetchAdditionalAccountInfo() }
}
.flatMapLatest { networkObserver.state.map { it.connected } }
.distinctUntilChanged()
.filter { it }
.onEach {
retryable { fetchAdditionalAccountInfo() }
}.launchIn(scope)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
Expand Down Expand Up @@ -129,7 +130,7 @@ fun <Request, Response, StreamRef> openBidirectionalStream(
)
collectionJob.cancel()
requestChannel.close()
if (isRetryable(e)) {
if (isRetryable(e) || e is ClosedSendChannelException) {
onReconnectAttempt?.invoke(attempt, e)
continue
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
@file:OptIn(ExperimentalCoroutinesApi::class)

package com.getcode.opencode.internal.bidi

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class OpenBidirectionalStreamTest {

/**
* ClosedSendChannelException thrown during the initial send should be
* treated as retryable. We throw on every attempt and set maxReconnectAttempts=2
* so the loop exhausts retries and the coroutine terminates cleanly.
*
* Without the fix, the first ClosedSendChannelException would call onError
* and return immediately (attemptCount == 1). With the fix, it retries
* until maxReconnectAttempts is exceeded (attemptCount == 3).
*/
@Test
fun `ClosedSendChannelException on initial send triggers retry`() = runTest {
var attemptCount = 0
val errors = mutableListOf<Throwable>()

val streamRef = BidirectionalStreamReference<String, String>(this, "test-stream")
streamRef.retain()

openBidirectionalStream<String, String, BidirectionalStreamReference<String, String>>(
streamRef = streamRef,
apiCall = { _ ->
attemptCount++
flow { }
},
initialRequest = {
// Always throw — we're testing that the retry loop continues
throw ClosedSendChannelException("Channel was closed")
},
responseHandler = { _: String, _: (String) -> Unit -> },
onError = { errors.add(it) },
maxReconnectAttempts = 2,
reconnectDelayMs = 0,
)

advanceUntilIdle()

// With fix: retries until max attempts exceeded (3 attempts for maxReconnectAttempts=2)
// Without fix: gives up on first attempt (attemptCount == 1)
assertTrue(
attemptCount > 1,
"Should have retried after ClosedSendChannelException, but only $attemptCount attempt(s)"
)
// The terminal error should be the max-attempts IllegalStateException, not ClosedSendChannelException
assertTrue(
errors.none { it is ClosedSendChannelException },
"ClosedSendChannelException should not be reported as a terminal error"
)

streamRef.destroy()
}

@Test
fun `non-retryable exceptions on initial send still propagate to onError`() = runTest {
val errors = mutableListOf<Throwable>()

val streamRef = BidirectionalStreamReference<String, String>(this, "test-stream")
streamRef.retain()

openBidirectionalStream<String, String, BidirectionalStreamReference<String, String>>(
streamRef = streamRef,
apiCall = { _ ->
throw IllegalArgumentException("Bad request format")
},
initialRequest = { "hello" },
responseHandler = { _: String, _: (String) -> Unit -> },
onError = { errors.add(it) },
maxReconnectAttempts = 3,
reconnectDelayMs = 0,
)

advanceUntilIdle()

assertEquals(1, errors.size, "Non-retryable error should be reported exactly once")
assertTrue(errors[0] is IllegalArgumentException)

streamRef.destroy()
}
}
Loading