@@ -50,6 +50,7 @@ use mononoke_app::MononokeReposManager;
5050use mononoke_macros:: mononoke;
5151use mononoke_types:: RepositoryId ;
5252use mononoke_types:: Timestamp ;
53+ use rand:: RngExt as _;
5354use stats:: define_stats;
5455use stats:: prelude:: * ;
5556use tracing:: debug;
@@ -95,6 +96,8 @@ const DEQUEUE_STREAM_SLEEP_TIME: u64 = 1000;
9596// if it hasn't updated inprogress timestamp
9697const ABANDONED_REQUEST_THRESHOLD_SECS : i64 = 5 * 60 ;
9798const KEEP_ALIVE_INTERVAL : Duration = Duration :: from_secs ( 10 ) ;
99+ const CONCURRENCY_LIMIT_BACKOFF_BASE : Duration = Duration :: from_secs ( 15 ) ;
100+ const CONCURRENCY_LIMIT_BACKOFF_MAX_JITTER_SECS : u64 = 15 ;
98101
99102define_stats ! {
100103 prefix = "async_requests.worker" ;
@@ -373,6 +376,11 @@ impl AsyncMethodRequestWorker {
373376 release_ondemand_repo_impl ( repo_id, & self . ondemand_repo_refs , & self . repos_mgr ) ;
374377 }
375378
379+ fn concurrency_limit_backoff ( ) -> Duration {
380+ let jitter_secs = rand:: rng ( ) . random_range ( 0 ..=CONCURRENCY_LIMIT_BACKOFF_MAX_JITTER_SECS ) ;
381+ CONCURRENCY_LIMIT_BACKOFF_BASE + Duration :: from_secs ( jitter_secs)
382+ }
383+
376384 /// Params into stored response. Doesn't mark it as "in progress" (as this is done during dequeueing).
377385 /// Returns true if the result was successfully stored. Returns false if we
378386 /// lost the race (the request table was updated).
@@ -415,17 +423,23 @@ impl AsyncMethodRequestWorker {
415423 root_request_id,
416424 created_by. as_deref ( ) ,
417425 ) ;
418- log_start ( & ctx) ;
419426
420427 // Check concurrency limit for this request type. If exceeded,
421- // requeue so another worker can try later when capacity frees up.
428+ // hold the claim briefly before requeueing so the same hot request
429+ // does not immediately churn across the whole worker fleet.
422430 match self . queue . concurrency_limit_reached ( & ctx, & req_id. 1 ) . await {
423431 Ok ( true ) => {
424432 let row_id = req_id. 0 ;
433+ let backoff = Self :: concurrency_limit_backoff ( ) ;
425434 info ! (
426- "[{}] Concurrency limit reached for {}, requeuing " ,
427- & row_id, & req_id. 1.0 ,
435+ "[{}] Concurrency limit reached for {}, backing off for {:?} " ,
436+ & row_id, & req_id. 1.0 , backoff ,
428437 ) ;
438+ ctx. scuba ( )
439+ . clone ( )
440+ . add ( "backoff_ms" , backoff. as_millis ( ) as i64 )
441+ . log_with_msg ( "Request throttled by concurrency limit" , None ) ;
442+ tokio:: time:: sleep ( backoff) . await ;
429443 if let Err ( requeue_err) = self . queue . requeue ( & ctx, req_id) . await {
430444 error ! (
431445 "[{}] Failed to requeue request after concurrency limit: {:?}" ,
@@ -441,6 +455,8 @@ impl AsyncMethodRequestWorker {
441455 _ => { }
442456 }
443457
458+ log_start ( & ctx) ;
459+
444460 // Save refs for cleanup after self is partially moved.
445461 let ondemand_repo_refs = self . ondemand_repo_refs . clone ( ) ;
446462 let repos_mgr_for_cleanup = self . repos_mgr . clone ( ) ;
0 commit comments