11import { Socket } from 'node:net' ;
2+ import { Writable } from 'node:stream' ;
23
34import { expect } from 'chai' ;
45import * as sinon from 'sinon' ;
@@ -11,7 +12,9 @@ import {
1112 MongoClientAuthProviders ,
1213 MongoDBCollectionNamespace ,
1314 MongoNetworkTimeoutError ,
15+ MongoRuntimeError ,
1416 ns ,
17+ promiseWithResolvers ,
1518 SizedMessageTransform
1619} from '../../mongodb' ;
1720import * as mock from '../../tools/mongodb-mock/index' ;
@@ -333,5 +336,49 @@ describe('new Connection()', function () {
333336 expect ( stream . read ( 1 ) ) . to . deep . equal ( Buffer . from ( [ 6 , 0 , 0 , 0 , 5 , 6 ] ) ) ;
334337 expect ( stream . read ( 1 ) ) . to . equal ( null ) ;
335338 } ) ;
339+
340+ it ( 'parses many wire messages when chunk arrives' , function ( ) {
341+ const stream = new SizedMessageTransform ( { connection : { } as any } ) ;
342+
343+ let dataCount = 0 ;
344+ stream . on ( 'data' , ( ) => {
345+ dataCount += 1 ;
346+ } ) ;
347+
348+ // 3 messages of size 8
349+ stream . write (
350+ Buffer . from ( [
351+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
352+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
353+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
354+ ] )
355+ ) ;
356+
357+ expect ( dataCount ) . to . equal ( 3 ) ;
358+ } ) ;
359+
360+ it ( 'waits for a drain event when destination needs backpressure' , async function ( ) {
361+ const stream = new SizedMessageTransform ( { connection : { } as any } ) ;
362+ const destination = new Writable ( {
363+ highWaterMark : 1 ,
364+ objectMode : true ,
365+ write : ( chunk , encoding , callback ) => {
366+ void stream ;
367+ setTimeout ( 1 ) . then ( ( ) => callback ( ) ) ;
368+ }
369+ } ) ;
370+
371+ // 1000 messages of size 8
372+ stream . write (
373+ Buffer . from ( Array . from ( { length : 1000 } , ( ) => [ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) . flat ( 1 ) )
374+ ) ;
375+
376+ const { promise, resolve, reject } = promiseWithResolvers ( ) ;
377+
378+ stream . on ( 'error' , reject ) . pipe ( destination ) . on ( 'error' , reject ) . on ( 'finish' , resolve ) ;
379+
380+ const error = await promise . catch ( error => error ) ;
381+ expect ( error ) . to . be . instanceOf ( MongoRuntimeError ) ;
382+ } ) ;
336383 } ) ;
337384} ) ;
0 commit comments