diff --git a/Flipcash/Core/Controllers/ConversationController.swift b/Flipcash/Core/Controllers/ConversationController.swift index d0c0f6d2..8324cb36 100644 --- a/Flipcash/Core/Controllers/ConversationController.swift +++ b/Flipcash/Core/Controllers/ConversationController.swift @@ -61,6 +61,12 @@ final class ConversationController { @ObservationIgnored private let owner: KeyPair @ObservationIgnored private var startTask: Task? @ObservationIgnored private var streamTask: Task? + @ObservationIgnored private var connectionStateTask: Task? + /// Whether the event stream has been seen `.live` at least once. The first + /// `.live` is the initial connection (the feed/transcript are already loaded + /// by `start()` and the screen), so it's skipped; every `.live` after it is a + /// reconnect whose missed window needs refetching. + @ObservationIgnored private var hasSeenStreamLive = false @ObservationIgnored private var hydratingConversationIDs: Set = [] @ObservationIgnored private var markReadTasks: [ConversationID: Task] = [:] @@ -113,6 +119,7 @@ final class ConversationController { startTask = Task { await hydrateFromDatabase() openStream() + observeConnectionState() await loadFeed() } } @@ -131,6 +138,33 @@ final class ConversationController { } } + /// The event stream is a live, cursorless push and the server never replays, + /// so messages delivered while it was down are lost. Watch the connection + /// state: the first `.live` is the initial connection, and every `.live` + /// after it is a reconnect whose missed window we refetch. + private func observeConnectionState() { + guard connectionStateTask == nil else { return } + let states = streaming.conversationConnectionState() + connectionStateTask = Task { [weak self] in + for await state in states { + guard let self else { return } + guard state == .live else { continue } + if self.hasSeenStreamLive { + await self.refetchAfterReconnect() + } else { + self.hasSeenStreamLive = true + } + } + } + } + + private func refetchAfterReconnect() async { + await loadFeed() + if let visibleConversationID { + await loadMessages(for: visibleConversationID) + } + } + /// Surfaces a counterpart's READ pointer advance — the signal behind the /// "Read 3:42 PM" receipt — so it can be traced in the log stream. private func logCounterpartRead(_ event: ConversationStreamEvent) { @@ -149,6 +183,8 @@ final class ConversationController { startTask = nil streamTask?.cancel() streamTask = nil + connectionStateTask?.cancel() + connectionStateTask = nil markReadTasks.values.forEach { $0.cancel() } markReadTasks.removeAll() streaming.closeConversationStream() diff --git a/Flipcash/Core/Controllers/FlipClient+Protocols.swift b/Flipcash/Core/Controllers/FlipClient+Protocols.swift index 6b13e98a..549d2fcc 100644 --- a/Flipcash/Core/Controllers/FlipClient+Protocols.swift +++ b/Flipcash/Core/Controllers/FlipClient+Protocols.swift @@ -79,6 +79,10 @@ protocol ConversationMessaging: AnyObject, Sendable { /// `event.v1 StreamEvents` lifecycle behind `ConversationStreamEvent`. protocol ConversationEventStreaming: AnyObject, Sendable { func openConversationStream(owner: KeyPair) -> AsyncStream + /// The event stream's connection state. The stream carries no cursor and the + /// server never replays, so the controller treats the first `.live` as the + /// initial connection and refetches the missed window on each `.live` after. + func conversationConnectionState() -> AsyncStream func ensureConversationStreamConnected() func closeConversationStream() } diff --git a/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/FlipClient+Chat.swift b/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/FlipClient+Chat.swift index 138d53a1..6a8ee322 100644 --- a/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/FlipClient+Chat.swift +++ b/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/FlipClient+Chat.swift @@ -61,6 +61,12 @@ extension FlipClient { return eventStreamer.events } + /// The event stream's connection state over time, so the caller can refetch + /// the window a dropped-and-reconnected stream missed. + public nonisolated func conversationConnectionState() -> AsyncStream { + eventStreamer.connectionState + } + public nonisolated func ensureConversationStreamConnected() { Task { await eventStreamer.ensureConnected() } } diff --git a/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/Services/EventStreamer.swift b/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/Services/EventStreamer.swift index ca9d1624..94d415cb 100644 --- a/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/Services/EventStreamer.swift +++ b/FlipcashCore/Sources/FlipcashCore/Clients/Flip API/Services/EventStreamer.swift @@ -11,6 +11,17 @@ import GRPC private let logger = Logger(label: "flipcash.event-streamer") +/// The connection state of the event stream. The streamer reports these facts; +/// it's the consumer's job to decide what they mean — e.g. refetch the window a +/// drop left, or drive a "reconnecting" indicator. +public enum EventStreamConnectionState: Sendable { + /// A connection proved it's delivering (its first ping arrived). Emitted once + /// per connection, so a reconnect produces a fresh `.live`. + case live + /// The stream was torn down — a drop being reconnected, or a stop. + case disconnected +} + /// Owns the single per-user bidirectional event stream (`event.v1 StreamEvents`) /// and decodes conversation updates into `ConversationStreamEvent`s consumed via `events`. The /// server enforces one stream per user, so exactly one instance must exist per @@ -27,6 +38,13 @@ public actor EventStreamer { public nonisolated let events: AsyncStream private let continuation: AsyncStream.Continuation + /// The stream's connection state over time. The stream carries no cursor and + /// the server never replays, so a consumer treats the first `.live` as the + /// initial connection and refetches the missed window on each `.live` after + /// that. Consume with `for await state in streamer.connectionState`. + public nonisolated let connectionState: AsyncStream + private let connectionStateContinuation: AsyncStream.Continuation + private let service: EventStreamingService private var owner: KeyPair? @@ -40,12 +58,19 @@ public actor EventStreamer { /// Bumped on every `openStream()`; gRPC callbacks capture the value at open /// time so a torn-down stream's late status can't tear down its replacement. private var streamGeneration: UInt = 0 + /// Whether the current connection has been reported `.live`. Dedupes the + /// per-ping liveness proof into a single `.live` per connection, and gates + /// the matching `.disconnected` on teardown. + private var isConnectionLive = false init(service: EventStreamingService) { self.service = service let (stream, continuation) = AsyncStream.makeStream() self.events = stream self.continuation = continuation + let (connectionState, connectionStateContinuation) = AsyncStream.makeStream() + self.connectionState = connectionState + self.connectionStateContinuation = connectionStateContinuation } deinit { @@ -53,6 +78,7 @@ public actor EventStreamer { reconnectTask?.cancel() streamReference?.destroy() continuation.finish() + connectionStateContinuation.finish() } // MARK: - Public API @@ -83,6 +109,7 @@ public actor EventStreamer { public func stop() { isStreaming = false isReconnecting = false + reportConnection(live: false) backoff.reset() owner = nil pingTimeoutTask?.cancel() @@ -176,6 +203,10 @@ public actor EventStreamer { // keeps escalating the delay instead of hammering reconnect. backoff.reset() + // The same proof of life marks the connection delivering — once per + // connection — so the consumer can refetch the window a drop left. + reportConnection(live: true) + let timeout = pingTracker.receivedPing(updatedTimeout: Int(ping.pingDelay.seconds)) schedulePingTimeout(seconds: timeout) @@ -238,10 +269,19 @@ public actor EventStreamer { reconnectTask = nil isReconnecting = false backoff.reset() + reportConnection(live: false) streamReference?.destroy() streamReference = nil } + /// Emits a connection-state transition, ignoring repeats — every ping would + /// otherwise re-report `.live`, and several teardown paths can each fire. + private func reportConnection(live: Bool) { + guard isConnectionLive != live else { return } + isConnectionLive = live + connectionStateContinuation.yield(live ? .live : .disconnected) + } + private func schedulePingTimeout(seconds: Int) { pingTimeoutTask?.cancel() pingTimeoutTask = Task { [weak self] in @@ -258,6 +298,7 @@ public actor EventStreamer { pingTimeoutTask?.cancel() pingTimeoutTask = nil pingTracker = PingTracker() + reportConnection(live: false) streamReference?.destroy() streamReference = nil diff --git a/FlipcashTests/ConversationControllerTests.swift b/FlipcashTests/ConversationControllerTests.swift index 385522f5..ed844ea8 100644 --- a/FlipcashTests/ConversationControllerTests.swift +++ b/FlipcashTests/ConversationControllerTests.swift @@ -294,6 +294,85 @@ struct ConversationControllerTests { controller.stop() } + // MARK: - Reconnect catch-up - + + @Test("a reconnect refetches the visible conversation, catching up messages missed while the stream was down") + func reconnectCatchesUpVisibleTranscript() async throws { + let mock = MockConversations() + mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))] + mock.messages = [ConversationMessage(id: MessageID(value: 1), senderID: nil, content: .text("one"), date: Date(timeIntervalSince1970: 10), unreadSeq: 1)] + let controller = makeController(mock) + + controller.start() + try await waitUntil { mock.connectionStateStreamOpened } + // The initial connection's first `.live` is the baseline — no gap to fill. + mock.emitConnectionState(.live) + + // Open the chat: load its first page and mark it visible (as the screen does). + await controller.loadMessages(for: ConversationID.test(1)) + controller.visibleConversationID = ConversationID.test(1) + // Prerequisite for the catch-up assertion below: the transcript must start at [1]. + try #require(controller.messages(for: ConversationID.test(1)).map(\.id.value) == [1]) + + // A newer message lands server-side during the window the stream was down — + // the live event for it was never delivered. + mock.messages = [ + ConversationMessage(id: MessageID(value: 1), senderID: nil, content: .text("one"), date: Date(timeIntervalSince1970: 10), unreadSeq: 1), + ConversationMessage(id: MessageID(value: 2), senderID: nil, content: .text("two"), date: Date(timeIntervalSince1970: 20), unreadSeq: 2), + ] + + // The stream drops and comes back: the reconnect's `.live` triggers catch-up. + mock.emitConnectionState(.disconnected) + mock.emitConnectionState(.live) + + // The reconnect refetch pulls the newest page and the missed message appears. + try await waitUntil { controller.messages(for: ConversationID.test(1)).map(\.id.value) == [1, 2] } + // Reaching catch-up proves the FIFO consumer processed both `.live`s. The + // transcript was paged exactly twice — the explicit open + the one reconnect + // — so the baseline `.live` correctly fetched nothing. + #expect(mock.latestPageQueries == [ConversationID.test(1), ConversationID.test(1)]) + controller.stop() + } + + @Test("a reconnect refreshes the feed even with no conversation open") + func reconnectRefreshesFeed() async throws { + let mock = MockConversations() + let controller = makeController(mock) + + controller.start() + try await waitUntil { mock.connectionStateStreamOpened } + try await waitUntil { controller.conversations.isEmpty } + mock.emitConnectionState(.live) // initial connection — baseline + + // The feed gains a conversation while the stream was down. + mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))] + mock.emitConnectionState(.disconnected) + mock.emitConnectionState(.live) // reconnect → refetch + + try await waitUntil { controller.conversations.map(\.id) == [ConversationID.test(1)] } + controller.stop() + } + + @Test("a reconnect with no conversation open refreshes the feed but fetches no transcript") + func reconnectWithoutVisibleConversationSkipsMessages() async throws { + let mock = MockConversations() + let controller = makeController(mock) + + controller.start() + try await waitUntil { mock.connectionStateStreamOpened } + mock.emitConnectionState(.live) // initial connection — baseline + + // No conversation is visible. The feed refresh proves the refetch ran; + // the transcript page must not be fetched. + mock.feed = [Conversation(id: ConversationID.test(1), members: [], lastMessage: nil, lastActivity: Date(timeIntervalSince1970: 100))] + mock.emitConnectionState(.disconnected) + mock.emitConnectionState(.live) // reconnect → loadFeed only + + try await waitUntil { controller.conversations.map(\.id) == [ConversationID.test(1)] } + #expect(mock.latestPageQueries.isEmpty) + controller.stop() + } + // MARK: - Pagination - @Test("loadOlderMessages pages before the oldest loaded id and prepends the page") diff --git a/FlipcashTests/TestSupport/MockConversations.swift b/FlipcashTests/TestSupport/MockConversations.swift index ff41b9d0..982b80cc 100644 --- a/FlipcashTests/TestSupport/MockConversations.swift +++ b/FlipcashTests/TestSupport/MockConversations.swift @@ -19,12 +19,14 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv private var _messages: [ConversationMessage] = [] private var _olderMessages: [ConversationMessage] = [] private var _olderQueries: [MessageID] = [] + private var _latestPageQueries: [ConversationID] = [] private var _sendResult: ConversationMessage? private var _sent: [Sent] = [] private var _markedRead: [MessageID] = [] private var _didEnsure = false private var _didClose = false private var _streamContinuation: AsyncStream.Continuation? + private var _connectionStateContinuation: AsyncStream.Continuation? var feed: [Conversation] { get { lock.withLock { _feed } } @@ -41,6 +43,8 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv } /// The `before` cursors `getMessages` was paged with. var olderQueries: [MessageID] { lock.withLock { _olderQueries } } + /// The conversations `getMessages` was asked for the newest page of (`before == nil`). + var latestPageQueries: [ConversationID] { lock.withLock { _latestPageQueries } } var sendResult: ConversationMessage? { get { lock.withLock { _sendResult } } set { lock.withLock { _sendResult = newValue } } @@ -52,12 +56,21 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv /// Whether `openConversationStream` has been called — events emitted /// before that are dropped, so tests wait on this before `emit(_:)`. var streamOpened: Bool { lock.withLock { _streamContinuation != nil } } + /// Whether `conversationConnectionState` has been subscribed — states emitted + /// before that are dropped, so tests wait on this before `emitConnectionState(_:)`. + var connectionStateStreamOpened: Bool { lock.withLock { _connectionStateContinuation != nil } } /// Push a live event onto the stream returned by `openConversationStream`. func emit(_ event: ConversationStreamEvent) { lock.withLock { _streamContinuation }?.yield(event) } + /// Push a connection-state transition onto the stream returned by + /// `conversationConnectionState`, as the streamer does on a ping or teardown. + func emitConnectionState(_ state: EventStreamConnectionState) { + lock.withLock { _connectionStateContinuation }?.yield(state) + } + // MARK: - ConversationFetching func getDmChatFeed(owner: KeyPair) async throws -> [Conversation] { feed } @@ -72,7 +85,10 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv // MARK: - ConversationMessaging func getMessages(owner: KeyPair, conversationID: ConversationID, before: MessageID?) async throws -> [ConversationMessage] { - guard let before else { return messages } + guard let before else { + lock.withLock { _latestPageQueries.append(conversationID) } + return messages + } lock.withLock { _olderQueries.append(before) } return olderMessages } @@ -97,6 +113,12 @@ final class MockConversations: ConversationFetching, ConversationMessaging, Conv return stream } + func conversationConnectionState() -> AsyncStream { + let (stream, continuation) = AsyncStream.makeStream() + lock.withLock { _connectionStateContinuation = continuation } + return stream + } + func ensureConversationStreamConnected() { lock.withLock { _didEnsure = true } } func closeConversationStream() { lock.withLock { _didClose = true } } }