11import { Socket } from 'node:net' ;
2+ import { Writable } from 'node:stream' ;
23
34import { expect } from 'chai' ;
45import * as sinon from 'sinon' ;
@@ -11,7 +12,9 @@ import {
1112 MongoClientAuthProviders ,
1213 MongoDBCollectionNamespace ,
1314 MongoNetworkTimeoutError ,
15+ MongoRuntimeError ,
1416 ns ,
17+ promiseWithResolvers ,
1518 SizedMessageTransform
1619} from '../../mongodb' ;
1720import * as mock from '../../tools/mongodb-mock/index' ;
@@ -333,5 +336,76 @@ describe('new Connection()', function () {
333336 expect ( stream . read ( 1 ) ) . to . deep . equal ( Buffer . from ( [ 6 , 0 , 0 , 0 , 5 , 6 ] ) ) ;
334337 expect ( stream . read ( 1 ) ) . to . equal ( null ) ;
335338 } ) ;
339+
340+ it ( 'parses many wire messages when a single chunk arrives' , function ( ) {
341+ const stream = new SizedMessageTransform ( { connection : { } as any } ) ;
342+
343+ let dataCount = 0 ;
344+ stream . on ( 'data' , chunk => {
345+ expect ( chunk ) . to . have . lengthOf ( 8 ) ;
346+ dataCount += 1 ;
347+ } ) ;
348+
349+ // 3 messages of size 8
350+ stream . write (
351+ Buffer . from ( [
352+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
353+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
354+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ]
355+ ] )
356+ ) ;
357+
358+ expect ( dataCount ) . to . equal ( 3 ) ;
359+ } ) ;
360+
361+ it ( 'parses many wire messages when a single chunk arrives and processes the remaining partial when it is complete' , function ( ) {
362+ const stream = new SizedMessageTransform ( { connection : { } as any } ) ;
363+
364+ let dataCount = 0 ;
365+ stream . on ( 'data' , chunk => {
366+ expect ( chunk ) . to . have . lengthOf ( 8 ) ;
367+ dataCount += 1 ;
368+ } ) ;
369+
370+ // 3 messages of size 8
371+ stream . write (
372+ Buffer . from ( [
373+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
374+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
375+ ...[ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
376+ ...[ 8 , 0 , 0 , 0 , 0 , 0 ] // two shy of 8
377+ ] )
378+ ) ;
379+
380+ expect ( dataCount ) . to . equal ( 3 ) ;
381+
382+ stream . write ( Buffer . from ( [ 0 , 0 ] ) ) ; // the rest of the last 8
383+
384+ expect ( dataCount ) . to . equal ( 4 ) ;
385+ } ) ;
386+
387+ it ( 'throws an error when backpressure detected' , async function ( ) {
388+ const stream = new SizedMessageTransform ( { connection : { } as any } ) ;
389+ const destination = new Writable ( {
390+ highWaterMark : 1 ,
391+ objectMode : true ,
392+ write : ( chunk , encoding , callback ) => {
393+ void stream ;
394+ setTimeout ( 1 ) . then ( ( ) => callback ( ) ) ;
395+ }
396+ } ) ;
397+
398+ // 1000 messages of size 8
399+ stream . write (
400+ Buffer . from ( Array . from ( { length : 1000 } , ( ) => [ 8 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) . flat ( 1 ) )
401+ ) ;
402+
403+ const { promise, resolve, reject } = promiseWithResolvers ( ) ;
404+
405+ stream . on ( 'error' , reject ) . pipe ( destination ) . on ( 'error' , reject ) . on ( 'finish' , resolve ) ;
406+
407+ const error = await promise . catch ( error => error ) ;
408+ expect ( error ) . to . be . instanceOf ( MongoRuntimeError ) ;
409+ } ) ;
336410 } ) ;
337411} ) ;
0 commit comments