Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Flipcash/Core/Controllers/ConversationController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ final class ConversationController {
@ObservationIgnored private let owner: KeyPair
@ObservationIgnored private var startTask: Task<Void, Never>?
@ObservationIgnored private var streamTask: Task<Void, Never>?
@ObservationIgnored private var connectionStateTask: Task<Void, Never>?
/// Whether the event stream has been seen `.live` at least once. The first
/// `.live` is the initial connection (the feed/transcript are already loaded
/// by `start()` and the screen), so it's skipped; every `.live` after it is a
/// reconnect whose missed window needs refetching.
@ObservationIgnored private var hasSeenStreamLive = false
@ObservationIgnored private var hydratingConversationIDs: Set<ConversationID> = []
@ObservationIgnored private var markReadTasks: [ConversationID: Task<Void, Never>] = [:]

Expand Down Expand Up @@ -113,6 +119,7 @@ final class ConversationController {
startTask = Task {
await hydrateFromDatabase()
openStream()
observeConnectionState()
await loadFeed()
}
}
Expand All @@ -131,6 +138,33 @@ final class ConversationController {
}
}

/// The event stream is a live, cursorless push and the server never replays,
/// so messages delivered while it was down are lost. Watch the connection
/// state: the first `.live` is the initial connection, and every `.live`
/// after it is a reconnect whose missed window we refetch.
private func observeConnectionState() {
guard connectionStateTask == nil else { return }
let states = streaming.conversationConnectionState()
connectionStateTask = Task { [weak self] in
for await state in states {
guard let self else { return }
guard state == .live else { continue }
if self.hasSeenStreamLive {
await self.refetchAfterReconnect()
} else {
self.hasSeenStreamLive = true
}
}
}
}

private func refetchAfterReconnect() async {
await loadFeed()
if let visibleConversationID {
await loadMessages(for: visibleConversationID)
}
}

/// Surfaces a counterpart's READ pointer advance — the signal behind the
/// "Read 3:42 PM" receipt — so it can be traced in the log stream.
private func logCounterpartRead(_ event: ConversationStreamEvent) {
Expand All @@ -149,6 +183,8 @@ final class ConversationController {
startTask = nil
streamTask?.cancel()
streamTask = nil
connectionStateTask?.cancel()
connectionStateTask = nil
markReadTasks.values.forEach { $0.cancel() }
markReadTasks.removeAll()
streaming.closeConversationStream()
Expand Down
4 changes: 4 additions & 0 deletions Flipcash/Core/Controllers/FlipClient+Protocols.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ protocol ConversationMessaging: AnyObject, Sendable {
/// `event.v1 StreamEvents` lifecycle behind `ConversationStreamEvent`.
protocol ConversationEventStreaming: AnyObject, Sendable {
func openConversationStream(owner: KeyPair) -> AsyncStream<ConversationStreamEvent>
/// The event stream's connection state. The stream carries no cursor and the
/// server never replays, so the controller treats the first `.live` as the
/// initial connection and refetches the missed window on each `.live` after.
func conversationConnectionState() -> AsyncStream<EventStreamConnectionState>
func ensureConversationStreamConnected()
func closeConversationStream()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ extension FlipClient {
return eventStreamer.events
}

/// The event stream's connection state over time, so the caller can refetch
/// the window a dropped-and-reconnected stream missed.
public nonisolated func conversationConnectionState() -> AsyncStream<EventStreamConnectionState> {
eventStreamer.connectionState
}

public nonisolated func ensureConversationStreamConnected() {
Task { await eventStreamer.ensureConnected() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import GRPC

private let logger = Logger(label: "flipcash.event-streamer")

/// The connection state of the event stream. The streamer reports these facts;
/// it's the consumer's job to decide what they mean — e.g. refetch the window a
/// drop left, or drive a "reconnecting" indicator.
public enum EventStreamConnectionState: Sendable {
/// A connection proved it's delivering (its first ping arrived). Emitted once
/// per connection, so a reconnect produces a fresh `.live`.
case live
/// The stream was torn down — a drop being reconnected, or a stop.
case disconnected
}

/// Owns the single per-user bidirectional event stream (`event.v1 StreamEvents`)
/// and decodes conversation updates into `ConversationStreamEvent`s consumed via `events`. The
/// server enforces one stream per user, so exactly one instance must exist per
Expand All @@ -27,6 +38,13 @@ public actor EventStreamer {
public nonisolated let events: AsyncStream<ConversationStreamEvent>
private let continuation: AsyncStream<ConversationStreamEvent>.Continuation

/// The stream's connection state over time. The stream carries no cursor and
/// the server never replays, so a consumer treats the first `.live` as the
/// initial connection and refetches the missed window on each `.live` after
/// that. Consume with `for await state in streamer.connectionState`.
public nonisolated let connectionState: AsyncStream<EventStreamConnectionState>
private let connectionStateContinuation: AsyncStream<EventStreamConnectionState>.Continuation

private let service: EventStreamingService
private var owner: KeyPair?

Expand All @@ -40,19 +58,27 @@ public actor EventStreamer {
/// Bumped on every `openStream()`; gRPC callbacks capture the value at open
/// time so a torn-down stream's late status can't tear down its replacement.
private var streamGeneration: UInt = 0
/// Whether the current connection has been reported `.live`. Dedupes the
/// per-ping liveness proof into a single `.live` per connection, and gates
/// the matching `.disconnected` on teardown.
private var isConnectionLive = false

init(service: EventStreamingService) {
self.service = service
let (stream, continuation) = AsyncStream<ConversationStreamEvent>.makeStream()
self.events = stream
self.continuation = continuation
let (connectionState, connectionStateContinuation) = AsyncStream<EventStreamConnectionState>.makeStream()
self.connectionState = connectionState
self.connectionStateContinuation = connectionStateContinuation
}

deinit {
pingTimeoutTask?.cancel()
reconnectTask?.cancel()
streamReference?.destroy()
continuation.finish()
connectionStateContinuation.finish()
}

// MARK: - Public API
Expand Down Expand Up @@ -83,6 +109,7 @@ public actor EventStreamer {
public func stop() {
isStreaming = false
isReconnecting = false
reportConnection(live: false)
backoff.reset()
owner = nil
pingTimeoutTask?.cancel()
Expand Down Expand Up @@ -176,6 +203,10 @@ public actor EventStreamer {
// keeps escalating the delay instead of hammering reconnect.
backoff.reset()

// The same proof of life marks the connection delivering — once per
// connection — so the consumer can refetch the window a drop left.
reportConnection(live: true)

let timeout = pingTracker.receivedPing(updatedTimeout: Int(ping.pingDelay.seconds))
schedulePingTimeout(seconds: timeout)

Expand Down Expand Up @@ -238,10 +269,19 @@ public actor EventStreamer {
reconnectTask = nil
isReconnecting = false
backoff.reset()
reportConnection(live: false)
streamReference?.destroy()
streamReference = nil
}

/// Emits a connection-state transition, ignoring repeats — every ping would
/// otherwise re-report `.live`, and several teardown paths can each fire.
private func reportConnection(live: Bool) {
guard isConnectionLive != live else { return }
isConnectionLive = live
connectionStateContinuation.yield(live ? .live : .disconnected)
}

private func schedulePingTimeout(seconds: Int) {
pingTimeoutTask?.cancel()
pingTimeoutTask = Task { [weak self] in
Expand All @@ -258,6 +298,7 @@ public actor EventStreamer {
pingTimeoutTask?.cancel()
pingTimeoutTask = nil
pingTracker = PingTracker()
reportConnection(live: false)
streamReference?.destroy()
streamReference = nil

Expand Down
79 changes: 79 additions & 0 deletions FlipcashTests/ConversationControllerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,85 @@ struct ConversationControllerTests {
controller.stop()
}

// MARK: - Reconnect catch-up -

@Test("a reconnect refetches the visible conversation, catching up messages missed while the stream was down")
func reconnectCatchesUpVisibleTranscript() async throws {
let mock = MockConversations()
mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))]
mock.messages = [ConversationMessage(id: MessageID(value: 1), senderID: nil, content: .text("one"), date: Date(timeIntervalSince1970: 10), unreadSeq: 1)]
let controller = makeController(mock)

controller.start()
try await waitUntil { mock.connectionStateStreamOpened }
// The initial connection's first `.live` is the baseline — no gap to fill.
mock.emitConnectionState(.live)

// Open the chat: load its first page and mark it visible (as the screen does).
await controller.loadMessages(for: ConversationID.test(1))
controller.visibleConversationID = ConversationID.test(1)
// Prerequisite for the catch-up assertion below: the transcript must start at [1].
try #require(controller.messages(for: ConversationID.test(1)).map(\.id.value) == [1])

// A newer message lands server-side during the window the stream was down —
// the live event for it was never delivered.
mock.messages = [
ConversationMessage(id: MessageID(value: 1), senderID: nil, content: .text("one"), date: Date(timeIntervalSince1970: 10), unreadSeq: 1),
ConversationMessage(id: MessageID(value: 2), senderID: nil, content: .text("two"), date: Date(timeIntervalSince1970: 20), unreadSeq: 2),
]

// The stream drops and comes back: the reconnect's `.live` triggers catch-up.
mock.emitConnectionState(.disconnected)
mock.emitConnectionState(.live)

// The reconnect refetch pulls the newest page and the missed message appears.
try await waitUntil { controller.messages(for: ConversationID.test(1)).map(\.id.value) == [1, 2] }
// Reaching catch-up proves the FIFO consumer processed both `.live`s. The
// transcript was paged exactly twice — the explicit open + the one reconnect
// — so the baseline `.live` correctly fetched nothing.
#expect(mock.latestPageQueries == [ConversationID.test(1), ConversationID.test(1)])
controller.stop()
}

@Test("a reconnect refreshes the feed even with no conversation open")
func reconnectRefreshesFeed() async throws {
let mock = MockConversations()
let controller = makeController(mock)

controller.start()
try await waitUntil { mock.connectionStateStreamOpened }
try await waitUntil { controller.conversations.isEmpty }
mock.emitConnectionState(.live) // initial connection — baseline

// The feed gains a conversation while the stream was down.
mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))]
mock.emitConnectionState(.disconnected)
mock.emitConnectionState(.live) // reconnect → refetch

try await waitUntil { controller.conversations.map(\.id) == [ConversationID.test(1)] }
controller.stop()
}

@Test("a reconnect with no conversation open refreshes the feed but fetches no transcript")
func reconnectWithoutVisibleConversationSkipsMessages() async throws {
let mock = MockConversations()
let controller = makeController(mock)

controller.start()
try await waitUntil { mock.connectionStateStreamOpened }
mock.emitConnectionState(.live) // initial connection — baseline

// No conversation is visible. The feed refresh proves the refetch ran;
// the transcript page must not be fetched.
mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))]
mock.emitConnectionState(.disconnected)
mock.emitConnectionState(.live) // reconnect → loadFeed only

try await waitUntil { controller.conversations.map(\.id) == [ConversationID.test(1)] }
#expect(mock.latestPageQueries.isEmpty)
controller.stop()
}

// MARK: - Pagination -

@Test("loadOlderMessages pages before the oldest loaded id and prepends the page")
Expand Down
24 changes: 23 additions & 1 deletion FlipcashTests/TestSupport/MockConversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv
private var _messages: [ConversationMessage] = []
private var _olderMessages: [ConversationMessage] = []
private var _olderQueries: [MessageID] = []
private var _latestPageQueries: [ConversationID] = []
private var _sendResult: ConversationMessage?
private var _sent: [Sent] = []
private var _markedRead: [MessageID] = []
private var _didEnsure = false
private var _didClose = false
private var _streamContinuation: AsyncStream<ConversationStreamEvent>.Continuation?
private var _connectionStateContinuation: AsyncStream<EventStreamConnectionState>.Continuation?

var feed: [Conversation] {
get { lock.withLock { _feed } }
Expand All @@ -41,6 +43,8 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv
}
/// The `before` cursors `getMessages` was paged with.
var olderQueries: [MessageID] { lock.withLock { _olderQueries } }
/// The conversations `getMessages` was asked for the newest page of (`before == nil`).
var latestPageQueries: [ConversationID] { lock.withLock { _latestPageQueries } }
var sendResult: ConversationMessage? {
get { lock.withLock { _sendResult } }
set { lock.withLock { _sendResult = newValue } }
Expand All @@ -52,12 +56,21 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv
/// Whether `openConversationStream` has been called — events emitted
/// before that are dropped, so tests wait on this before `emit(_:)`.
var streamOpened: Bool { lock.withLock { _streamContinuation != nil } }
/// Whether `conversationConnectionState` has been subscribed — states emitted
/// before that are dropped, so tests wait on this before `emitConnectionState(_:)`.
var connectionStateStreamOpened: Bool { lock.withLock { _connectionStateContinuation != nil } }

/// Push a live event onto the stream returned by `openConversationStream`.
func emit(_ event: ConversationStreamEvent) {
lock.withLock { _streamContinuation }?.yield(event)
}

/// Push a connection-state transition onto the stream returned by
/// `conversationConnectionState`, as the streamer does on a ping or teardown.
func emitConnectionState(_ state: EventStreamConnectionState) {
lock.withLock { _connectionStateContinuation }?.yield(state)
}

// MARK: - ConversationFetching

func getDmChatFeed(owner: KeyPair) async throws -> [Conversation] { feed }
Expand All @@ -72,7 +85,10 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv
// MARK: - ConversationMessaging

func getMessages(owner: KeyPair, conversationID: ConversationID, before: MessageID?) async throws -> [ConversationMessage] {
guard let before else { return messages }
guard let before else {
lock.withLock { _latestPageQueries.append(conversationID) }
return messages
}
lock.withLock { _olderQueries.append(before) }
return olderMessages
}
Expand All @@ -97,6 +113,12 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv
return stream
}

func conversationConnectionState() -> AsyncStream<EventStreamConnectionState> {
let (stream, continuation) = AsyncStream<EventStreamConnectionState>.makeStream()
lock.withLock { _connectionStateContinuation = continuation }
return stream
}

func ensureConversationStreamConnected() { lock.withLock { _didEnsure = true } }
func closeConversationStream() { lock.withLock { _didClose = true } }
}
Expand Down