diff --git a/.well-known/release-manifest.json b/.well-known/release-manifest.json index 7c53e4923..580c70b9b 100644 --- a/.well-known/release-manifest.json +++ b/.well-known/release-manifest.json @@ -1,5 +1,5 @@ { - "updated": "2026-06-24T17:57:12Z", + "updated": "2026-06-22T19:33:06Z", "tracks": { "production": { "versionCode": 3939, @@ -8,7 +8,7 @@ "beta": null, "alpha": null, "internal": { - "versionCode": 3962, + "versionCode": 3953, "versionName": "2026.6.2" } } diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt index 1fae39e3f..e34eec357 100644 --- a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt @@ -1,735 +1,119 @@ -@file:OptIn(ExperimentalCoroutinesApi::class, ExperimentalPagingApi::class) +@file:OptIn(ExperimentalPagingApi::class) package com.flipcash.shared.chat -import androidx.core.app.NotificationManagerCompat -import androidx.lifecycle.DefaultLifecycleObserver -import androidx.lifecycle.LifecycleOwner -import androidx.lifecycle.ProcessLifecycleOwner import androidx.paging.ExperimentalPagingApi -import androidx.paging.Pager -import androidx.paging.PagingConfig import androidx.paging.PagingData -import androidx.paging.map import com.flipcash.app.core.contacts.DeviceContact -import com.flipcash.app.featureflags.FeatureFlag -import com.flipcash.app.featureflags.FeatureFlagController -import com.flipcash.app.persistence.sources.ChatMemberDataSource -import com.flipcash.app.persistence.sources.mediator.ChatMessageRemoteMediator -import com.flipcash.app.persistence.sources.ChatMessageDataSource -import com.flipcash.app.persistence.sources.ChatMetadataDataSource -import com.flipcash.app.persistence.sources.ContactDataSource -import com.flipcash.app.persistence.entities.ChatMetadataEntity -import com.flipcash.services.controllers.ChatController -import com.flipcash.services.controllers.ChatMessagingController -import com.flipcash.services.controllers.EventStreamingController import com.flipcash.services.models.chat.ChatId -import com.flipcash.services.models.chat.ChatMetadata import com.flipcash.services.models.chat.ChatMember import com.flipcash.services.models.chat.ChatMessage import com.flipcash.services.models.chat.MessagePointer -import com.flipcash.services.models.chat.ChatUpdate -import com.flipcash.services.models.chat.EmojiReaction -import com.flipcash.services.models.chat.MessageContent -import com.flipcash.services.models.chat.MetadataUpdate -import com.flipcash.services.models.chat.PointerType import com.flipcash.services.models.chat.ReactionSummary -import com.flipcash.services.models.chat.ReactionUpdate -import com.flipcash.services.models.chat.TypingNotification import com.flipcash.services.models.chat.TypingState -import com.flipcash.services.models.GetDeltaError -import com.flipcash.services.repository.DeltaUpdate -import com.flipcash.app.tokens.TokenCoordinator -import com.flipcash.libs.coroutines.DispatcherProvider -import com.flipcash.services.user.UserManager -import com.getcode.opencode.model.accounts.AccountCluster -import com.getcode.opencode.providers.SessionListener -import com.getcode.utils.TraceType -import com.getcode.utils.network.NetworkConnectivityListener -import com.getcode.utils.decodeBase58 -import com.getcode.utils.trace -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.filterNotNull -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.update -import kotlinx.coroutines.launch -import javax.inject.Inject -import javax.inject.Singleton -import kotlin.time.Clock -import kotlin.time.Duration.Companion.seconds - -@Singleton -class ChatCoordinator @Inject constructor( - private val chatController: ChatController, - private val messagingController: ChatMessagingController, - private val eventStreamingController: EventStreamingController, - private val metadataDataSource: ChatMetadataDataSource, - private val messageDataSource: ChatMessageDataSource, - private val memberDataSource: ChatMemberDataSource, - private val contactDataSource: ContactDataSource, - private val networkObserver: NetworkConnectivityListener, - private val notificationManager: NotificationManagerCompat, - private val userManager: UserManager, - private val tokenCoordinator: TokenCoordinator, - private val featureFlags: FeatureFlagController, - private val dispatchers: DispatcherProvider, -) : SessionListener, DefaultLifecycleObserver { - - companion object { - private const val TAG = "ChatCoordinator" - private val HEARTBEAT_INTERVAL = 30.seconds - private val GAP_FILL_DELAY = 2.seconds - } - - private val supervisorJob = SupervisorJob() - private val scope = CoroutineScope(dispatchers.IO + supervisorJob) - private val cluster = MutableStateFlow(null) - private val _state = MutableStateFlow(ChatState()) - private val sequenceTracker = EventSequenceTracker() - private var syncJob: Job? = null - private var flagObserverJob: Job? = null - private var eventStreamCollectJob: Job? = null - private var feedObserverJob: Job? = null - private var heartbeatJob: Job? = null - private var networkObserverJob: Job? = null - private var backgroundedActiveChat: ChatId? = null - - val state: StateFlow - get() = _state.asStateFlow() +/** + * Feed-level operations: observing the conversation list and its unread state. + * + * Implemented by [com.flipcash.shared.chat.internal.delegates.FeedSyncDelegate]. + */ +interface FeedOperations { + /** Reactive list of all DM conversations, sorted by last activity. */ val feed: Flow> - get() = _state.map { state -> - val selfId = userManager.accountId - state.feed.mapNotNull { metadata -> - // Filter out anonymous chats (DMs where the other member has no name or phone) - val otherMember = metadata.members.firstOrNull { it.userId != selfId } - if (otherMember != null) { - val profile = otherMember.userProfile - val hasIdentity = !profile.displayName.isNullOrBlank() || - !profile.verifiedPhoneNumber.isNullOrBlank() - if (!hasIdentity) return@mapNotNull null - } - - val readPointer = metadata.members - .firstOrNull { it.userId == selfId } - ?.pointers - ?.firstOrNull { it.type == PointerType.READ } - ?.value ?: 0L - - val unreadCount = metadata.lastMessage?.let { lastMsg -> - if (lastMsg.messageId > readPointer && lastMsg.senderId != selfId) 1 else 0 - } ?: 0 - - ChatSummary(metadata = metadata, unreadCount = unreadCount) - } - } - - // region SessionListener - - override suspend fun onUserLoggedIn(cluster: AccountCluster) { - trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User) - this.cluster.value = cluster - observeFeedFromDb() - syncFeed() - openEventStream() - startHeartbeat() - observeFeatureFlag() - } - - // endregion - - // region Lifecycle - - init { - ProcessLifecycleOwner.get().lifecycle.addObserver(this) - - networkObserverJob = cluster.filterNotNull() - .flatMapLatest { networkObserver.state } - .distinctUntilChanged() - .filter { it.connected } - .debounce(1.seconds) - .onEach { - if (!isChatEnabled()) return@onEach - trace(tag = TAG, message = "Network connected, re-syncing chat feed", type = TraceType.Process) - syncFeed() - openEventStream() - } - .launchIn(scope) - } - - override fun onStart(owner: LifecycleOwner) { - backgroundedActiveChat?.let { - setActiveChatId(it) - backgroundedActiveChat = null - } - scope.launch { - if (cluster.value != null && isChatEnabled()) { - trace(tag = TAG, message = "Lifecycle resumed, syncing chat feed", type = TraceType.Process) - syncFeed() - openEventStream() - startHeartbeat() - } - } - } - - override fun onStop(owner: LifecycleOwner) { - backgroundedActiveChat = _state.value.activeChat - setActiveChatId(null) - stopHeartbeat() - closeEventStream() - } - - // endregion - - // region Public API - - suspend fun getChatId(contact: DeviceContact): Result { - val raw = contactDataSource.getDmChatId(contact.e164) - if (raw.isNullOrEmpty()) { - return Result.failure(NoDmChatInitializedException(contact.e164)) - } - return runCatching { ChatId(raw.decodeBase58()) } - } - - fun observeUnreadConversations(): Flow { - return feed.map { summaries -> summaries.count { it.unreadCount > 0 } } - } - - fun observeMessages(chatId: ChatId): Flow> { - return messageDataSource.observeMessages(chatId) - } - - fun observeMessagesPaged(chatId: ChatId): Flow> { - return Pager( - config = PagingConfig(pageSize = 50), - remoteMediator = ChatMessageRemoteMediator(chatId, messagingController, messageDataSource), - ) { - messageDataSource.observeForChat(chatId) - }.flow.map { page -> - page.map { entity -> messageDataSource.toChatMessage(entity) } - } - } - - fun observeTypingIndicators(chatId: ChatId): Flow> { - return _state.map { it.typingIndicators[chatId] ?: emptySet() } - } - - fun observeMembers(chatId: ChatId): Flow> { - return memberDataSource.observeMembers(chatId) - } - - fun observeOtherReadPointer(chatId: ChatId): Flow { - val selfId = userManager.accountId - return memberDataSource.observeMembers(chatId) - .map { members -> - members.firstOrNull { it.userId != selfId } - ?.pointers - ?.firstOrNull { it.type == PointerType.READ } - } - .distinctUntilChanged() - } - - suspend fun loadMessages(chatId: ChatId) { - messagingController.getMessages(chatId) - .onSuccess { messages -> - messageDataSource.upsert(chatId, messages) - - val latest = messages.maxByOrNull { it.messageId } ?: return@onSuccess - metadataDataSource.updateLastMessageId(chatId, latest.messageId) - metadataDataSource.updateLastActivity(chatId, latest.timestamp.toEpochMilliseconds()) - } - } - - suspend fun sendMessage(chatId: ChatId, content: String): Result { - val senderId = userManager.accountId - ?: return Result.failure(IllegalStateException("Cannot send message without an account")) - - val content = listOf(MessageContent.Text(content)) - val (_, clientMessageId) = messageDataSource.insertPending( - chatId = chatId, - content = content, - senderId = senderId, - ) - - return messagingController.sendMessage(chatId, content, clientMessageId) - .onSuccess { serverMessage -> - messageDataSource.confirmPending(chatId, clientMessageId, serverMessage) - advanceReadPointer(chatId, serverMessage.messageId) - - // Update feed metadata — reactive flow picks up the change - metadataDataSource.updateLastMessageId(chatId, serverMessage.messageId) - metadataDataSource.updateLastActivity(chatId, serverMessage.timestamp.toEpochMilliseconds()) - } - .onFailure { - messageDataSource.failPending(chatId, clientMessageId) - } - } - - suspend fun advanceReadPointer(chatId: ChatId, messageId: Long): Result { - val selfId = userManager.accountId ?: return Result.failure( - IllegalStateException("No account") - ) - - // Update local pointer — reactive flow updates the feed's unread count - val pointer = MessagePointer( - type = PointerType.READ, - userId = selfId, - value = messageId, - timestamp = Clock.System.now(), - ) - memberDataSource.updatePointers(chatId, pointer) - - return messagingController.advancePointer(chatId, PointerType.READ, messageId) - } - - fun setActiveChatId(chatId: ChatId?) { - _state.update { it.copy(activeChat = chatId) } - } - - fun isActiveChat(chatId: ChatId): Boolean { - return _state.value.activeChat == chatId - } - - suspend fun getOtherMemberE164(chatId: ChatId): String? { - val selfId = userManager.accountId - val localMembers = memberDataSource.getMembersForChat(chatId) - val otherMember = localMembers.firstOrNull { it.userId != selfId } - if (otherMember != null) return otherMember.userProfile.verifiedPhoneNumber - // Chat not persisted locally yet — fetch from server - val metadata = chatController.getChat(chatId).getOrNull() ?: return null - memberDataSource.upsert(chatId, metadata.members) - return metadata.members - .firstOrNull { it.userId != selfId } - ?.userProfile?.verifiedPhoneNumber - } + /** Emits the number of conversations that have unread messages. */ + fun observeUnreadConversations(): Flow - fun dismissNotifications(chatId: ChatId) { - notificationManager.cancel(chatId.hashCode()) - } - - suspend fun markAsRead(chatId: ChatId): Result { - val messageId = state.value.feed - .firstOrNull { it.chatId == chatId } - ?.lastMessage?.messageId - ?: messageDataSource.getLatestMessageId(chatId) - ?: return Result.success(Unit) - return advanceReadPointer(chatId, messageId) - .also { dismissNotifications(chatId) } - } - - suspend fun notifyTyping(chatId: ChatId, typingState: TypingState): Result { - return messagingController.notifyIsTyping(chatId, typingState) - } - - fun refreshFeed() { - syncFeed() - } - - suspend fun reset() { - stopHeartbeat() - closeEventStream() - syncJob?.cancel() - flagObserverJob?.cancel() - feedObserverJob?.cancel() - networkObserverJob?.cancel() - feedObserverJob = null - _state.value = ChatState() - sequenceTracker.clearAll() - cluster.value = null - metadataDataSource.clear() - messageDataSource.clear() - memberDataSource.clear() - supervisorJob.cancel() - trace(tag = TAG, message = "reset complete", type = TraceType.Process) - } - - // endregion - - // region Internal - - private suspend fun isChatEnabled(): Boolean { - val featureFlag = featureFlags.get(FeatureFlag.PhoneNumberSend) - val serverFlag = userManager.state.value.flags?.enablePhoneNumberSend == true - return featureFlag || serverFlag - } - - private fun observeFeatureFlag() { - flagObserverJob?.cancel() - flagObserverJob = combine( - featureFlags.observe(FeatureFlag.PhoneNumberSend), - userManager.state.map { it.flags?.enablePhoneNumberSend == true }, - ) { featureFlag, serverFlag -> featureFlag || serverFlag } - .distinctUntilChanged() - .filter { it } - .onEach { - if (cluster.value != null) { - trace(tag = TAG, message = "Chat feature enabled, syncing feed", type = TraceType.Process) - syncFeed() - openEventStream() - startHeartbeat() - } - } - .launchIn(scope) - } - - private fun observeFeedFromDb() { - feedObserverJob?.cancel() - feedObserverJob = combine( - metadataDataSource.observeAll(), - memberDataSource.observeAll(), - ) { metadataEntities, membersByChat -> - buildFeedFromDb(metadataEntities, membersByChat) - }.onEach { feed -> - _state.update { it.copy(feed = feed) } - }.launchIn(scope) - } - - private suspend fun buildFeedFromDb( - metadataEntities: List, - membersByChat: Map>, - ): List { - return metadataEntities.map { entity -> - val members = membersByChat[entity.chatIdHex] ?: emptyList() - val lastMessage = entity.lastMessageId?.let { - messageDataSource.getLatest(entity.chatIdHex) - } - metadataDataSource.toMetadata(entity, members, lastMessage) - } - } - - private fun syncFeed() { - syncJob?.cancel() - syncJob = scope.launch { performFeedSync() } - } - - private suspend fun performFeedSync() { - _state.update { it.copy(feedSyncState = FeedSyncState.Syncing) } - chatController.getDmChatFeed() - .onSuccess { page -> - metadataDataSource.upsert(page.chats) - - for (chat in page.chats) { - memberDataSource.upsert(chat.chatId, chat.members) - chat.lastMessage?.let { msg -> - messageDataSource.upsert(chat.chatId, listOf(msg)) - } - } - - _state.update { it.copy(feedSyncState = FeedSyncState.Synced) } - trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process) - - // Delta-sync for chats with a known event sequence; full load for new chats - for (chat in page.chats) { - if (chat.latestEventSequence > 0) { - val localSeq = metadataDataSource.getLatestEventSequence(chat.chatId) - if (localSeq > 0 && localSeq < chat.latestEventSequence) { - performDeltaSync(chat.chatId) - continue - } - } - if (!messageDataSource.hasMessages(chat.chatId)) { - loadMessages(chat.chatId) - } - } - } - .onFailure { error -> - _state.update { it.copy(feedSyncState = FeedSyncState.Error) } - trace(tag = TAG, message = "Feed sync failed: ${error.message}", type = TraceType.Error) - } - } - - private fun openEventStream() { - if (eventStreamingController.isConnected) { - trace(tag = TAG, message = "Event stream already connected, skipping open", type = TraceType.Process) - ensureCollector() - return - } - - eventStreamingController.open(scope) - ensureCollector() - } - - private fun ensureCollector() { - if (eventStreamCollectJob?.isActive != true) { - eventStreamCollectJob = scope.launch { - eventStreamingController.chatUpdates.collect { applyUpdate(it) } - } - } - } - - private fun closeEventStream() { - eventStreamCollectJob?.cancel() - eventStreamCollectJob = null - eventStreamingController.close() - } - - private fun startHeartbeat() { - stopHeartbeat() - heartbeatJob = scope.launch { - while (true) { - delay(HEARTBEAT_INTERVAL) - if (!eventStreamingController.isStreamActive) { - trace(tag = TAG, message = "Heartbeat: event stream dead, syncing feed and reconnecting", type = TraceType.Process) - syncFeed() - // Close the dead ref so open() creates a fresh one - eventStreamingController.close() - openEventStream() - } - } - } - } - - private fun stopHeartbeat() { - heartbeatJob?.cancel() - heartbeatJob = null - } - - private suspend fun applyUpdate(update: ChatUpdate) { - val chatId = update.chatId - - // --- Resolve messages: prefer events, fall back to deprecated newMessages --- - - val resolvedMessages = if (update.events.isNotEmpty()) { - update.events - .flatMap { event -> event.mutations.map { it.message } } - .sortedBy { it.eventSequence } - .distinctBy { it.messageId } - } else { - @Suppress("DEPRECATION") - update.newMessages - } - - trace( - tag = TAG, - message = "applyUpdate: chatId=$chatId, messages=${resolvedMessages.size}, events=${update.events.size}, pointers=${update.pointerUpdates.size}, reactions=${update.reactionUpdates.size}, typing=${update.typingNotifications.size}", - type = TraceType.Process, - ) - - // --- Persist to DB first (suspend, off main thread) --- - - val lastMsg = if (resolvedMessages.isNotEmpty()) { - trace(tag = TAG, message = "Upserting ${resolvedMessages.size} messages for $chatId", type = TraceType.Process) - messageDataSource.upsert(chatId, resolvedMessages) - resolvedMessages.maxByOrNull { it.messageId }?.also { msg -> - metadataDataSource.updateLastMessageId(chatId, msg.messageId) - metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds()) - } - } else null - - // Advance event sequence cursor — gap-aware (only advance contiguous frontier) - if (update.events.isNotEmpty()) { - val dbCursor = metadataDataSource.getLatestEventSequence(chatId) - val incomingSequences = update.events.map { it.sequence } - val result = sequenceTracker.processSequences(chatId, dbCursor, incomingSequences) - - if (result.newContiguousSequence > dbCursor) { - metadataDataSource.updateLatestEventSequence(chatId, result.newContiguousSequence) - } - - if (result.hasGap) { - scheduleGapFill(chatId) - } - } - - for (pointer in update.pointerUpdates) { - memberDataSource.updatePointers(chatId, pointer) - } - - for (metaUpdate in update.metadataUpdates) { - when (metaUpdate) { - is MetadataUpdate.FullRefresh -> { - metadataDataSource.upsert(metaUpdate.metadata) - memberDataSource.deleteForChat(metaUpdate.metadata.chatId) - memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) - metaUpdate.metadata.lastMessage?.let { msg -> - messageDataSource.upsert(metaUpdate.metadata.chatId, listOf(msg)) - } - } - is MetadataUpdate.LastActivityChanged -> { - metadataDataSource.updateLastActivity( - chatId, - metaUpdate.newLastActivity.toEpochMilliseconds(), - ) - } - } - } - - // --- Process reaction updates into in-memory overlay --- + /** Triggers a server-side feed sync. Safe to call redundantly. */ + fun refreshFeed() +} - if (update.reactionUpdates.isNotEmpty()) { - _state.update { state -> - val chatOverlays = state.reactionOverlays[chatId]?.toMutableMap() ?: mutableMapOf() - for (reactionUpdate in update.reactionUpdates) { - applyReactionUpdate(chatOverlays, reactionUpdate) - } - state.copy( - reactionOverlays = state.reactionOverlays + (chatId to chatOverlays.toMap()) - ) - } - } +/** + * Ephemeral real-time observations derived from the server event stream. + * + * Typing indicators and reaction overlays are held in memory only — they are + * not persisted to Room. + * + * Implemented by [com.flipcash.shared.chat.internal.delegates.EventStreamDelegate]. + */ +interface EventStreamOperations { + /** Emits the set of users currently typing in [chatId]. */ + fun observeTypingIndicators(chatId: ChatId): Flow> + + /** Emits the current reaction summary for a specific message, or `null` if none. */ + fun observeReactions(chatId: ChatId, messageId: Long): Flow +} - // --- Eagerly update token balance for incoming cash --- +/** + * Per-chat messaging operations: sending, receiving, read receipts, and identity. + * + * All methods target a single conversation identified by [ChatId]. + * + * Implemented by [com.flipcash.shared.chat.internal.delegates.MessagingDelegate]. + */ +interface MessagingOperations { + /** Resolves the [ChatId] for an existing DM with [contact]. */ + suspend fun getChatId(contact: DeviceContact): Result - val selfId = userManager.accountId - for (msg in resolvedMessages) { - if (msg.senderId == selfId) continue - for (content in msg.content) { - if (content is MessageContent.Cash) { - tokenCoordinator.add(content.mint, content.amount) - } - } - } + /** Returns the E.164 phone number of the other member in a DM, or `null` if unknown. */ + suspend fun getOtherMemberE164(chatId: ChatId): String? - // --- Check if unknown chat requires a full feed sync --- + /** Marks [chatId] as the currently-viewed chat (used to suppress notifications). */ + fun setActiveChatId(chatId: ChatId?) - if (lastMsg != null) { - if (!metadataDataSource.exists(chatId)) { - syncFeed() - } - } + /** Returns `true` if [chatId] is the currently-viewed chat. */ + fun isActiveChat(chatId: ChatId): Boolean - // --- Update ephemeral state (typing indicators are not DB-backed) --- + /** Cancels any pending system notifications for [chatId]. */ + fun dismissNotifications(chatId: ChatId) - if (update.typingNotifications.isNotEmpty()) { - _state.update { state -> - val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() - for (notification in update.typingNotifications) { - applyTypingNotification(currentTypists, notification) - } - state.copy( - typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet()) - ) - } - } - } + /** Observes all messages in [chatId] as a flat list. */ + fun observeMessages(chatId: ChatId): Flow> - private fun applyReactionUpdate( - overlays: MutableMap, - update: ReactionUpdate, - ) { - val existing = overlays[update.messageId] - val existingReactions = existing?.reactions?.toMutableList() ?: mutableListOf() + /** Observes messages in [chatId] via Paging 3, with remote-mediated page loads. */ + fun observeMessagesPaged(chatId: ChatId): Flow> - // Find existing reaction for this emoji - val idx = existingReactions.indexOfFirst { it.emoji == update.emoji } - if (idx >= 0) { - val current = existingReactions[idx] - // LWW guard using sequence - if (update.sequence <= current.sequence) return - existingReactions[idx] = EmojiReaction( - emoji = update.emoji, - count = update.count, - reactedBySelf = current.reactedBySelf, // preserved; server will correct on next full fetch - sampleReactors = current.sampleReactors, - sequence = update.sequence, - ) - } else { - existingReactions.add( - EmojiReaction( - emoji = update.emoji, - count = update.count, - reactedBySelf = false, - sampleReactors = emptyList(), - sequence = update.sequence, - ) - ) - } + /** Observes the member list for [chatId]. */ + fun observeMembers(chatId: ChatId): Flow> - // Remove reactions with count == 0 - existingReactions.removeAll { it.count <= 0 } + /** Observes the other member's read pointer in [chatId] (for read receipts). */ + fun observeOtherReadPointer(chatId: ChatId): Flow - overlays[update.messageId] = ReactionSummary( - messageId = update.messageId, - reactions = existingReactions.toList(), - ) - } + /** Fetches the full message history for [chatId] from the server and persists locally. */ + suspend fun loadMessages(chatId: ChatId) - private fun scheduleGapFill(chatId: ChatId) { - val job = scope.launch { - delay(GAP_FILL_DELAY) - if (!sequenceTracker.hasGap(chatId)) return@launch - trace(tag = TAG, message = "Gap fill timeout: fetching delta for $chatId", type = TraceType.Process) - performDeltaSync(chatId) - } - sequenceTracker.setGapFillJob(chatId, job) - } + /** Sends a text message to [chatId]. Returns the server-confirmed [ChatMessage]. */ + suspend fun sendMessage(chatId: ChatId, content: String): Result - private suspend fun performDeltaSync(chatId: ChatId) { - val afterSequence = metadataDataSource.getLatestEventSequence(chatId) - trace(tag = TAG, message = "Delta sync for $chatId from sequence $afterSequence", type = TraceType.Process) + /** Advances the local and remote read pointer for [chatId] to [messageId]. */ + suspend fun advanceReadPointer(chatId: ChatId, messageId: Long): Result - try { - val result = messagingController.getDelta(chatId, afterSequence).first() - result - .onSuccess { delta -> - if (delta.messages.isNotEmpty()) { - messageDataSource.upsert(chatId, delta.messages) - val latest = delta.messages.maxByOrNull { it.messageId } - latest?.let { msg -> - metadataDataSource.updateLastMessageId(chatId, msg.messageId) - metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds()) - } - } - if (delta.latestSequence > afterSequence) { - metadataDataSource.updateLatestEventSequence(chatId, delta.latestSequence) - } - // getDelta is authoritative — reset tracker to the server's sequence - sequenceTracker.resetTo(chatId, delta.latestSequence) - trace(tag = TAG, message = "Delta sync complete: ${delta.messages.size} messages, sequence ${delta.latestSequence}", type = TraceType.Process) - } - .onFailure { error -> - if (error is GetDeltaError.ResetRequired) { - trace(tag = TAG, message = "Delta sync reset required for $chatId, falling back to full load", type = TraceType.Process) - sequenceTracker.clear(chatId) - loadMessages(chatId) - } else { - trace(tag = TAG, message = "Delta sync failed for $chatId: ${error.message}", type = TraceType.Error) - } - } - } catch (e: Exception) { - trace(tag = TAG, message = "Delta sync exception for $chatId: ${e.message}", type = TraceType.Error) - } - } + /** Marks [chatId] as fully read (advances pointer to the latest message). */ + suspend fun markAsRead(chatId: ChatId): Result - fun observeReactions(chatId: ChatId, messageId: Long): Flow { - return _state.map { it.reactionOverlays[chatId]?.get(messageId) } - .distinctUntilChanged() - } + /** Notifies the server of the user's typing state in [chatId]. */ + suspend fun notifyTyping(chatId: ChatId, typingState: TypingState): Result +} - private fun applyTypingNotification( - typists: MutableSet, - notification: TypingNotification, - ) { - when (notification.state) { - TypingState.STARTED_TYPING, TypingState.STILL_TYPING -> { - typists.removeAll { it.userId == notification.userId } - typists.add(ActiveTypist(userId = notification.userId, since = Clock.System.now())) - } - TypingState.STOPPED_TYPING, TypingState.TYPING_TIMED_OUT -> { - typists.removeAll { it.userId == notification.userId } - } - TypingState.UNKNOWN -> Unit - } - } +/** + * Unified facade for the chat subsystem, composing [FeedOperations], + * [EventStreamOperations], and [MessagingOperations]. + * + * The concrete implementation is + * [RealChatCoordinator][com.flipcash.shared.chat.internal.RealChatCoordinator], + * which delegates each sub-interface to a focused singleton and wires + * cross-delegate events in its `init` block. + * + * @see com.flipcash.shared.chat.internal.RealChatCoordinator + */ +interface ChatCoordinator : FeedOperations, EventStreamOperations, MessagingOperations { + /** Full observable snapshot of chat state (feed, typing, reactions, active chat). */ + val state: StateFlow - // endregion + /** Tears down all connections, clears persisted data, and resets in-memory state. */ + suspend fun reset() } class NoDmChatInitializedException(e164: String) : Exception("No DM chat for $e164") diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/inject/ChatModule.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/inject/ChatModule.kt index 53c1d1d01..59f1f2d9f 100644 --- a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/inject/ChatModule.kt +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/inject/ChatModule.kt @@ -1,20 +1,28 @@ package com.flipcash.shared.chat.inject import com.flipcash.shared.chat.ChatCoordinator +import com.flipcash.shared.chat.internal.RealChatCoordinator import com.getcode.opencode.providers.SessionListener import dagger.Binds import dagger.Module import dagger.hilt.InstallIn import dagger.hilt.components.SingletonComponent import dagger.multibindings.IntoSet +import javax.inject.Singleton @Module @InstallIn(SingletonComponent::class) abstract class ChatModule { + @Binds + @Singleton + abstract fun bindChatCoordinator( + impl: RealChatCoordinator + ): ChatCoordinator + @Binds @IntoSet abstract fun bindSessionListener( - coordinator: ChatCoordinator + impl: RealChatCoordinator ): SessionListener } diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/ChatStateHolder.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/ChatStateHolder.kt new file mode 100644 index 000000000..bc8beb725 --- /dev/null +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/ChatStateHolder.kt @@ -0,0 +1,24 @@ +package com.flipcash.shared.chat.internal + +import com.flipcash.shared.chat.ChatState +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.update +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class ChatStateHolder @Inject constructor() { + private val _state = MutableStateFlow(ChatState()) + val state: StateFlow = _state.asStateFlow() + val current: ChatState get() = _state.value + + fun update(transform: (ChatState) -> ChatState) { + _state.update(transform) + } + + fun reset() { + _state.value = ChatState() + } +} diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/RealChatCoordinator.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/RealChatCoordinator.kt new file mode 100644 index 000000000..e3b00f2ec --- /dev/null +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/RealChatCoordinator.kt @@ -0,0 +1,227 @@ +@file:OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) + +package com.flipcash.shared.chat.internal + +import androidx.lifecycle.DefaultLifecycleObserver +import androidx.lifecycle.LifecycleOwner +import androidx.lifecycle.ProcessLifecycleOwner +import com.flipcash.app.featureflags.FeatureFlag +import com.flipcash.app.featureflags.FeatureFlagController +import com.flipcash.libs.coroutines.DispatcherProvider +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.user.UserManager +import com.flipcash.shared.chat.ChatCoordinator +import com.flipcash.shared.chat.ChatState +import com.flipcash.shared.chat.EventStreamOperations +import com.flipcash.shared.chat.FeedOperations +import com.flipcash.shared.chat.MessagingOperations +import com.flipcash.shared.chat.internal.delegates.EventStreamDelegate +import com.flipcash.shared.chat.internal.delegates.FeedSyncDelegate +import com.flipcash.shared.chat.internal.delegates.MessagingDelegate +import com.getcode.opencode.model.accounts.AccountCluster +import com.getcode.opencode.providers.SessionListener +import com.getcode.utils.TraceType +import com.getcode.utils.network.NetworkConnectivityListener +import com.getcode.utils.trace +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.time.Duration.Companion.seconds + +/** + * Thin orchestration shell that implements [ChatCoordinator] by composing three + * focused delegates via Kotlin `by` interface delegation: + * + * | Delegate | Interface | Responsibility | + * |----------|-----------|----------------| + * | [FeedSyncDelegate] | [FeedOperations] | Feed sync, DB observation, unread counts | + * | [EventStreamDelegate] | [EventStreamOperations] | Event stream, real-time updates, gap-aware sequencing, reactions, typing | + * | [MessagingDelegate] | [MessagingOperations] | Per-chat send/receive, read pointers, paging, notifications | + * + * **What lives here (and why):** + * - **Event routing** — each delegate exposes a `Flow` (backed by a `Channel`); + * the `init` block collects both and dispatches cross-delegate calls (e.g. + * feed-delegate's `DeltaSyncNeeded` → `eventStreamDelegate.performDeltaSync`, + * event-stream-delegate's `SyncFeedRequested` → `feedDelegate.syncFeed`). + * All cross-delegate wiring is visible in one place. + * - **Lifecycle methods** — [onStart]/[onStop] are inherently cross-cutting + * (stream connect/disconnect, heartbeat start/stop, active-chat save/restore). + * - **Flow observers** — network reconnect and feature-flag transitions that + * gate whether the chat subsystem should be active. + * + * Delegates require [initialize] with a shared [CoroutineScope] before use; + * this happens in [onUserLoggedIn]. + */ +@Singleton +class RealChatCoordinator @Inject constructor( + private val feedDelegate: FeedSyncDelegate, + private val eventStreamDelegate: EventStreamDelegate, + private val messagingDelegate: MessagingDelegate, + private val stateHolder: ChatStateHolder, + private val userManager: UserManager, + private val featureFlags: FeatureFlagController, + networkObserver: NetworkConnectivityListener, + dispatchers: DispatcherProvider, +) : ChatCoordinator, + SessionListener, + DefaultLifecycleObserver, + FeedOperations by feedDelegate, + EventStreamOperations by eventStreamDelegate, + MessagingOperations by messagingDelegate { + + companion object { + private const val TAG = "ChatCoordinator" + } + + private val supervisorJob = SupervisorJob() + private val scope = CoroutineScope(dispatchers.IO + supervisorJob) + private val cluster = MutableStateFlow(null) + private var flagObserverJob: Job? = null + private var networkObserverJob: Job? = null + private var backgroundedActiveChat: ChatId? = null + + override val state: StateFlow + get() = stateHolder.state + + // region SessionListener + + override suspend fun onUserLoggedIn(cluster: AccountCluster) { + trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User) + this.cluster.value = cluster + feedDelegate.initialize(scope) + eventStreamDelegate.initialize(scope) + feedDelegate.observeFeedFromDb() + feedDelegate.syncFeed() + eventStreamDelegate.open() + eventStreamDelegate.startHeartbeat { feedDelegate.syncFeed() } + observeFeatureFlag() + } + + // endregion + + // region Lifecycle + + init { + ProcessLifecycleOwner.get().lifecycle.addObserver(this) + + feedDelegate.events + .onEach { event -> + when (event) { + is FeedSyncDelegate.Event.LoadMessages -> + messagingDelegate.loadMessages(event.chatId) + is FeedSyncDelegate.Event.DeltaSyncNeeded -> + eventStreamDelegate.performDeltaSync(event.chatId) + } + }.launchIn(scope) + + eventStreamDelegate.events + .onEach { event -> + when (event) { + is EventStreamDelegate.Event.SyncFeedRequested -> + feedDelegate.syncFeed() + is EventStreamDelegate.Event.LoadMessages -> + messagingDelegate.loadMessages(event.chatId) + } + }.launchIn(scope) + + networkObserverJob = cluster.filterNotNull() + .flatMapLatest { networkObserver.state } + .distinctUntilChanged() + .filter { it.connected } + .debounce(1.seconds) + .onEach { + if (!isChatEnabled()) return@onEach + trace(tag = TAG, message = "Network connected, re-syncing chat feed", type = TraceType.Process) + feedDelegate.syncFeed() + eventStreamDelegate.open() + } + .launchIn(scope) + } + + override fun onStart(owner: LifecycleOwner) { + backgroundedActiveChat?.let { + messagingDelegate.setActiveChatId(it) + backgroundedActiveChat = null + } + scope.launch { + if (cluster.value != null && isChatEnabled()) { + trace(tag = TAG, message = "Lifecycle resumed, syncing chat feed", type = TraceType.Process) + feedDelegate.syncFeed() + eventStreamDelegate.open() + eventStreamDelegate.startHeartbeat { feedDelegate.syncFeed() } + } + } + } + + override fun onStop(owner: LifecycleOwner) { + backgroundedActiveChat = stateHolder.current.activeChat + messagingDelegate.setActiveChatId(null) + eventStreamDelegate.stopHeartbeat() + eventStreamDelegate.close() + } + + // endregion + + // region ChatCoordinator + + override suspend fun reset() { + eventStreamDelegate.stopHeartbeat() + eventStreamDelegate.close() + feedDelegate.cancelJobs() + flagObserverJob?.cancel() + networkObserverJob?.cancel() + stateHolder.reset() + eventStreamDelegate.clearAll() + cluster.value = null + messagingDelegate.clear() + supervisorJob.cancel() + trace(tag = TAG, message = "reset complete", type = TraceType.Process) + } + + // endregion + + // region Internal + + private suspend fun isChatEnabled(): Boolean { + val featureFlag = featureFlags.get(FeatureFlag.PhoneNumberSend) + val serverFlag = userManager.state.value.flags?.enablePhoneNumberSend == true + return featureFlag || serverFlag + } + + private fun observeFeatureFlag() { + flagObserverJob?.cancel() + flagObserverJob = combine( + featureFlags.observe(FeatureFlag.PhoneNumberSend), + userManager.state.map { it.flags?.enablePhoneNumberSend == true }, + ) { featureFlag, serverFlag -> featureFlag || serverFlag } + .distinctUntilChanged() + .filter { it } + .onEach { + if (cluster.value != null) { + trace(tag = TAG, message = "Chat feature enabled, syncing feed", type = TraceType.Process) + feedDelegate.syncFeed() + eventStreamDelegate.open() + eventStreamDelegate.startHeartbeat { feedDelegate.syncFeed() } + } + } + .launchIn(scope) + } + + // endregion +} diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/EventStreamDelegate.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/EventStreamDelegate.kt new file mode 100644 index 000000000..5c3a6cf3a --- /dev/null +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/EventStreamDelegate.kt @@ -0,0 +1,389 @@ +package com.flipcash.shared.chat.internal.delegates + +import com.flipcash.app.persistence.sources.ChatMemberDataSource +import com.flipcash.app.persistence.sources.ChatMessageDataSource +import com.flipcash.app.persistence.sources.ChatMetadataDataSource +import com.flipcash.app.tokens.TokenCoordinator +import com.flipcash.services.controllers.ChatMessagingController +import com.flipcash.services.controllers.EventStreamingController +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatUpdate +import com.flipcash.services.models.chat.EmojiReaction +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.MetadataUpdate +import com.flipcash.services.models.chat.ReactionSummary +import com.flipcash.services.models.chat.ReactionUpdate +import com.flipcash.services.models.chat.TypingNotification +import com.flipcash.services.models.chat.TypingState +import com.flipcash.services.models.GetDeltaError +import com.flipcash.shared.chat.ActiveTypist +import com.flipcash.shared.chat.EventSequenceTracker +import com.flipcash.shared.chat.EventStreamOperations +import com.flipcash.shared.chat.internal.ChatStateHolder +import com.flipcash.services.user.UserManager +import com.getcode.utils.TraceType +import com.getcode.utils.trace +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.time.Clock +import kotlin.time.Duration.Companion.seconds + +/** + * Manages the server-sent event stream and applies real-time [ChatUpdate]s to + * local persistence and in-memory state. + * + * Responsibilities: + * - **Message persistence** — resolves messages from `ChatUpdate.events` (preferred) + * or the deprecated `newMessages` field, then upserts to Room. + * - **Gap-aware event sequencing** — uses [EventSequenceTracker] to maintain a + * contiguous frontier. Only the highest contiguous sequence is persisted; if a + * gap is detected, a timed [getDelta][performDeltaSync] backfill is scheduled. + * - **Reaction overlays** — merges [ReactionUpdate]s into an in-memory + * `Map>` with last-writer-wins on + * `EmojiReaction.sequence`. + * - **Typing indicators** — maintains per-chat `Set` in [ChatStateHolder]. + * - **Eager balance update** — credits incoming cash messages to [TokenCoordinator] + * before the server balance refresh arrives. + * - **Heartbeat** — periodically checks stream liveness and reconnects if dead. + * + * **Cross-delegate communication:** Emits [Event.SyncFeedRequested] when a message + * arrives for an unknown chat, and [Event.LoadMessages] when a delta reset requires + * a full message re-fetch. [RealChatCoordinator] routes these. + * + * Requires [initialize] with a [CoroutineScope] before any work can be launched. + * + * @see com.flipcash.shared.chat.internal.RealChatCoordinator + * @see EventSequenceTracker + */ +@Singleton +class EventStreamDelegate @Inject constructor( + private val eventStreamingController: EventStreamingController, + private val messagingController: ChatMessagingController, + private val metadataDataSource: ChatMetadataDataSource, + private val messageDataSource: ChatMessageDataSource, + private val memberDataSource: ChatMemberDataSource, + private val tokenCoordinator: TokenCoordinator, + private val userManager: UserManager, + private val stateHolder: ChatStateHolder, +) : EventStreamOperations { + + companion object { + private const val TAG = "EventStreamDelegate" + private val GAP_FILL_DELAY = 2.seconds + } + + sealed interface Event { + data object SyncFeedRequested : Event + data class LoadMessages(val chatId: ChatId) : Event + } + + private val _events = Channel(Channel.UNLIMITED) + val events: Flow = _events.receiveAsFlow() + + private val sequenceTracker = EventSequenceTracker() + private var scope: CoroutineScope? = null + private var eventStreamCollectJob: Job? = null + private var heartbeatJob: Job? = null + + // region EventStreamOperations + + override fun observeTypingIndicators(chatId: ChatId): Flow> { + return stateHolder.state.map { it.typingIndicators[chatId] ?: emptySet() } + } + + override fun observeReactions(chatId: ChatId, messageId: Long): Flow { + return stateHolder.state.map { it.reactionOverlays[chatId]?.get(messageId) } + .distinctUntilChanged() + } + + // endregion + + // region Internal + + internal fun initialize(scope: CoroutineScope) { + this.scope = scope + } + + internal fun open() { + val scope = scope ?: return + if (eventStreamingController.isConnected) { + trace(tag = TAG, message = "Event stream already connected, skipping open", type = TraceType.Process) + ensureCollector(scope) + return + } + + eventStreamingController.open(scope) + ensureCollector(scope) + } + + internal fun close() { + eventStreamCollectJob?.cancel() + eventStreamCollectJob = null + eventStreamingController.close() + } + + internal fun startHeartbeat(onReconnect: () -> Unit) { + val scope = scope ?: return + stopHeartbeat() + heartbeatJob = scope.launch { + while (true) { + delay(30.seconds) + if (!eventStreamingController.isStreamActive) { + trace(tag = TAG, message = "Heartbeat: event stream dead, syncing feed and reconnecting", type = TraceType.Process) + onReconnect() + eventStreamingController.close() + open() + } + } + } + } + + internal fun stopHeartbeat() { + heartbeatJob?.cancel() + heartbeatJob = null + } + + internal suspend fun performDeltaSync(chatId: ChatId) { + val afterSequence = metadataDataSource.getLatestEventSequence(chatId) + trace(tag = TAG, message = "Delta sync for $chatId from sequence $afterSequence", type = TraceType.Process) + + try { + val result = messagingController.getDelta(chatId, afterSequence).first() + result + .onSuccess { delta -> + if (delta.messages.isNotEmpty()) { + messageDataSource.upsert(chatId, delta.messages) + val latest = delta.messages.maxByOrNull { it.messageId } + latest?.let { msg -> + metadataDataSource.updateLastMessageId(chatId, msg.messageId) + metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds()) + } + } + if (delta.latestSequence > afterSequence) { + metadataDataSource.updateLatestEventSequence(chatId, delta.latestSequence) + } + sequenceTracker.resetTo(chatId, delta.latestSequence) + trace(tag = TAG, message = "Delta sync complete: ${delta.messages.size} messages, sequence ${delta.latestSequence}", type = TraceType.Process) + } + .onFailure { error -> + if (error is GetDeltaError.ResetRequired) { + trace(tag = TAG, message = "Delta sync reset required for $chatId, falling back to full load", type = TraceType.Process) + sequenceTracker.clear(chatId) + _events.send(Event.LoadMessages(chatId)) + } else { + trace(tag = TAG, message = "Delta sync failed for $chatId: ${error.message}", type = TraceType.Error) + } + } + } catch (e: Exception) { + trace(tag = TAG, message = "Delta sync exception for $chatId: ${e.message}", type = TraceType.Error) + } + } + + internal fun clearAll() { + sequenceTracker.clearAll() + } + + private fun ensureCollector(scope: CoroutineScope) { + if (eventStreamCollectJob?.isActive != true) { + eventStreamCollectJob = scope.launch { + eventStreamingController.chatUpdates.collect { applyUpdate(it) } + } + } + } + + private suspend fun applyUpdate(update: ChatUpdate) { + val chatId = update.chatId + + // --- Resolve messages: prefer events, fall back to deprecated newMessages --- + + val resolvedMessages = if (update.events.isNotEmpty()) { + update.events + .flatMap { event -> event.mutations.map { it.message } } + .sortedBy { it.eventSequence } + .distinctBy { it.messageId } + } else { + @Suppress("DEPRECATION") + update.newMessages + } + + trace( + tag = TAG, + message = "applyUpdate: chatId=$chatId, messages=${resolvedMessages.size}, events=${update.events.size}, pointers=${update.pointerUpdates.size}, reactions=${update.reactionUpdates.size}, typing=${update.typingNotifications.size}", + type = TraceType.Process, + ) + + // --- Persist to DB --- + + val lastMsg = if (resolvedMessages.isNotEmpty()) { + trace(tag = TAG, message = "Upserting ${resolvedMessages.size} messages for $chatId", type = TraceType.Process) + messageDataSource.upsert(chatId, resolvedMessages) + resolvedMessages.maxByOrNull { it.messageId }?.also { msg -> + metadataDataSource.updateLastMessageId(chatId, msg.messageId) + metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds()) + } + } else null + + // Advance event sequence cursor — gap-aware + if (update.events.isNotEmpty()) { + val dbCursor = metadataDataSource.getLatestEventSequence(chatId) + val incomingSequences = update.events.map { it.sequence } + val result = sequenceTracker.processSequences(chatId, dbCursor, incomingSequences) + + if (result.newContiguousSequence > dbCursor) { + metadataDataSource.updateLatestEventSequence(chatId, result.newContiguousSequence) + } + + if (result.hasGap) { + scheduleGapFill(chatId) + } + } + + for (pointer in update.pointerUpdates) { + memberDataSource.updatePointers(chatId, pointer) + } + + for (metaUpdate in update.metadataUpdates) { + when (metaUpdate) { + is MetadataUpdate.FullRefresh -> { + metadataDataSource.upsert(metaUpdate.metadata) + memberDataSource.deleteForChat(metaUpdate.metadata.chatId) + memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) + metaUpdate.metadata.lastMessage?.let { msg -> + messageDataSource.upsert(metaUpdate.metadata.chatId, listOf(msg)) + } + } + is MetadataUpdate.LastActivityChanged -> { + metadataDataSource.updateLastActivity( + chatId, + metaUpdate.newLastActivity.toEpochMilliseconds(), + ) + } + } + } + + // --- Process reaction updates --- + + if (update.reactionUpdates.isNotEmpty()) { + stateHolder.update { state -> + val chatOverlays = state.reactionOverlays[chatId]?.toMutableMap() ?: mutableMapOf() + for (reactionUpdate in update.reactionUpdates) { + applyReactionUpdate(chatOverlays, reactionUpdate) + } + state.copy( + reactionOverlays = state.reactionOverlays + (chatId to chatOverlays.toMap()) + ) + } + } + + // --- Eagerly update token balance for incoming cash --- + + val selfId = userManager.accountId + for (msg in resolvedMessages) { + if (msg.senderId == selfId) continue + for (content in msg.content) { + if (content is MessageContent.Cash) { + tokenCoordinator.add(content.mint, content.amount) + } + } + } + + // --- Unknown chat → full feed sync --- + + if (lastMsg != null) { + if (!metadataDataSource.exists(chatId)) { + _events.send(Event.SyncFeedRequested) + } + } + + // --- Typing indicators --- + + if (update.typingNotifications.isNotEmpty()) { + stateHolder.update { state -> + val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() + for (notification in update.typingNotifications) { + applyTypingNotification(currentTypists, notification) + } + state.copy( + typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet()) + ) + } + } + } + + private fun applyReactionUpdate( + overlays: MutableMap, + update: ReactionUpdate, + ) { + val existing = overlays[update.messageId] + val existingReactions = existing?.reactions?.toMutableList() ?: mutableListOf() + + val idx = existingReactions.indexOfFirst { it.emoji == update.emoji } + if (idx >= 0) { + val current = existingReactions[idx] + if (update.sequence <= current.sequence) return + existingReactions[idx] = EmojiReaction( + emoji = update.emoji, + count = update.count, + reactedBySelf = current.reactedBySelf, + sampleReactors = current.sampleReactors, + sequence = update.sequence, + ) + } else { + existingReactions.add( + EmojiReaction( + emoji = update.emoji, + count = update.count, + reactedBySelf = false, + sampleReactors = emptyList(), + sequence = update.sequence, + ) + ) + } + + existingReactions.removeAll { it.count <= 0 } + + overlays[update.messageId] = ReactionSummary( + messageId = update.messageId, + reactions = existingReactions.toList(), + ) + } + + private fun scheduleGapFill(chatId: ChatId) { + val scope = scope ?: return + val job = scope.launch { + delay(GAP_FILL_DELAY) + if (!sequenceTracker.hasGap(chatId)) return@launch + trace(tag = TAG, message = "Gap fill timeout: fetching delta for $chatId", type = TraceType.Process) + performDeltaSync(chatId) + } + sequenceTracker.setGapFillJob(chatId, job) + } + + private fun applyTypingNotification( + typists: MutableSet, + notification: TypingNotification, + ) { + when (notification.state) { + TypingState.STARTED_TYPING, TypingState.STILL_TYPING -> { + typists.removeAll { it.userId == notification.userId } + typists.add(ActiveTypist(userId = notification.userId, since = Clock.System.now())) + } + TypingState.STOPPED_TYPING, TypingState.TYPING_TIMED_OUT -> { + typists.removeAll { it.userId == notification.userId } + } + TypingState.UNKNOWN -> Unit + } + } + + // endregion +} diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/FeedSyncDelegate.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/FeedSyncDelegate.kt new file mode 100644 index 000000000..e6afd904c --- /dev/null +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/FeedSyncDelegate.kt @@ -0,0 +1,190 @@ +package com.flipcash.shared.chat.internal.delegates + +import com.flipcash.app.persistence.entities.ChatMetadataEntity +import com.flipcash.app.persistence.sources.ChatMemberDataSource +import com.flipcash.app.persistence.sources.ChatMessageDataSource +import com.flipcash.app.persistence.sources.ChatMetadataDataSource +import com.flipcash.services.controllers.ChatController +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMember +import com.flipcash.services.models.chat.ChatMetadata +import com.flipcash.services.models.chat.PointerType +import com.flipcash.shared.chat.ChatSummary +import com.flipcash.shared.chat.FeedOperations +import com.flipcash.shared.chat.FeedSyncState +import com.flipcash.shared.chat.internal.ChatStateHolder +import com.flipcash.services.user.UserManager +import com.getcode.utils.TraceType +import com.getcode.utils.trace +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import javax.inject.Inject +import javax.inject.Singleton + +/** + * Owns the chat feed: syncing conversation metadata from the server, observing + * the local Room database, and projecting [ChatSummary] items with unread counts. + * + * **Cross-delegate communication:** After a feed sync, this delegate may discover + * chats that need their message history loaded or their event sequence caught up. + * Rather than calling other delegates directly, it emits [Event.LoadMessages] or + * [Event.DeltaSyncNeeded] on [events], which [RealChatCoordinator] routes to the + * appropriate delegate. + * + * Requires [initialize] with a [CoroutineScope] before any work can be launched. + * + * @see com.flipcash.shared.chat.internal.RealChatCoordinator + */ +@Singleton +class FeedSyncDelegate @Inject constructor( + private val chatController: ChatController, + private val metadataDataSource: ChatMetadataDataSource, + private val messageDataSource: ChatMessageDataSource, + private val memberDataSource: ChatMemberDataSource, + private val stateHolder: ChatStateHolder, + private val userManager: UserManager, +) : FeedOperations { + + companion object { + private const val TAG = "FeedSyncDelegate" + } + + sealed interface Event { + data class LoadMessages(val chatId: ChatId) : Event + data class DeltaSyncNeeded(val chatId: ChatId) : Event + } + + private val _events = Channel(Channel.UNLIMITED) + val events: Flow = _events.receiveAsFlow() + + private var scope: CoroutineScope? = null + private var syncJob: Job? = null + private var feedObserverJob: Job? = null + + // region FeedOperations + + override val feed: Flow> + get() = stateHolder.state.map { state -> + val selfId = userManager.accountId + state.feed.mapNotNull { metadata -> + val otherMember = metadata.members.firstOrNull { it.userId != selfId } + if (otherMember != null) { + val profile = otherMember.userProfile + val hasIdentity = !profile.displayName.isNullOrBlank() || + !profile.verifiedPhoneNumber.isNullOrBlank() + if (!hasIdentity) return@mapNotNull null + } + + val readPointer = metadata.members + .firstOrNull { it.userId == selfId } + ?.pointers + ?.firstOrNull { it.type == PointerType.READ } + ?.value ?: 0L + + val unreadCount = metadata.lastMessage?.let { lastMsg -> + if (lastMsg.messageId > readPointer && lastMsg.senderId != selfId) 1 else 0 + } ?: 0 + + ChatSummary(metadata = metadata, unreadCount = unreadCount) + } + } + + override fun observeUnreadConversations(): Flow { + return feed.map { summaries -> summaries.count { it.unreadCount > 0 } } + } + + override fun refreshFeed() { + syncFeed() + } + + // endregion + + // region Internal + + internal fun initialize(scope: CoroutineScope) { + this.scope = scope + } + + internal fun observeFeedFromDb() { + val scope = scope ?: return + feedObserverJob?.cancel() + feedObserverJob = combine( + metadataDataSource.observeAll(), + memberDataSource.observeAll(), + ) { metadataEntities, membersByChat -> + buildFeedFromDb(metadataEntities, membersByChat) + }.onEach { feed -> + stateHolder.update { it.copy(feed = feed) } + }.launchIn(scope) + } + + internal fun syncFeed() { + val scope = scope ?: return + syncJob?.cancel() + syncJob = scope.launch { performFeedSync() } + } + + internal fun cancelJobs() { + syncJob?.cancel() + feedObserverJob?.cancel() + feedObserverJob = null + } + + private suspend fun buildFeedFromDb( + metadataEntities: List, + membersByChat: Map>, + ): List { + return metadataEntities.map { entity -> + val members = membersByChat[entity.chatIdHex] ?: emptyList() + val lastMessage = entity.lastMessageId?.let { + messageDataSource.getLatest(entity.chatIdHex) + } + metadataDataSource.toMetadata(entity, members, lastMessage) + } + } + + private suspend fun performFeedSync() { + stateHolder.update { it.copy(feedSyncState = FeedSyncState.Syncing) } + chatController.getDmChatFeed() + .onSuccess { page -> + metadataDataSource.upsert(page.chats) + + for (chat in page.chats) { + memberDataSource.upsert(chat.chatId, chat.members) + chat.lastMessage?.let { msg -> + messageDataSource.upsert(chat.chatId, listOf(msg)) + } + } + + stateHolder.update { it.copy(feedSyncState = FeedSyncState.Synced) } + trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process) + + for (chat in page.chats) { + if (chat.latestEventSequence > 0) { + val localSeq = metadataDataSource.getLatestEventSequence(chat.chatId) + if (localSeq > 0 && localSeq < chat.latestEventSequence) { + _events.send(Event.DeltaSyncNeeded(chat.chatId)) + continue + } + } + if (!messageDataSource.hasMessages(chat.chatId)) { + _events.send(Event.LoadMessages(chat.chatId)) + } + } + } + .onFailure { error -> + stateHolder.update { it.copy(feedSyncState = FeedSyncState.Error) } + trace(tag = TAG, message = "Feed sync failed: ${error.message}", type = TraceType.Error) + } + } + + // endregion +} diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/MessagingDelegate.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/MessagingDelegate.kt new file mode 100644 index 000000000..589e639b6 --- /dev/null +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/internal/delegates/MessagingDelegate.kt @@ -0,0 +1,208 @@ +@file:OptIn(ExperimentalPagingApi::class) + +package com.flipcash.shared.chat.internal.delegates + +import androidx.core.app.NotificationManagerCompat +import androidx.paging.ExperimentalPagingApi +import androidx.paging.Pager +import androidx.paging.PagingConfig +import androidx.paging.PagingData +import androidx.paging.map +import com.flipcash.app.core.contacts.DeviceContact +import com.flipcash.app.persistence.sources.ChatMemberDataSource +import com.flipcash.app.persistence.sources.ChatMessageDataSource +import com.flipcash.app.persistence.sources.ChatMetadataDataSource +import com.flipcash.app.persistence.sources.ContactDataSource +import com.flipcash.app.persistence.sources.mediator.ChatMessageRemoteMediator +import com.flipcash.services.controllers.ChatController +import com.flipcash.services.controllers.ChatMessagingController +import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMember +import com.flipcash.services.models.chat.ChatMessage +import com.flipcash.services.models.chat.MessageContent +import com.flipcash.services.models.chat.MessagePointer +import com.flipcash.services.models.chat.PointerType +import com.flipcash.services.models.chat.TypingState +import com.flipcash.shared.chat.MessagingOperations +import com.flipcash.shared.chat.NoDmChatInitializedException +import com.flipcash.shared.chat.internal.ChatStateHolder +import com.flipcash.services.user.UserManager +import com.getcode.utils.decodeBase58 +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.map +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.time.Clock + +/** + * Handles all per-chat messaging operations: sending and receiving messages, + * read-pointer advancement, paging, member identity resolution, and notification + * management. + * + * This delegate is self-contained — it does not emit cross-delegate events. + * All methods target a single conversation identified by [ChatId]. + * + * Message sending follows an optimistic-insert pattern: + * 1. [insertPending][ChatMessageDataSource.insertPending] writes a local placeholder. + * 2. The server call returns the confirmed [ChatMessage]. + * 3. [confirmPending][ChatMessageDataSource.confirmPending] replaces the placeholder, + * or [failPending][ChatMessageDataSource.failPending] marks it as failed. + * + * @see com.flipcash.shared.chat.internal.RealChatCoordinator + */ +@Singleton +class MessagingDelegate @Inject constructor( + private val chatController: ChatController, + private val messagingController: ChatMessagingController, + private val metadataDataSource: ChatMetadataDataSource, + private val messageDataSource: ChatMessageDataSource, + private val memberDataSource: ChatMemberDataSource, + private val contactDataSource: ContactDataSource, + private val notificationManager: NotificationManagerCompat, + private val userManager: UserManager, + private val stateHolder: ChatStateHolder, +) : MessagingOperations { + + // region MessagingOperations + + override suspend fun getChatId(contact: DeviceContact): Result { + val raw = contactDataSource.getDmChatId(contact.e164) + if (raw.isNullOrEmpty()) { + return Result.failure(NoDmChatInitializedException(contact.e164)) + } + return runCatching { ChatId(raw.decodeBase58()) } + } + + override suspend fun getOtherMemberE164(chatId: ChatId): String? { + val selfId = userManager.accountId + val localMembers = memberDataSource.getMembersForChat(chatId) + val otherMember = localMembers.firstOrNull { it.userId != selfId } + if (otherMember != null) return otherMember.userProfile.verifiedPhoneNumber + + val metadata = chatController.getChat(chatId).getOrNull() ?: return null + memberDataSource.upsert(chatId, metadata.members) + return metadata.members + .firstOrNull { it.userId != selfId } + ?.userProfile?.verifiedPhoneNumber + } + + override fun setActiveChatId(chatId: ChatId?) { + stateHolder.update { it.copy(activeChat = chatId) } + } + + override fun isActiveChat(chatId: ChatId): Boolean { + return stateHolder.current.activeChat == chatId + } + + override fun dismissNotifications(chatId: ChatId) { + notificationManager.cancel(chatId.hashCode()) + } + + override fun observeMessages(chatId: ChatId): Flow> { + return messageDataSource.observeMessages(chatId) + } + + override fun observeMessagesPaged(chatId: ChatId): Flow> { + return Pager( + config = PagingConfig(pageSize = 50), + remoteMediator = ChatMessageRemoteMediator(chatId, messagingController, messageDataSource), + ) { + messageDataSource.observeForChat(chatId) + }.flow.map { page -> + page.map { entity -> messageDataSource.toChatMessage(entity) } + } + } + + override fun observeMembers(chatId: ChatId): Flow> { + return memberDataSource.observeMembers(chatId) + } + + override fun observeOtherReadPointer(chatId: ChatId): Flow { + val selfId = userManager.accountId + return memberDataSource.observeMembers(chatId) + .map { members -> + members.firstOrNull { it.userId != selfId } + ?.pointers + ?.firstOrNull { it.type == PointerType.READ } + } + .distinctUntilChanged() + } + + override suspend fun loadMessages(chatId: ChatId) { + messagingController.getMessages(chatId) + .onSuccess { messages -> + messageDataSource.upsert(chatId, messages) + + val latest = messages.maxByOrNull { it.messageId } ?: return@onSuccess + metadataDataSource.updateLastMessageId(chatId, latest.messageId) + metadataDataSource.updateLastActivity(chatId, latest.timestamp.toEpochMilliseconds()) + } + } + + override suspend fun sendMessage(chatId: ChatId, content: String): Result { + val senderId = userManager.accountId + ?: return Result.failure(IllegalStateException("Cannot send message without an account")) + + val content = listOf(MessageContent.Text(content)) + val (_, clientMessageId) = messageDataSource.insertPending( + chatId = chatId, + content = content, + senderId = senderId, + ) + + return messagingController.sendMessage(chatId, content, clientMessageId) + .onSuccess { serverMessage -> + messageDataSource.confirmPending(chatId, clientMessageId, serverMessage) + advanceReadPointer(chatId, serverMessage.messageId) + + metadataDataSource.updateLastMessageId(chatId, serverMessage.messageId) + metadataDataSource.updateLastActivity(chatId, serverMessage.timestamp.toEpochMilliseconds()) + } + .onFailure { + messageDataSource.failPending(chatId, clientMessageId) + } + } + + override suspend fun advanceReadPointer(chatId: ChatId, messageId: Long): Result { + val selfId = userManager.accountId ?: return Result.failure( + IllegalStateException("No account") + ) + + val pointer = MessagePointer( + type = PointerType.READ, + userId = selfId, + value = messageId, + timestamp = Clock.System.now(), + ) + memberDataSource.updatePointers(chatId, pointer) + + return messagingController.advancePointer(chatId, PointerType.READ, messageId) + } + + override suspend fun markAsRead(chatId: ChatId): Result { + val messageId = stateHolder.current.feed + .firstOrNull { it.chatId == chatId } + ?.lastMessage?.messageId + ?: messageDataSource.getLatestMessageId(chatId) + ?: return Result.success(Unit) + return advanceReadPointer(chatId, messageId) + .also { dismissNotifications(chatId) } + } + + override suspend fun notifyTyping(chatId: ChatId, typingState: TypingState): Result { + return messagingController.notifyIsTyping(chatId, typingState) + } + + // endregion + + // region Internal + + internal suspend fun clear() { + metadataDataSource.clear() + messageDataSource.clear() + memberDataSource.clear() + } + + // endregion +} diff --git a/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEagerBalanceTest.kt b/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEagerBalanceTest.kt index 572525c26..a01fd45e9 100644 --- a/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEagerBalanceTest.kt +++ b/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEagerBalanceTest.kt @@ -1,6 +1,5 @@ package com.flipcash.shared.chat -import androidx.core.app.NotificationManagerCompat import com.flipcash.app.featureflags.FeatureFlagController import com.flipcash.app.persistence.sources.ChatMemberDataSource import com.flipcash.app.persistence.sources.ChatMessageDataSource @@ -15,6 +14,11 @@ import com.flipcash.services.models.chat.ChatId import com.flipcash.services.models.chat.ChatMessage import com.flipcash.services.models.chat.ChatUpdate import com.flipcash.services.models.chat.MessageContent +import com.flipcash.shared.chat.internal.ChatStateHolder +import com.flipcash.shared.chat.internal.RealChatCoordinator +import com.flipcash.shared.chat.internal.delegates.EventStreamDelegate +import com.flipcash.shared.chat.internal.delegates.FeedSyncDelegate +import com.flipcash.shared.chat.internal.delegates.MessagingDelegate import com.getcode.opencode.model.financial.CurrencyCode import com.getcode.opencode.model.financial.Fiat import com.getcode.solana.keys.Mint @@ -35,6 +39,7 @@ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import org.robolectric.RobolectricTestRunner +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Instant @OptIn(ExperimentalCoroutinesApi::class) @@ -49,7 +54,7 @@ class ChatCoordinatorEagerBalanceTest { private val chatUpdatesChannel = Channel(capacity = Channel.UNLIMITED) private lateinit var tokenCoordinator: TokenCoordinator - private lateinit var coordinator: ChatCoordinator + private lateinit var coordinator: RealChatCoordinator private lateinit var testDispatchers: TestDispatchers @Before @@ -67,19 +72,52 @@ class ChatCoordinatorEagerBalanceTest { testDispatchers = TestDispatchers(TestCoroutineScheduler()) - coordinator = ChatCoordinator( + val stateHolder = ChatStateHolder() + val memberDataSource = mockk(relaxed = true) + val messagingController = mockk(relaxed = true) + val metadataDataSource = mockk(relaxed = true) + val messageDataSource = mockk(relaxed = true) + + val feedDelegate = FeedSyncDelegate( chatController = chatController, - messagingController = mockk(relaxed = true), - eventStreamingController = eventStreamingController, - metadataDataSource = mockk(relaxed = true), - messageDataSource = mockk(relaxed = true), - memberDataSource = mockk(relaxed = true), - contactDataSource = mockk(relaxed = true), - networkObserver = mockk(relaxed = true), - notificationManager = mockk(relaxed = true), + metadataDataSource = metadataDataSource, + messageDataSource = messageDataSource, + memberDataSource = memberDataSource, + stateHolder = stateHolder, userManager = userManager, + ) + + val eventStreamDelegate = EventStreamDelegate( + eventStreamingController = eventStreamingController, + messagingController = messagingController, + metadataDataSource = metadataDataSource, + messageDataSource = messageDataSource, + memberDataSource = memberDataSource, tokenCoordinator = tokenCoordinator, + userManager = userManager, + stateHolder = stateHolder, + ) + + val messagingDelegate = MessagingDelegate( + chatController = chatController, + messagingController = messagingController, + metadataDataSource = metadataDataSource, + messageDataSource = messageDataSource, + memberDataSource = memberDataSource, + contactDataSource = mockk(relaxed = true), + notificationManager = mockk(relaxed = true), + userManager = userManager, + stateHolder = stateHolder, + ) + + coordinator = RealChatCoordinator( + feedDelegate = feedDelegate, + eventStreamDelegate = eventStreamDelegate, + messagingDelegate = messagingDelegate, + stateHolder = stateHolder, + userManager = userManager, featureFlags = mockk(relaxed = true), + networkObserver = mockk(relaxed = true), dispatchers = testDispatchers, ) } @@ -125,7 +163,7 @@ class ChatCoordinatorEagerBalanceTest { triggerCollection() val amount = Fiat(fiat = 5.0, currencyCode = CurrencyCode.CAD) chatUpdatesChannel.send(chatUpdate(cashMessage(senderId = otherId, amount = amount))) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify(exactly = 1) { tokenCoordinator.add(mint, amount) } @@ -136,7 +174,7 @@ class ChatCoordinatorEagerBalanceTest { fun `self-sent cash message does not trigger tokenCoordinator add`() = runTest(testDispatchers.dispatcher) { triggerCollection() chatUpdatesChannel.send(chatUpdate(cashMessage(senderId = selfId))) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify(exactly = 0) { tokenCoordinator.add(any(), any()) } @@ -147,7 +185,7 @@ class ChatCoordinatorEagerBalanceTest { fun `text message does not trigger tokenCoordinator add`() = runTest(testDispatchers.dispatcher) { triggerCollection() chatUpdatesChannel.send(chatUpdate(textMessage(senderId = otherId))) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify(exactly = 0) { tokenCoordinator.add(any(), any()) } @@ -164,7 +202,7 @@ class ChatCoordinatorEagerBalanceTest { val msg2 = cashMessage(senderId = otherId, amount = amount2, mint = mintB).copy(messageId = 3L) chatUpdatesChannel.send(chatUpdate(msg1, msg2)) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify(exactly = 1) { tokenCoordinator.add(mint, amount1) } @@ -179,7 +217,7 @@ class ChatCoordinatorEagerBalanceTest { val outgoing = cashMessage(senderId = selfId).copy(messageId = 3L) chatUpdatesChannel.send(chatUpdate(incoming, outgoing)) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify(exactly = 1) { tokenCoordinator.add(any(), any()) } diff --git a/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEventsTest.kt b/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEventsTest.kt index 79c6fe9de..87d268598 100644 --- a/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEventsTest.kt +++ b/apps/flipcash/shared/chat/src/test/kotlin/com/flipcash/shared/chat/ChatCoordinatorEventsTest.kt @@ -1,6 +1,5 @@ package com.flipcash.shared.chat -import androidx.core.app.NotificationManagerCompat import com.flipcash.app.featureflags.FeatureFlagController import com.flipcash.app.persistence.sources.ChatMemberDataSource import com.flipcash.app.persistence.sources.ChatMessageDataSource @@ -19,6 +18,11 @@ import com.flipcash.services.models.chat.ChatUpdate import com.flipcash.services.models.chat.Emoji import com.flipcash.services.models.chat.MessageContent import com.flipcash.services.models.chat.ReactionUpdate +import com.flipcash.shared.chat.internal.ChatStateHolder +import com.flipcash.shared.chat.internal.RealChatCoordinator +import com.flipcash.shared.chat.internal.delegates.EventStreamDelegate +import com.flipcash.shared.chat.internal.delegates.FeedSyncDelegate +import com.flipcash.shared.chat.internal.delegates.MessagingDelegate import com.getcode.utils.network.NetworkConnectivityListener import com.flipcash.services.user.UserManager import io.mockk.coEvery @@ -40,6 +44,7 @@ import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertNull import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Instant @OptIn(ExperimentalCoroutinesApi::class) @@ -54,7 +59,7 @@ class ChatCoordinatorEventsTest { private lateinit var metadataDataSource: ChatMetadataDataSource private lateinit var messageDataSource: ChatMessageDataSource - private lateinit var coordinator: ChatCoordinator + private lateinit var coordinator: RealChatCoordinator private lateinit var testDispatchers: TestDispatchers @Before @@ -71,22 +76,53 @@ class ChatCoordinatorEventsTest { metadataDataSource = mockk(relaxed = true) messageDataSource = mockk(relaxed = true) + val memberDataSource = mockk(relaxed = true) + val messagingController = mockk(relaxed = true) testDispatchers = TestDispatchers(TestCoroutineScheduler()) - coordinator = ChatCoordinator( + val stateHolder = ChatStateHolder() + + val feedDelegate = FeedSyncDelegate( chatController = chatController, - messagingController = mockk(relaxed = true), + metadataDataSource = metadataDataSource, + messageDataSource = messageDataSource, + memberDataSource = memberDataSource, + stateHolder = stateHolder, + userManager = userManager, + ) + + val eventStreamDelegate = EventStreamDelegate( eventStreamingController = eventStreamingController, + messagingController = messagingController, metadataDataSource = metadataDataSource, messageDataSource = messageDataSource, - memberDataSource = mockk(relaxed = true), + memberDataSource = memberDataSource, + tokenCoordinator = mockk(relaxed = true), + userManager = userManager, + stateHolder = stateHolder, + ) + + val messagingDelegate = MessagingDelegate( + chatController = chatController, + messagingController = messagingController, + metadataDataSource = metadataDataSource, + messageDataSource = messageDataSource, + memberDataSource = memberDataSource, contactDataSource = mockk(relaxed = true), - networkObserver = mockk(relaxed = true), - notificationManager = mockk(relaxed = true), + notificationManager = mockk(relaxed = true), + userManager = userManager, + stateHolder = stateHolder, + ) + + coordinator = RealChatCoordinator( + feedDelegate = feedDelegate, + eventStreamDelegate = eventStreamDelegate, + messagingDelegate = messagingDelegate, + stateHolder = stateHolder, userManager = userManager, - tokenCoordinator = mockk(relaxed = true), featureFlags = mockk(relaxed = true), + networkObserver = mockk(relaxed = true), dispatchers = testDispatchers, ) } @@ -131,7 +167,7 @@ class ChatCoordinatorEventsTest { events = listOf(chatEvent(1, eventMsg)), ) chatUpdatesChannel.send(update) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() // Should upsert the event message, not the deprecated one @@ -155,7 +191,7 @@ class ChatCoordinatorEventsTest { events = emptyList(), ) chatUpdatesChannel.send(update) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify { @@ -183,7 +219,7 @@ class ChatCoordinatorEventsTest { ), ) chatUpdatesChannel.send(update) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() // Should have 2 unique messages (deduped by messageId, taking first by sorted eventSequence) @@ -212,7 +248,7 @@ class ChatCoordinatorEventsTest { ), ) chatUpdatesChannel.send(update) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() coVerify { metadataDataSource.updateLatestEventSequence(chatId, 2L) } @@ -230,7 +266,7 @@ class ChatCoordinatorEventsTest { events = listOf(chatEvent(1, textMessage(id = 1, eventSequence = 1))), ) chatUpdatesChannel.send(update1) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() val update2 = ChatUpdate( @@ -238,7 +274,7 @@ class ChatCoordinatorEventsTest { events = listOf(chatEvent(3, textMessage(id = 3, eventSequence = 3))), ) chatUpdatesChannel.send(update2) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() // Cursor should advance to 1 (contiguous), not 3 @@ -257,21 +293,21 @@ class ChatCoordinatorEventsTest { chatId = chatId, events = listOf(chatEvent(1, textMessage(id = 1, eventSequence = 1))), )) - advanceTimeBy(100) + advanceTimeBy(100.milliseconds) runCurrent() chatUpdatesChannel.send(ChatUpdate( chatId = chatId, events = listOf(chatEvent(3, textMessage(id = 3, eventSequence = 3))), )) - advanceTimeBy(100) + advanceTimeBy(100.milliseconds) runCurrent() chatUpdatesChannel.send(ChatUpdate( chatId = chatId, events = listOf(chatEvent(2, textMessage(id = 2, eventSequence = 2))), )) - advanceTimeBy(100) + advanceTimeBy(100.milliseconds) runCurrent() // After filling the gap, cursor should advance to 3 @@ -302,7 +338,7 @@ class ChatCoordinatorEventsTest { ), ) chatUpdatesChannel.send(update) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() val state = coordinator.state.value @@ -335,7 +371,7 @@ class ChatCoordinatorEventsTest { ), ), )) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() // Stale update: count=1, sequence=2 (older) @@ -353,7 +389,7 @@ class ChatCoordinatorEventsTest { ), ), )) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() val reactions = coordinator.state.value.reactionOverlays[chatId]?.get(1L)?.reactions @@ -383,7 +419,7 @@ class ChatCoordinatorEventsTest { ), ), )) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() // Remove reaction (count=0) @@ -401,7 +437,7 @@ class ChatCoordinatorEventsTest { ), ), )) - advanceTimeBy(500) + advanceTimeBy(500.milliseconds) runCurrent() val reactions = coordinator.state.value.reactionOverlays[chatId]?.get(1L)?.reactions @@ -437,7 +473,7 @@ class ChatCoordinatorEventsTest { ), ), )) - advanceTimeBy(1_000) + advanceTimeBy(1_000.milliseconds) runCurrent() val reactions = coordinator.state.value.reactionOverlays[chatId]?.get(1L)?.reactions diff --git a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt index 3d5597c9f..21502e226 100644 --- a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt +++ b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt @@ -124,8 +124,9 @@ fun openBidirectionalStream( } catch (e: Exception) { trace( tag = tag, - message = "Failed to send initial request: ${e.message}", + message = "Failed to send initial request", type = TraceType.Error, + error = e ) collectionJob.cancel() requestChannel.close()