Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,12 @@ internal class SendFlowViewModel @Inject constructor(
val label = if (name.isNotBlank()) "$formatted of $name" else formatted
if (sentBySelf) "You sent $label" else "You received $label"
}

// TODO:
is MessageContent.Deleted -> null
is MessageContent.Media -> null
is MessageContent.Reply -> null
is MessageContent.System -> null
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ sealed interface ChatListItem {
override val itemContentType: Any = when (content) {
is MessageContent.Text -> "text-bubble"
is MessageContent.Cash -> "cash-bubble"
is MessageContent.Deleted -> "deleted-message"
is MessageContent.Media -> "media"
is MessageContent.Reply -> "reply-message"
is MessageContent.System -> "system-message"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ fun ContentBubble(
val bubbleMaxWidth = when (item.content) {
is MessageContent.Text -> maxWidth * BUBBLE_MAX_WIDTH_FRACTION
is MessageContent.Cash -> maxWidth * CASH_BUBBLE_MAX_WIDTH_FRACTION
is MessageContent.Deleted -> maxWidth
is MessageContent.Media -> maxWidth * CASH_BUBBLE_MAX_WIDTH_FRACTION
is MessageContent.Reply -> maxWidth * BUBBLE_MAX_WIDTH_FRACTION
is MessageContent.System -> maxWidth
}

Row(
Expand All @@ -89,6 +93,12 @@ fun ContentBubble(
position = position,
maxWidth = bubbleMaxWidth,
)

// TODO
is MessageContent.Deleted -> Unit
is MessageContent.Media -> Unit
is MessageContent.Reply -> Unit
is MessageContent.System -> Unit
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import com.flipcash.services.models.chat.ChatMember
import com.flipcash.services.models.chat.ChatMessage
import com.flipcash.services.models.chat.MessagePointer
import com.flipcash.services.models.chat.ChatUpdate
import com.flipcash.services.models.chat.EmojiReaction
import com.flipcash.services.models.chat.MessageContent
import com.flipcash.services.models.chat.MetadataUpdate
import com.flipcash.services.models.chat.PointerType
import com.flipcash.services.models.chat.ReactionSummary
import com.flipcash.services.models.chat.ReactionUpdate
import com.flipcash.services.models.chat.TypingNotification
import com.flipcash.services.models.chat.TypingState
import com.flipcash.services.models.GetDeltaError
import com.flipcash.services.repository.DeltaUpdate
import com.flipcash.app.tokens.TokenCoordinator
import com.flipcash.libs.coroutines.DispatcherProvider
import com.flipcash.services.user.UserManager
Expand All @@ -57,6 +62,7 @@ import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -88,12 +94,14 @@ class ChatCoordinator @Inject constructor(
companion object {
private const val TAG = "ChatCoordinator"
private val HEARTBEAT_INTERVAL = 30.seconds
private val GAP_FILL_DELAY = 2.seconds
}

private val supervisorJob = SupervisorJob()
private val scope = CoroutineScope(dispatchers.IO + supervisorJob)
private val cluster = MutableStateFlow<AccountCluster?>(null)
private val _state = MutableStateFlow(ChatState())
private val sequenceTracker = EventSequenceTracker()
private var syncJob: Job? = null
private var flagObserverJob: Job? = null
private var eventStreamCollectJob: Job? = null
Expand Down Expand Up @@ -343,6 +351,7 @@ class ChatCoordinator @Inject constructor(
networkObserverJob?.cancel()
feedObserverJob = null
_state.value = ChatState()
sequenceTracker.clearAll()
cluster.value = null
metadataDataSource.clear()
messageDataSource.clear()
Expand Down Expand Up @@ -426,10 +435,19 @@ class ChatCoordinator @Inject constructor(
_state.update { it.copy(feedSyncState = FeedSyncState.Synced) }
trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process)

// Prefetch first page of messages for chats with no cached messages
page.chats
.filterNot { messageDataSource.hasMessages(it.chatId) }
.forEach { chat -> loadMessages(chat.chatId) }
// Delta-sync for chats with a known event sequence; full load for new chats
for (chat in page.chats) {
if (chat.latestEventSequence > 0) {
val localSeq = metadataDataSource.getLatestEventSequence(chat.chatId)
if (localSeq > 0 && localSeq < chat.latestEventSequence) {
performDeltaSync(chat.chatId)
continue
}
}
if (!messageDataSource.hasMessages(chat.chatId)) {
loadMessages(chat.chatId)
}
}
}
.onFailure { error ->
_state.update { it.copy(feedSyncState = FeedSyncState.Error) }
Expand Down Expand Up @@ -485,23 +503,51 @@ class ChatCoordinator @Inject constructor(

private suspend fun applyUpdate(update: ChatUpdate) {
val chatId = update.chatId

// --- Resolve messages: prefer events, fall back to deprecated newMessages ---

val resolvedMessages = if (update.events.isNotEmpty()) {
update.events
.flatMap { event -> event.mutations.map { it.message } }
.sortedBy { it.eventSequence }
.distinctBy { it.messageId }
} else {
@Suppress("DEPRECATION")
update.newMessages
}

trace(
tag = TAG,
message = "applyUpdate: chatId=$chatId, newMessages=${update.newMessages.size}, pointers=${update.pointerUpdates.size}, typing=${update.typingNotifications.size}",
message = "applyUpdate: chatId=$chatId, messages=${resolvedMessages.size}, events=${update.events.size}, pointers=${update.pointerUpdates.size}, reactions=${update.reactionUpdates.size}, typing=${update.typingNotifications.size}",
type = TraceType.Process,
)

// --- Persist to DB first (suspend, off main thread) ---

val lastMsg = if (update.newMessages.isNotEmpty()) {
trace(tag = TAG, message = "Upserting ${update.newMessages.size} new messages for $chatId", type = TraceType.Process)
messageDataSource.upsert(chatId, update.newMessages)
update.newMessages.maxByOrNull { it.messageId }?.also { msg ->
val lastMsg = if (resolvedMessages.isNotEmpty()) {
trace(tag = TAG, message = "Upserting ${resolvedMessages.size} messages for $chatId", type = TraceType.Process)
messageDataSource.upsert(chatId, resolvedMessages)
resolvedMessages.maxByOrNull { it.messageId }?.also { msg ->
metadataDataSource.updateLastMessageId(chatId, msg.messageId)
metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds())
}
} else null

// Advance event sequence cursor — gap-aware (only advance contiguous frontier)
if (update.events.isNotEmpty()) {
val dbCursor = metadataDataSource.getLatestEventSequence(chatId)
val incomingSequences = update.events.map { it.sequence }
val result = sequenceTracker.processSequences(chatId, dbCursor, incomingSequences)

if (result.newContiguousSequence > dbCursor) {
metadataDataSource.updateLatestEventSequence(chatId, result.newContiguousSequence)
}

if (result.hasGap) {
scheduleGapFill(chatId)
}
}

for (pointer in update.pointerUpdates) {
memberDataSource.updatePointers(chatId, pointer)
}
Expand All @@ -525,10 +571,24 @@ class ChatCoordinator @Inject constructor(
}
}

// --- Process reaction updates into in-memory overlay ---

if (update.reactionUpdates.isNotEmpty()) {
_state.update { state ->
val chatOverlays = state.reactionOverlays[chatId]?.toMutableMap() ?: mutableMapOf()
for (reactionUpdate in update.reactionUpdates) {
applyReactionUpdate(chatOverlays, reactionUpdate)
}
state.copy(
reactionOverlays = state.reactionOverlays + (chatId to chatOverlays.toMap())
)
}
}

// --- Eagerly update token balance for incoming cash ---

val selfId = userManager.accountId
for (msg in update.newMessages) {
for (msg in resolvedMessages) {
if (msg.senderId == selfId) continue
for (content in msg.content) {
if (content is MessageContent.Cash) {
Expand Down Expand Up @@ -560,6 +620,99 @@ class ChatCoordinator @Inject constructor(
}
}

private fun applyReactionUpdate(
overlays: MutableMap<Long, ReactionSummary>,
update: ReactionUpdate,
) {
val existing = overlays[update.messageId]
val existingReactions = existing?.reactions?.toMutableList() ?: mutableListOf()

// Find existing reaction for this emoji
val idx = existingReactions.indexOfFirst { it.emoji == update.emoji }
if (idx >= 0) {
val current = existingReactions[idx]
// LWW guard using sequence
if (update.sequence <= current.sequence) return
existingReactions[idx] = EmojiReaction(
emoji = update.emoji,
count = update.count,
reactedBySelf = current.reactedBySelf, // preserved; server will correct on next full fetch
sampleReactors = current.sampleReactors,
sequence = update.sequence,
)
} else {
existingReactions.add(
EmojiReaction(
emoji = update.emoji,
count = update.count,
reactedBySelf = false,
sampleReactors = emptyList(),
sequence = update.sequence,
)
)
}

// Remove reactions with count == 0
existingReactions.removeAll { it.count <= 0 }

overlays[update.messageId] = ReactionSummary(
messageId = update.messageId,
reactions = existingReactions.toList(),
)
}

private fun scheduleGapFill(chatId: ChatId) {
val job = scope.launch {
delay(GAP_FILL_DELAY)
if (!sequenceTracker.hasGap(chatId)) return@launch
trace(tag = TAG, message = "Gap fill timeout: fetching delta for $chatId", type = TraceType.Process)
performDeltaSync(chatId)
}
sequenceTracker.setGapFillJob(chatId, job)
}

private suspend fun performDeltaSync(chatId: ChatId) {
val afterSequence = metadataDataSource.getLatestEventSequence(chatId)
trace(tag = TAG, message = "Delta sync for $chatId from sequence $afterSequence", type = TraceType.Process)

try {
val result = messagingController.getDelta(chatId, afterSequence).first()
result
.onSuccess { delta ->
if (delta.messages.isNotEmpty()) {
messageDataSource.upsert(chatId, delta.messages)
val latest = delta.messages.maxByOrNull { it.messageId }
latest?.let { msg ->
metadataDataSource.updateLastMessageId(chatId, msg.messageId)
metadataDataSource.updateLastActivity(chatId, msg.timestamp.toEpochMilliseconds())
}
}
if (delta.latestSequence > afterSequence) {
metadataDataSource.updateLatestEventSequence(chatId, delta.latestSequence)
}
// getDelta is authoritative — reset tracker to the server's sequence
sequenceTracker.resetTo(chatId, delta.latestSequence)
trace(tag = TAG, message = "Delta sync complete: ${delta.messages.size} messages, sequence ${delta.latestSequence}", type = TraceType.Process)
}
.onFailure { error ->
if (error is GetDeltaError.ResetRequired) {
trace(tag = TAG, message = "Delta sync reset required for $chatId, falling back to full load", type = TraceType.Process)
sequenceTracker.clear(chatId)
loadMessages(chatId)
} else {
trace(tag = TAG, message = "Delta sync failed for $chatId: ${error.message}", type = TraceType.Error)
}
}
} catch (e: Exception) {
trace(tag = TAG, message = "Delta sync exception for $chatId: ${e.message}", type = TraceType.Error)
}
}

fun observeReactions(chatId: ChatId, messageId: Long): Flow<ReactionSummary?> {
return _state.map { it.reactionOverlays[chatId]?.get(messageId) }
.distinctUntilChanged()
}

private fun applyTypingNotification(
typists: MutableSet<ActiveTypist>,
notification: TypingNotification,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package com.flipcash.shared.chat

import com.flipcash.services.models.chat.ChatId
import com.flipcash.services.models.chat.ChatMetadata
import com.flipcash.services.models.chat.ReactionSummary
import com.getcode.opencode.model.core.ID
import kotlin.time.Instant

data class ChatState(
val feed: List<ChatMetadata> = emptyList(),
val typingIndicators: Map<ChatId, Set<ActiveTypist>> = emptyMap(),
val reactionOverlays: Map<ChatId, Map<Long, ReactionSummary>> = emptyMap(),
val feedSyncState: FeedSyncState = FeedSyncState.Idle,
val activeChat: ChatId? = null,
)
Expand Down
Loading
Loading