@@ -241,7 +241,7 @@ public function handle_request( WP_REST_Request $request ) {
241241 }
242242
243243 // Get updates for this client.
244- $ room_response = $ this ->get_updates_after ( $ room , $ client_id , $ cursor , $ is_compactor );
244+ $ room_response = $ this ->get_updates ( $ room , $ client_id , $ cursor , $ is_compactor );
245245 $ room_response ['awareness ' ] = $ merged_awareness ;
246246
247247 $ response ['rooms ' ][] = $ room_response ;
@@ -253,8 +253,6 @@ public function handle_request( WP_REST_Request $request ) {
253253 /**
254254 * Checks if the current user can sync a specific entity type.
255255 *
256- * @since 7.0.0
257- *
258256 * @param string $entity_kind The entity kind.
259257 * @param string $entity_name The entity name.
260258 * @param string|null $object_id The object ID (if applicable).
@@ -287,12 +285,10 @@ private function can_user_sync_entity_type( string $entity_kind, string $entity_
287285 /**
288286 * Processes and stores an awareness update from a client.
289287 *
290- * @since 7.0.0
291- *
292- * @param string $room Room identifier.
293- * @param int $client_id Client identifier.
294- * @param array|null $awareness_update Awareness state sent by the client.
295- * @return array Updated awareness state for the room.
288+ * @param string $room Room identifier.
289+ * @param int $client_id Client identifier.
290+ * @param array<string, mixed>|null $awareness_update Awareness state sent by the client.
291+ * @return array<int, array<string, mixed>> Updated awareness state for the room.
296292 */
297293 private function process_awareness_update ( string $ room , int $ client_id , ?array $ awareness_update ): array {
298294 $ existing_awareness = $ this ->storage ->get_awareness_state ( $ room );
@@ -336,12 +332,10 @@ private function process_awareness_update( string $room, int $client_id, ?array
336332 /**
337333 * Processes a sync update based on its type.
338334 *
339- * @since 7.0.0
340- *
341- * @param string $room Room identifier.
342- * @param int $client_id Client identifier.
343- * @param int $cursor Client cursor (marker of last seen update).
344- * @param array $update Sync update with 'type' and 'data' fields.
335+ * @param string $room Room identifier.
336+ * @param int $client_id Client identifier.
337+ * @param int $cursor Client cursor (marker of last seen update).
338+ * @param array<string, mixed> $update Sync update with 'type' and 'data' fields.
345339 */
346340 private function process_sync_update ( string $ room , int $ client_id , int $ cursor , array $ update ): void {
347341 $ data = $ update ['data ' ];
@@ -354,107 +348,83 @@ private function process_sync_update( string $room, int $client_id, int $cursor,
354348 * updates with markers before the client's cursor to preserve updates
355349 * that arrived since the client's last sync.
356350 *
357- * The remove_updates_before_cursor method returns false if there
358- * is a newer compaction update already stored .
351+ * Check for a newer compaction update first. If one exists, skip this
352+ * compaction to avoid overwriting it .
359353 */
360- if ( $ this ->remove_updates_before_cursor ( $ room , $ cursor ) ) {
354+ $ updates_after_cursor = $ this ->storage ->get_updates_after_cursor ( $ room , $ cursor );
355+ $ has_newer_compaction = false ;
356+
357+ foreach ( $ updates_after_cursor as $ existing ) {
358+ if ( self ::UPDATE_TYPE_COMPACTION === $ existing ['type ' ] ) {
359+ $ has_newer_compaction = true ;
360+ break ;
361+ }
362+ }
363+
364+ if ( ! $ has_newer_compaction ) {
365+ $ this ->storage ->remove_updates_before_cursor ( $ room , $ cursor );
361366 $ this ->add_update ( $ room , $ client_id , $ type , $ data );
362367 }
363368 break ;
364369
365370 case self ::UPDATE_TYPE_SYNC_STEP1 :
371+ case self ::UPDATE_TYPE_SYNC_STEP2 :
372+ case self ::UPDATE_TYPE_UPDATE :
366373 /*
367374 * Sync step 1 announces a client's state vector. Other clients need
368375 * to see it so they can respond with sync_step2 containing missing
369376 * updates. The cursor-based filtering prevents re-delivery.
377+ *
378+ * Sync step 2 contains updates for a specific client.
379+ *
380+ * All updates are stored persistently.
370381 */
371382 $ this ->add_update ( $ room , $ client_id , $ type , $ data );
372383 break ;
373-
374- case self ::UPDATE_TYPE_SYNC_STEP2 :
375- // Sync step 2 contains updates for a specific client.
376- $ this ->add_update ( $ room , $ client_id , $ type , $ data );
377- break ;
378-
379- case self ::UPDATE_TYPE_UPDATE :
380- // Regular document updates are stored persistently.
381- $ this ->add_update ( $ room , $ client_id , $ type , $ data );
382- break ;
383384 }
384385 }
385386
386387 /**
387- * Adds an update to a room's update list.
388- *
389- * @since 7.0.0
388+ * Adds an update to a room's update list via storage.
390389 *
391390 * @param string $room Room identifier.
392391 * @param int $client_id Client identifier.
393392 * @param string $type Update type (sync_step1, sync_step2, update, compaction).
394393 * @param string $data Base64-encoded update data.
395394 */
396395 private function add_update ( string $ room , int $ client_id , string $ type , string $ data ): void {
397- $ update_envelope = array (
396+ $ update = array (
398397 'client_id ' => $ client_id ,
399- 'type ' => $ type ,
400398 'data ' => $ data ,
401- 'timestamp ' => $ this -> get_time_marker () ,
399+ 'type ' => $ type ,
402400 );
403401
404- $ this ->storage ->add_update ( $ room , $ update_envelope );
402+ $ this ->storage ->add_update ( $ room , $ update );
405403 }
406404
407405 /**
408- * Gets the current time in milliseconds as a comparable time marker .
406+ * Gets sync updates for a specific client from a room after a given cursor .
409407 *
410- * @since 7.0.0
411- *
412- * @return int Current time in milliseconds.
413- */
414- private function get_time_marker (): int {
415- return floor ( microtime ( true ) * 1000 );
416- }
417-
418- /**
419- * Gets sync updates from a room after a given cursor.
420- *
421- * @since 7.0.0
408+ * Delegates cursor-based retrieval to the storage layer, then applies
409+ * client-specific filtering and compaction logic.
422410 *
423411 * @param string $room Room identifier.
424412 * @param int $client_id Client identifier.
425413 * @param int $cursor Return updates after this cursor.
426414 * @param bool $is_compactor True if this client is nominated to perform compaction.
427415 * @return array<string, mixed> Response data for this room.
428416 */
429- private function get_updates_after ( string $ room , int $ client_id , int $ cursor , bool $ is_compactor ): array {
430- $ end_cursor = $ this ->get_time_marker () - 100 ; // Small buffer to ensure consistency.
431- $ all_updates = $ this ->storage ->get_all_updates ( $ room );
432- $ total_updates = count ( $ all_updates );
433- $ updates = array ();
434-
435- foreach ( $ all_updates as $ update ) {
436- // Skip updates from this client, unless they are compaction updates.
417+ private function get_updates ( string $ room , int $ client_id , int $ cursor , bool $ is_compactor ): array {
418+ $ updates_after_cursor = $ this ->storage ->get_updates_after_cursor ( $ room , $ cursor );
419+ $ total_updates = $ this ->storage ->get_update_count ( $ room );
420+
421+ // Filter out this client's updates, except compaction updates.
422+ $ typed_updates = array ();
423+ foreach ( $ updates_after_cursor as $ update ) {
437424 if ( $ client_id === $ update ['client_id ' ] && self ::UPDATE_TYPE_COMPACTION !== $ update ['type ' ] ) {
438425 continue ;
439426 }
440427
441- // Skip updates before our cursor.
442- if ( $ update ['timestamp ' ] > $ cursor ) {
443- $ updates [] = $ update ;
444- }
445- }
446-
447- // Sort by update timestamp to ensure order.
448- usort (
449- $ updates ,
450- function ( $ a , $ b ) {
451- return ( $ a ['timestamp ' ] ?? 0 ) <=> ( $ b ['timestamp ' ] ?? 0 );
452- }
453- );
454-
455- // Convert to typed update format for response.
456- $ typed_updates = array ();
457- foreach ( $ updates as $ update ) {
458428 $ typed_updates [] = array (
459429 'data ' => $ update ['data ' ],
460430 'type ' => $ update ['type ' ],
@@ -464,49 +434,15 @@ function ( $a, $b ) {
464434 // Determine if this client should perform compaction.
465435 $ compaction_request = null ;
466436 if ( $ is_compactor && $ total_updates > self ::COMPACTION_THRESHOLD ) {
467- $ compaction_request = $ all_updates ;
437+ $ compaction_request = $ updates_after_cursor ;
468438 }
469439
470440 return array (
471441 'compaction_request ' => $ compaction_request ,
472- 'end_cursor ' => $ end_cursor ,
442+ 'end_cursor ' => $ this -> storage -> get_cursor ( $ room ) ,
473443 'room ' => $ room ,
474444 'total_updates ' => $ total_updates ,
475445 'updates ' => $ typed_updates ,
476446 );
477447 }
478-
479- /**
480- * Removes updates from a room that are older than the given compaction marker.
481- *
482- * @since 7.0.0
483- *
484- * @param string $room Room identifier.
485- * @param int $cursor Remove updates with markers < this cursor.
486- * @return bool True if this compaction is the latest, false if a newer compaction update exists.
487- */
488- private function remove_updates_before_cursor ( string $ room , int $ cursor ): bool {
489- $ all_updates = $ this ->storage ->get_all_updates ( $ room );
490- $ this ->storage ->remove_all_updates ( $ room );
491-
492- $ is_latest_compaction = true ;
493- $ updates_to_keep = array ();
494-
495- foreach ( $ all_updates as $ update ) {
496- if ( $ update ['timestamp ' ] >= $ cursor ) {
497- $ updates_to_keep [] = $ update ;
498-
499- if ( self ::UPDATE_TYPE_COMPACTION === $ update ['type ' ] ) {
500- $ is_latest_compaction = false ;
501- }
502- }
503- }
504-
505- // Replace all updates with filtered list.
506- foreach ( $ updates_to_keep as $ update ) {
507- $ this ->storage ->add_update ( $ room , $ update );
508- }
509-
510- return $ is_latest_compaction ;
511- }
512448}
0 commit comments