@@ -76,6 +76,7 @@ static void free_batch_commit_buf(struct ublk_thread *t)
 		free(t->commit_buf);
 	}
 	allocator_deinit(&t->commit_buf_alloc);
+	free(t->commit);
 }
 
 static int alloc_batch_commit_buf(struct ublk_thread *t)
@@ -84,7 +85,13 @@ static int alloc_batch_commit_buf(struct ublk_thread *t)
 	unsigned int total = buf_size * t->nr_commit_buf;
 	unsigned int page_sz = getpagesize();
 	void *buf = NULL;
-	int ret;
+	int i, ret, j = 0;
+
+	t->commit = calloc(t->nr_queues, sizeof(*t->commit));
+	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
+		if (t->q_map[i])
+			t->commit[j++].q_id = i;
+	}
 
 	allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);
 
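alloc_batch_commit_buf() now allocates one batch_commit_buf per queue mapped to the thread and records each queue id in q_map order, so slot m of t->commit[] belongs to the thread's m-th mapped queue. A small sketch of the resulting layout, assuming the 1-based q_map encoding that ublk_batch_setup_map() establishes at the end of this patch:

	/* sketch: thread 0 of a 2-thread/4-queue map handles Q0 and Q2 */
	unsigned char q_map[UBLK_MAX_QUEUES] = { 1, 0, 2, 0 };
	/* after the loop above: t->commit[0].q_id == 0, t->commit[1].q_id == 2 */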
@@ -107,6 +114,17 @@ static int alloc_batch_commit_buf(struct ublk_thread *t)
 	return ret;
 }
 
+static unsigned int ublk_thread_nr_queues(const struct ublk_thread *t)
+{
+	int i;
+	int ret = 0;
+
+	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++)
+		ret += !!t->q_map[i];
+
+	return ret;
+}
+
 void ublk_batch_prepare(struct ublk_thread *t)
 {
 	/*
@@ -119,10 +137,13 @@ void ublk_batch_prepare(struct ublk_thread *t)
 	 */
 	struct ublk_queue *q = &t->dev->q[0];
 
+	/* cache nr_queues because we don't support dynamic load-balance yet */
+	t->nr_queues = ublk_thread_nr_queues(t);
+
 	t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
 	t->commit_buf_size = ublk_commit_buf_size(t);
 	t->commit_buf_start = t->nr_bufs;
-	t->nr_commit_buf = 2;
+	t->nr_commit_buf = 2 * t->nr_queues;
 	t->nr_bufs += t->nr_commit_buf;
 
 	t->cmd_flags = 0;
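ublk_thread_nr_queues() counts the nonzero q_map[] slots, and the commit-buffer pool now scales with that count: two commit buffers per mapped queue instead of a fixed two per thread. A worked example under the assumed 2-thread/4-queue mapping above:

	/* sketch: thread 0 maps Q0 and Q2 */
	/* t->nr_queues == 2, t->nr_commit_buf == 2 * 2 == 4 */
	/* t->nr_bufs grows by 4 registered buffer slots, starting at t->commit_buf_start */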
@@ -144,11 +165,12 @@ static void free_batch_fetch_buf(struct ublk_thread *t)
 {
 	int i;
 
-	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
+	for (i = 0; i < t->nr_fetch_bufs; i++) {
 		io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
 		munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
 		free(t->fetch[i].fetch_buf);
 	}
+	free(t->fetch);
 }
 
 static int alloc_batch_fetch_buf(struct ublk_thread *t)
@@ -159,7 +181,12 @@ static int alloc_batch_fetch_buf(struct ublk_thread *t)
 	int ret;
 	int i = 0;
 
-	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
+	/* double fetch buffer for each queue */
+	t->nr_fetch_bufs = t->nr_queues * 2;
+	t->fetch = calloc(t->nr_fetch_bufs, sizeof(*t->fetch));
+
+	/* allocate two buffers for each queue */
+	for (i = 0; i < t->nr_fetch_bufs; i++) {
 		t->fetch[i].fetch_buf_size = buf_size;
 
 		if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
@@ -185,7 +212,7 @@ int ublk_batch_alloc_buf(struct ublk_thread *t)
 {
 	int ret;
 
-	ublk_assert(t->nr_commit_buf < 16);
+	ublk_assert(t->nr_commit_buf < 2 * UBLK_MAX_QUEUES);
 
 	ret = alloc_batch_commit_buf(t);
 	if (ret)
@@ -271,13 +298,20 @@ static void ublk_batch_queue_fetch(struct ublk_thread *t,
 	t->fetch[buf_idx].fetch_buf_off = 0;
 }
 
-void ublk_batch_start_fetch(struct ublk_thread *t,
-			    struct ublk_queue *q)
+void ublk_batch_start_fetch(struct ublk_thread *t)
 {
 	int i;
+	int j = 0;
+
+	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
+		if (t->q_map[i]) {
+			struct ublk_queue *q = &t->dev->q[i];
 
-	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++)
-		ublk_batch_queue_fetch(t, q, i);
+			/* submit two fetch commands for each queue */
+			ublk_batch_queue_fetch(t, q, j++);
+			ublk_batch_queue_fetch(t, q, j++);
+		}
+	}
 }
 
 static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
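Because j advances in q_map order, the queue with per-thread index m ends up owning fetch slots 2*m and 2*m+1, which is exactly why alloc_batch_fetch_buf() sizes t->fetch[] as nr_queues * 2. A minimal sketch of that invariant (hypothetical helper, not part of the patch):

	/* hypothetical: first fetch slot owned by queue q on thread t */
	static inline int ublk_queue_fetch_slot(const struct ublk_thread *t,
						const struct ublk_queue *q)
	{
		return 2 * (t->q_map[q->q_id] - 1);	/* q_map is 1-based */
	}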
@@ -317,7 +351,7 @@ static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
 	return buf_idx;
 }
 
-int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
+static int __ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
 {
 	unsigned short nr_elem = q->q_depth;
 	unsigned short buf_idx = ublk_alloc_commit_buf(t);
@@ -354,6 +388,22 @@ int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
 	return 0;
 }
 
+int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
+{
+	int ret = 0;
+
+	pthread_spin_lock(&q->lock);
+	if (q->flags & UBLKS_Q_PREPARED)
+		goto unlock;
+	ret = __ublk_batch_queue_prep_io_cmds(t, q);
+	if (!ret)
+		q->flags |= UBLKS_Q_PREPARED;
+unlock:
+	pthread_spin_unlock(&q->lock);
+
+	return ret;
+}
+
 static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
 					const struct io_uring_cqe *cqe,
 					unsigned op)
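With N:M mapping, several threads can reach the same queue during setup, so the new wrapper serializes on q->lock and uses the UBLKS_Q_PREPARED flag to make the call idempotent: exactly one thread issues the prep commands, and later callers return 0 immediately. A sketch of the expected caller pattern (the loop shape is an assumption; it is not part of this diff):

	/* each thread preps every queue it is mapped to; duplicates are no-ops */
	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
		if (t->q_map[i] &&
		    ublk_batch_queue_prep_io_cmds(t, &t->dev->q[i]))
			return -1;
	}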
@@ -401,59 +451,89 @@ void ublk_batch_compl_cmd(struct ublk_thread *t,
 	}
 }
 
-void ublk_batch_commit_io_cmds(struct ublk_thread *t)
+static void __ublk_batch_commit_io_cmds(struct ublk_thread *t,
+					struct batch_commit_buf *cb)
 {
 	struct io_uring_sqe *sqe;
 	unsigned short buf_idx;
-	unsigned short nr_elem = t->commit.done;
+	unsigned short nr_elem = cb->done;
 
 	/* nothing to commit */
 	if (!nr_elem) {
-		ublk_free_commit_buf(t, t->commit.buf_idx);
+		ublk_free_commit_buf(t, cb->buf_idx);
 		return;
 	}
 
 	ublk_io_alloc_sqes(t, &sqe, 1);
-	buf_idx = t->commit.buf_idx;
-	sqe->addr = (__u64)t->commit.elem;
+	buf_idx = cb->buf_idx;
+	sqe->addr = (__u64)cb->elem;
 	sqe->len = nr_elem * t->commit_buf_elem_size;
 
 	/* commit isn't per-queue command */
-	ublk_init_batch_cmd(t, t->commit.q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
+	ublk_init_batch_cmd(t, cb->q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
 			    t->commit_buf_elem_size, nr_elem, buf_idx);
 	ublk_setup_commit_sqe(t, sqe, buf_idx);
 }
 
-static void ublk_batch_init_commit(struct ublk_thread *t,
-				   unsigned short buf_idx)
+void ublk_batch_commit_io_cmds(struct ublk_thread *t)
+{
+	int i;
+
+	for (i = 0; i < t->nr_queues; i++) {
+		struct batch_commit_buf *cb = &t->commit[i];
+
+		if (cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX)
+			__ublk_batch_commit_io_cmds(t, cb);
+	}
+
+}
+
+static void __ublk_batch_init_commit(struct ublk_thread *t,
+				     struct batch_commit_buf *cb,
+				     unsigned short buf_idx)
 {
 	/* so far only support 1:1 queue/thread mapping */
-	t->commit.q_id = t->idx;
-	t->commit.buf_idx = buf_idx;
-	t->commit.elem = ublk_get_commit_buf(t, buf_idx);
-	t->commit.done = 0;
-	t->commit.count = t->commit_buf_size /
+	cb->buf_idx = buf_idx;
+	cb->elem = ublk_get_commit_buf(t, buf_idx);
+	cb->done = 0;
+	cb->count = t->commit_buf_size /
 			  t->commit_buf_elem_size;
 }
 
-void ublk_batch_prep_commit(struct ublk_thread *t)
+/* COMMIT_IO_CMDS is per-queue command, so use its own commit buffer */
+static void ublk_batch_init_commit(struct ublk_thread *t,
+				   struct batch_commit_buf *cb)
 {
 	unsigned short buf_idx = ublk_alloc_commit_buf(t);
 
 	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
-	ublk_batch_init_commit(t, buf_idx);
+	ublk_assert(!ublk_batch_commit_prepared(cb));
+
+	__ublk_batch_init_commit(t, cb, buf_idx);
+}
+
+void ublk_batch_prep_commit(struct ublk_thread *t)
+{
+	int i;
+
+	for (i = 0; i < t->nr_queues; i++)
+		t->commit[i].buf_idx = UBLKS_T_COMMIT_BUF_INV_IDX;
 }
 
 void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
 			    unsigned tag, int res)
 {
-	struct batch_commit_buf *cb = &t->commit;
-	struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(cb->elem +
-		cb->done * t->commit_buf_elem_size);
+	unsigned q_t_idx = ublk_queue_idx_in_thread(t, q);
+	struct batch_commit_buf *cb = &t->commit[q_t_idx];
+	struct ublk_batch_elem *elem;
 	struct ublk_io *io = &q->ios[tag];
 
-	ublk_assert(q->q_id == t->commit.q_id);
+	if (!ublk_batch_commit_prepared(cb))
+		ublk_batch_init_commit(t, cb);
+
+	ublk_assert(q->q_id == cb->q_id);
 
+	elem = (struct ublk_batch_elem *)(cb->elem + cb->done * t->commit_buf_elem_size);
 	elem->tag = tag;
 	elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
 	elem->result = res;
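Two helpers referenced above are not defined in this diff: ublk_batch_commit_prepared() and ublk_queue_idx_in_thread(). Plausible definitions, consistent with the UBLKS_T_COMMIT_BUF_INV_IDX sentinel that ublk_batch_prep_commit() stores and with the 1-based q_map encoding (these are assumptions, not the patch's actual code):

	static inline bool ublk_batch_commit_prepared(const struct batch_commit_buf *cb)
	{
		return cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX;
	}

	static inline unsigned ublk_queue_idx_in_thread(const struct ublk_thread *t,
							const struct ublk_queue *q)
	{
		/* q_map[] stores local queue index + 1; 0 means unmapped */
		return t->q_map[q->q_id] - 1;
	}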
@@ -464,3 +544,64 @@ void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
 	cb->done += 1;
 	ublk_assert(cb->done <= cb->count);
 }
+
+void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES],
+			  int nthreads, int queues)
+{
+	int i, j;
+
+	/*
+	 * Set up a round-robin queue-to-thread mapping for arbitrary N:M combinations.
+	 *
+	 * This algorithm distributes queues across threads (and threads across queues)
+	 * in a balanced round-robin fashion to ensure even load distribution.
+	 *
+	 * Examples:
+	 * - 2 threads, 4 queues: T0=[Q0,Q2], T1=[Q1,Q3]
+	 * - 4 threads, 2 queues: T0=[Q0], T1=[Q1], T2=[Q0], T3=[Q1]
+	 * - 3 threads, 3 queues: T0=[Q0], T1=[Q1], T2=[Q2] (1:1 mapping)
+	 *
+	 * Phase 1: Mark which queues each thread handles (boolean mapping)
+	 */
+	for (i = 0, j = 0; i < queues || j < nthreads; i++, j++) {
+		q_thread_map[j % nthreads][i % queues] = 1;
+	}
+
+	/*
+	 * Phase 2: Convert the boolean mapping to sequential indices within each thread.
+	 *
+	 * Transform from: q_thread_map[thread][queue] = 1 (handles queue)
+	 * To: q_thread_map[thread][queue] = N (queue index within thread)
+	 *
+	 * This allows each thread to know the local index of each queue it handles,
+	 * which is essential for buffer allocation and management. For example:
+	 * - Thread 0 handling queues [0,2] becomes: q_thread_map[0][0]=1, q_thread_map[0][2]=2
+	 * - Thread 1 handling queues [1,3] becomes: q_thread_map[1][1]=1, q_thread_map[1][3]=2
+	 */
+	for (j = 0; j < nthreads; j++) {
+		unsigned char seq = 1;
+
+		for (i = 0; i < queues; i++) {
+			if (q_thread_map[j][i])
+				q_thread_map[j][i] = seq++;
+		}
+	}
+
+#if 0
+	for (j = 0; j < nthreads; j++) {
+		printf("thread %0d: ", j);
+		for (i = 0; i < queues; i++) {
+			if (q_thread_map[j][i])
+				printf("%03u ", i);
+		}
+		printf("\n");
+	}
+	printf("\n");
+	for (j = 0; j < nthreads; j++) {
+		for (i = 0; i < queues; i++) {
+			printf("%03u ", q_thread_map[j][i]);
+		}
+		printf("\n");
+	}
+#endif
+}
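A standalone usage sketch of the new mapping (caller-side code assumed; it is not shown in this commit):

	/* 2 threads, 4 queues: expect T0=[Q0,Q2], T1=[Q1,Q3] */
	unsigned char map[2][UBLK_MAX_QUEUES] = { { 0 } };

	ublk_batch_setup_map(map, 2, 4);
	/* map[0][0] == 1 && map[0][2] == 2: Q0/Q2 are thread 0's 1st/2nd queues */
	/* map[1][1] == 1 && map[1][3] == 2: Q1/Q3 are thread 1's 1st/2nd queues */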