@@ -109,6 +109,11 @@ impl WithdrawalStore {
109109 self . batches . insert ( batch_index, withdrawals) ;
110110 }
111111
112+ /// Replace the entire store with an authoritative set of pending batches.
113+ pub ( crate ) fn replace_batches ( & mut self , batches : BTreeMap < u64 , Vec < abi:: Withdrawal > > ) {
114+ self . batches = batches;
115+ }
116+
112117 /// Get all withdrawals for a batch.
113118 pub fn get_batch ( & self , batch_index : u64 ) -> Option < & Vec < abi:: Withdrawal > > {
114119 self . batches . get ( & batch_index)
@@ -126,6 +131,32 @@ impl WithdrawalStore {
126131 pub fn batch_count ( & self ) -> usize {
127132 self . batches . len ( )
128133 }
134+
135+ /// Return the smallest and largest portal slot indices currently present.
136+ fn slot_range ( & self ) -> Option < ( u64 , u64 ) > {
137+ let first = * self . batches . keys ( ) . next ( ) ?;
138+ let last = * self . batches . keys ( ) . next_back ( ) ?;
139+ Some ( ( first, last) )
140+ }
141+
142+ /// Return a compact summary of the store as `(batch_count, first_slot, last_slot)`.
143+ pub ( crate ) fn summary ( & self ) -> ( usize , Option < u64 > , Option < u64 > ) {
144+ let ( first_slot, last_slot) = self
145+ . slot_range ( )
146+ . map_or ( ( None , None ) , |( first, last) | ( Some ( first) , Some ( last) ) ) ;
147+ ( self . batch_count ( ) , first_slot, last_slot)
148+ }
149+
150+ /// Return the nearest populated slots before and after `slot`, if any exist.
151+ fn neighboring_slots ( & self , slot : u64 ) -> ( Option < u64 > , Option < u64 > ) {
152+ let prev = self . batches . range ( ..slot) . next_back ( ) . map ( |( & idx, _) | idx) ;
153+ let next = self
154+ . batches
155+ . range ( slot. saturating_add ( 1 ) ..)
156+ . next ( )
157+ . map ( |( & idx, _) | idx) ;
158+ ( prev, next)
159+ }
129160}
130161
131162impl Default for WithdrawalStore {
@@ -155,6 +186,15 @@ pub fn compute_remaining_queue(withdrawals: &[abi::Withdrawal], processed_count:
155186// Withdrawal processor
156187// ---------------------------------------------------------------------------
157188
189+ struct StoreSnapshot {
190+ batch_count : usize ,
191+ first_slot : Option < u64 > ,
192+ last_slot : Option < u64 > ,
193+ prev_slot : Option < u64 > ,
194+ next_slot : Option < u64 > ,
195+ withdrawals : Option < Vec < abi:: Withdrawal > > ,
196+ }
197+
158198/// Background task that processes withdrawals from the ZonePortal queue on Tempo L1.
159199///
160200/// The processor waits for a [`Notify`] signal from the batch submitter (indicating a batch
@@ -173,6 +213,7 @@ pub struct WithdrawalProcessor {
173213 portal : ZonePortal :: ZonePortalInstance < DynProvider < TempoNetwork > , TempoNetwork > ,
174214 store : SharedWithdrawalStore ,
175215 notify : Arc < Notify > ,
216+ repair_notify : Arc < Notify > ,
176217 metrics : WithdrawalProcessorMetrics ,
177218}
178219
@@ -185,6 +226,7 @@ impl WithdrawalProcessor {
185226 provider : DynProvider < TempoNetwork > ,
186227 store : SharedWithdrawalStore ,
187228 notify : Arc < Notify > ,
229+ repair_notify : Arc < Notify > ,
188230 ) -> Self {
189231 let portal = ZonePortal :: new ( config. portal_address , provider) ;
190232
@@ -193,10 +235,30 @@ impl WithdrawalProcessor {
193235 portal,
194236 store,
195237 notify,
238+ repair_notify,
196239 metrics : WithdrawalProcessorMetrics :: default ( ) ,
197240 }
198241 }
199242
243+ /// Read the current store contents relevant to `slot` under a single lock.
244+ ///
245+ /// This keeps the diagnostic fields used in missing-slot logs consistent
246+ /// with each other and with the batch lookup result.
247+ fn capture_store_snapshot ( & self , slot : u64 ) -> StoreSnapshot {
248+ let store = self . store . lock ( ) ;
249+ let ( batch_count, first_slot, last_slot) = store. summary ( ) ;
250+ let ( prev_slot, next_slot) = store. neighboring_slots ( slot) ;
251+
252+ StoreSnapshot {
253+ batch_count,
254+ first_slot,
255+ last_slot,
256+ prev_slot,
257+ next_slot,
258+ withdrawals : store. get_batch ( slot) . cloned ( ) ,
259+ }
260+ }
261+
200262 /// Run the processor loop. This method never returns under normal operation.
201263 ///
202264 /// Waits for a notification from the batch submitter (or a fallback timeout) before
@@ -229,7 +291,14 @@ impl WithdrawalProcessor {
229291
230292 let head_val: u64 = head. try_into ( ) . map_err ( |_| eyre:: eyre!( "head overflow" ) ) ?;
231293 let tail_val: u64 = tail. try_into ( ) . map_err ( |_| eyre:: eyre!( "tail overflow" ) ) ?;
232- let store_batch_count = self . store . lock ( ) . batch_count ( ) ;
294+ let StoreSnapshot {
295+ batch_count : store_batch_count,
296+ first_slot : store_first_slot,
297+ last_slot : store_last_slot,
298+ prev_slot : prev_store_slot,
299+ next_slot : next_store_slot,
300+ withdrawals,
301+ } = self . capture_store_snapshot ( head_val) ;
233302 self . record_queue_metrics ( head_val, tail_val, store_batch_count) ;
234303
235304 if head_val == tail_val {
@@ -245,18 +314,20 @@ impl WithdrawalProcessor {
245314 "Withdrawal queue has pending slots"
246315 ) ;
247316
248- let withdrawals = {
249- let store = self . store . lock ( ) ;
250- store. get_batch ( head_val) . cloned ( )
251- } ;
252-
253317 let withdrawals = match withdrawals {
254318 Some ( w) if !w. is_empty ( ) => w,
255319 _ => {
320+ self . repair_notify . notify_one ( ) ;
256321 warn ! (
257322 slot = head_val,
258- store_batches = self . store. lock( ) . batch_count( ) ,
259- "No withdrawal data in store for current head slot, waiting for data"
323+ tail = tail_val,
324+ pending_slots,
325+ store_batches = store_batch_count,
326+ store_first_slot,
327+ store_last_slot,
328+ prev_store_slot,
329+ next_store_slot,
330+ "No withdrawal data in store for current head slot"
260331 ) ;
261332 return Ok ( ( ) ) ;
262333 }
@@ -413,9 +484,11 @@ pub fn spawn_withdrawal_processor(
413484 provider : DynProvider < TempoNetwork > ,
414485 store : SharedWithdrawalStore ,
415486 notify : Arc < Notify > ,
487+ repair_notify : Arc < Notify > ,
416488) -> tokio:: task:: JoinHandle < ( ) > {
417489 tokio:: spawn ( async move {
418- let mut processor = WithdrawalProcessor :: new ( config, provider, store, notify) ;
490+ let mut processor =
491+ WithdrawalProcessor :: new ( config, provider, store, notify, repair_notify) ;
419492 loop {
420493 if let Err ( e) = processor. run ( ) . await {
421494 error ! ( error = %e, "Withdrawal processor failed, restarting in 5s" ) ;
@@ -429,8 +502,22 @@ pub fn spawn_withdrawal_processor(
429502mod tests {
430503 use super :: * ;
431504 use crate :: abi:: EMPTY_SENTINEL ;
432- use alloy_primitives:: { address, keccak256} ;
505+ use alloy_primitives:: { Bytes , U256 , address, keccak256} ;
506+ use alloy_provider:: { Provider , ProviderBuilder } ;
433507 use alloy_sol_types:: SolValue ;
508+ use alloy_transport:: mock:: Asserter ;
509+ use tempo_alloy:: TempoNetwork ;
510+ use tokio:: time:: timeout;
511+
512+ fn mock_provider ( asserter : Asserter ) -> DynProvider < TempoNetwork > {
513+ ProviderBuilder :: new_with_network :: < TempoNetwork > ( )
514+ . connect_mocked_client ( asserter)
515+ . erased ( )
516+ }
517+
518+ fn abi_encode_u64 ( value : u64 ) -> Bytes {
519+ Bytes :: copy_from_slice ( & U256 :: from ( value) . to_be_bytes :: < 32 > ( ) )
520+ }
434521
435522 fn test_withdrawal ( to : Address , amount : u128 ) -> abi:: Withdrawal {
436523 abi:: Withdrawal {
@@ -587,4 +674,54 @@ mod tests {
587674 store. add_batch ( 1 , vec ! [ test_withdrawal( addr, 999 ) ] ) ;
588675 assert_eq ! ( store. batch_count( ) , 2 ) ;
589676 }
677+
678+ #[ test]
679+ fn store_replace_batches_reconciles_authoritative_view ( ) {
680+ let mut store = WithdrawalStore :: new ( ) ;
681+ let addr = address ! ( "0x0000000000000000000000000000000000000042" ) ;
682+
683+ store. add_batch ( 0 , vec ! [ test_withdrawal( addr, 100 ) ] ) ;
684+ store. add_batch ( 9 , vec ! [ test_withdrawal( addr, 900 ) ] ) ;
685+
686+ let mut reconciled = BTreeMap :: new ( ) ;
687+ reconciled. insert ( 5 , vec ! [ test_withdrawal( addr, 500 ) ] ) ;
688+ reconciled. insert ( 6 , vec ! [ test_withdrawal( addr, 600 ) ] ) ;
689+
690+ store. replace_batches ( reconciled) ;
691+
692+ assert ! ( !store. has_batch( 0 ) ) ;
693+ assert ! ( !store. has_batch( 9 ) ) ;
694+ assert ! ( store. has_batch( 5 ) ) ;
695+ assert ! ( store. has_batch( 6 ) ) ;
696+ assert_eq ! ( store. batch_count( ) , 2 ) ;
697+ }
698+
699+ #[ tokio:: test]
700+ async fn process_queue_requests_monitor_resync_when_head_slot_missing ( ) {
701+ let l1 = Asserter :: new ( ) ;
702+ l1. push_success ( & abi_encode_u64 ( 51 ) ) ;
703+ l1. push_success ( & abi_encode_u64 ( 71 ) ) ;
704+
705+ let config = WithdrawalProcessorConfig {
706+ portal_address : address ! ( "0x7069DeC4E64Fd07334A0933eDe836C17259c9B23" ) ,
707+ l1_rpc_url : "http://unused.test" . to_string ( ) ,
708+ fallback_poll_interval : Duration :: from_secs ( 1 ) ,
709+ } ;
710+ let notify = Arc :: new ( Notify :: new ( ) ) ;
711+ let repair_notify = Arc :: new ( Notify :: new ( ) ) ;
712+ let mut processor = WithdrawalProcessor :: new (
713+ config,
714+ mock_provider ( l1. clone ( ) ) ,
715+ SharedWithdrawalStore :: new ( ) ,
716+ notify,
717+ repair_notify. clone ( ) ,
718+ ) ;
719+
720+ processor. process_queue ( ) . await . unwrap ( ) ;
721+
722+ timeout ( Duration :: from_millis ( 50 ) , repair_notify. notified ( ) )
723+ . await
724+ . expect ( "missing head slot should request a monitor resync" ) ;
725+ assert ! ( l1. read_q( ) . is_empty( ) ) ;
726+ }
590727}
0 commit comments