Commit e7298b0

stream: improve Web Compression spec compliance
Pass rejectGarbageAfterEnd: true to createInflateRaw() and createBrotliDecompress(), matching the behavior already in place for deflate and gzip. The Compression Streams spec treats any data following a valid compressed payload as an error.

When the underlying Node.js stream throws synchronously from write() (e.g. zlib rejects an invalid chunk type), destroy the stream so that the readable side is also errored. Without this, the readable side hangs forever waiting for data that will never arrive.

Introduce a kValidateChunk callback option in the webstreams adapter layer. Compression streams use it to validate that chunks are BufferSource instances not backed by a SharedArrayBuffer, replacing the previous monkey-patching of the underlying handle's write method.

Unskip the WPT compression bad-chunks tests, which now run instead of hanging, and mark the remaining expected failures.
1 parent 41d5b41 commit e7298b0
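
For illustration, a minimal sketch (not part of the commit; names and byte values are placeholders) of the user-visible effect for deflate-raw, which previously lacked rejectGarbageAfterEnd: trailing bytes after a valid payload now error both sides of the DecompressionStream instead of being silently ignored or hanging.

'use strict';
const { CompressionStream, DecompressionStream } = require('node:stream/web');

async function demo() {
  // Produce a valid deflate-raw payload for 'hello'.
  const cs = new CompressionStream('deflate-raw');
  const cw = cs.writable.getWriter();
  cw.write(new TextEncoder().encode('hello'));
  cw.close();
  const parts = [];
  for await (const chunk of cs.readable) parts.push(chunk);
  const compressed = Buffer.concat(parts.map((c) => Buffer.from(c)));

  // Append junk bytes and try to decompress the whole thing.
  const ds = new DecompressionStream('deflate-raw');
  const writer = ds.writable.getWriter();
  writer.write(Buffer.concat([compressed, Buffer.from([0xDE, 0xAD])]))
    .catch(() => {});
  writer.close().catch(() => {});

  try {
    for await (const chunk of ds.readable) void chunk;
  } catch (err) {
    console.log('readable errored as expected:', err.message);
  }
}

demo();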

6 files changed: 268 additions & 23 deletions

lib/internal/webstreams/adapters.js

Lines changed: 32 additions & 8 deletions
@@ -11,6 +11,8 @@ const {
   SafePromiseAll,
   SafePromisePrototypeFinally,
   SafeSet,
+  StringPrototypeStartsWith,
+  Symbol,
   TypeError,
   TypedArrayPrototypeGetBuffer,
   TypedArrayPrototypeGetByteLength,
@@ -94,6 +96,8 @@ const { UV_EOF } = internalBinding('uv');
 
 const encoder = new TextEncoder();
 
+const kValidateChunk = Symbol('kValidateChunk');
+
 // Collect all negative (error) ZLIB codes and Z_NEED_DICT
 const ZLIB_FAILURES = new SafeSet([
   ...ArrayPrototypeFilter(
@@ -139,9 +143,10 @@ function handleKnownInternalErrors(cause) {
 
 /**
  * @param {Writable} streamWritable
+ * @param {object} [options]
  * @returns {WritableStream}
  */
-function newWritableStreamFromStreamWritable(streamWritable) {
+function newWritableStreamFromStreamWritable(streamWritable, options = kEmptyObject) {
   // Not using the internal/streams/utils isWritableNodeStream utility
   // here because it will return false if streamWritable is a Duplex
   // whose writable option is false. For a Duplex that is not writable,
@@ -220,12 +225,25 @@ function newWritableStreamFromStreamWritable(streamWritable) {
       if (!streamWritable.writableObjectMode && isArrayBuffer(chunk)) {
         chunk = new Uint8Array(chunk);
       }
-      if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
-        backpressurePromise = PromiseWithResolvers();
-        return SafePromisePrototypeFinally(
-          backpressurePromise.promise, () => {
-            backpressurePromise = undefined;
-          });
+      // If the underlying Node.js stream throws synchronously from
+      // write() (e.g. zlib rejects an invalid chunk type), we must
+      // destroy the stream so that the readable side is also errored.
+      // Without this, the readable side would hang forever waiting
+      // for data that will never arrive. The same applies to any
+      // caller-provided chunk validation (e.g. web compression
+      // SharedArrayBuffer rejection).
+      try {
+        options[kValidateChunk]?.(chunk);
+        if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
+          backpressurePromise = PromiseWithResolvers();
+          return SafePromisePrototypeFinally(
+            backpressurePromise.promise, () => {
+              backpressurePromise = undefined;
+            });
+        }
+      } catch (error) {
+        destroy(streamWritable, error);
+        throw error;
       }
     },
 
@@ -662,9 +680,14 @@ function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
     return { readable, writable };
   }
 
+  const writableOptions = {
+    __proto__: null,
+    [kValidateChunk]: options[kValidateChunk],
+  };
+
   const writable =
     isWritable(duplex) ?
-      newWritableStreamFromStreamWritable(duplex) :
+      newWritableStreamFromStreamWritable(duplex, writableOptions) :
       new WritableStream();
 
   if (!isWritable(duplex))
@@ -1064,4 +1087,5 @@ module.exports = {
   newStreamDuplexFromReadableWritablePair,
   newWritableStreamFromStreamBase,
   newReadableStreamFromStreamBase,
+  kValidateChunk,
 };
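
A minimal sketch of the new kValidateChunk hook (hedged: this is an internal API that requires --expose-internals, and the validator here is illustrative, not from the commit). A validator that throws rejects the write and destroys the underlying stream:

'use strict';
const assert = require('node:assert');
const { PassThrough } = require('node:stream');
const {
  newReadableWritablePairFromDuplex,
  kValidateChunk,
} = require('internal/webstreams/adapters');

const duplex = new PassThrough();
const { writable } = newReadableWritablePairFromDuplex(duplex, {
  [kValidateChunk]: (chunk) => {
    if (typeof chunk === 'string')
      throw new TypeError('strings are not allowed here');
  },
});

const writer = writable.getWriter();
assert.rejects(writer.write('nope'), TypeError).then(() => {
  // The adapter destroys the duplex so the readable side errors too.
  assert.strictEqual(duplex.destroyed, true);
});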

lib/internal/webstreams/compression.js

Lines changed: 35 additions & 11 deletions
@@ -7,15 +7,27 @@ const {
 
 const {
   newReadableWritablePairFromDuplex,
+  kValidateChunk,
 } = require('internal/webstreams/adapters');
 
 const { customInspect } = require('internal/webstreams/util');
 
+const {
+  isArrayBufferView,
+  isSharedArrayBuffer,
+} = require('internal/util/types');
+
 const {
   customInspectSymbol: kInspect,
   kEnumerableProperty,
 } = require('internal/util');
 
+const {
+  codes: {
+    ERR_INVALID_ARG_TYPE,
+  },
+} = require('internal/errors');
+
 const { createEnumConverter } = require('internal/webidl');
 
 let zlib;
@@ -24,6 +36,18 @@ function lazyZlib() {
   return zlib;
 }
 
+// Per the Compression Streams spec, chunks must be BufferSource
+// (ArrayBuffer or ArrayBufferView not backed by SharedArrayBuffer).
+function validateBufferSourceChunk(chunk) {
+  if (isArrayBufferView(chunk) && isSharedArrayBuffer(chunk.buffer)) {
+    throw new ERR_INVALID_ARG_TYPE(
+      'chunk',
+      ['Buffer', 'TypedArray', 'DataView'],
+      chunk,
+    );
+  }
+}
+
 const formatConverter = createEnumConverter('CompressionFormat', [
   'deflate',
   'deflate-raw',
@@ -62,7 +86,9 @@ class CompressionStream {
         this.#handle = lazyZlib().createBrotliCompress();
         break;
     }
-    this.#transform = newReadableWritablePairFromDuplex(this.#handle);
+    this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
+      [kValidateChunk]: validateBufferSourceChunk,
+    });
   }
 
   /**
@@ -108,25 +134,23 @@ class DecompressionStream {
         });
         break;
       case 'deflate-raw':
-        this.#handle = lazyZlib().createInflateRaw();
+        this.#handle = lazyZlib().createInflateRaw({
+          rejectGarbageAfterEnd: true,
+        });
         break;
       case 'gzip':
         this.#handle = lazyZlib().createGunzip({
           rejectGarbageAfterEnd: true,
         });
         break;
       case 'brotli':
-        this.#handle = lazyZlib().createBrotliDecompress();
+        this.#handle = lazyZlib().createBrotliDecompress({
+          rejectGarbageAfterEnd: true,
+        });
         break;
     }
-    this.#transform = newReadableWritablePairFromDuplex(this.#handle);
-
-    this.#handle.on('error', (err) => {
-      if (this.#transform?.writable &&
-          !this.#transform.writable.locked &&
-          typeof this.#transform.writable.abort === 'function') {
-        this.#transform.writable.abort(err);
-      }
+    this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
+      [kValidateChunk]: validateBufferSourceChunk,
     });
   }
 
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+'use strict';
+// Flags: --no-warnings --expose-internals
+require('../common');
+const assert = require('assert');
+const test = require('node:test');
+const { Writable } = require('stream');
+const {
+  newWritableStreamFromStreamWritable,
+} = require('internal/webstreams/adapters');
+
+// Verify that when the underlying Node.js stream throws synchronously from
+// write(), the writable web stream properly rejects and the underlying
+// stream is destroyed.
+
+test('WritableStream from Node.js stream handles sync write throw', async () => {
+  const error = new TypeError('invalid chunk');
+  const writable = new Writable({
+    write() {
+      throw error;
+    },
+  });
+
+  const ws = newWritableStreamFromStreamWritable(writable);
+  const writer = ws.getWriter();
+
+  await assert.rejects(writer.write('bad'), (err) => {
+    assert.strictEqual(err, error);
+    return true;
+  });
+
+  // The underlying stream must be destroyed.
+  assert.strictEqual(writable.destroyed, true);
+});
+
+test('WritableStream from Node.js stream - valid writes still work', async () => {
+  const chunks = [];
+  const writable = new Writable({
+    write(chunk, _encoding, cb) {
+      chunks.push(chunk);
+      cb();
+    },
+  });
+
+  const ws = newWritableStreamFromStreamWritable(writable);
+  const writer = ws.getWriter();
+
+  await writer.write(Buffer.from('hello'));
+  await writer.write(Buffer.from(' world'));
+  await writer.close();
+
+  assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world');
+});
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+'use strict';
+require('../common');
+const assert = require('assert');
+const test = require('node:test');
+const { CompressionStream, DecompressionStream } = require('stream/web');
+
+// Verify that writing invalid (non-BufferSource) chunks to
+// CompressionStream and DecompressionStream properly rejects
+// on both the write and the read side, instead of hanging.
+
+const badChunks = [
+  { name: 'undefined', value: undefined, code: 'ERR_INVALID_ARG_TYPE' },
+  { name: 'null', value: null, code: 'ERR_STREAM_NULL_VALUES' },
+  { name: 'number', value: 3.14, code: 'ERR_INVALID_ARG_TYPE' },
+  { name: 'object', value: {}, code: 'ERR_INVALID_ARG_TYPE' },
+  { name: 'array', value: [65], code: 'ERR_INVALID_ARG_TYPE' },
+  {
+    name: 'SharedArrayBuffer',
+    value: new SharedArrayBuffer(1),
+    code: 'ERR_INVALID_ARG_TYPE',
+  },
+  {
+    name: 'Uint8Array backed by SharedArrayBuffer',
+    value: new Uint8Array(new SharedArrayBuffer(1)),
+    code: 'ERR_INVALID_ARG_TYPE',
+  },
+];
+
+for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
+  for (const { name, value, code } of badChunks) {
+    const expected = { name: 'TypeError', code };
+
+    test(`CompressionStream rejects bad chunk (${name}) for ${format}`, async () => {
+      const cs = new CompressionStream(format);
+      const writer = cs.writable.getWriter();
+      const reader = cs.readable.getReader();
+
+      const writePromise = writer.write(value);
+      const readPromise = reader.read();
+
+      await assert.rejects(writePromise, expected);
+      await assert.rejects(readPromise, expected);
+    });
+
+    test(`DecompressionStream rejects bad chunk (${name}) for ${format}`, async () => {
+      const ds = new DecompressionStream(format);
+      const writer = ds.writable.getWriter();
+      const reader = ds.readable.getReader();
+
+      const writePromise = writer.write(value);
+      const readPromise = reader.read();
+
+      await assert.rejects(writePromise, expected);
+      await assert.rejects(readPromise, expected);
+    });
+  }
+}
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+'use strict';
+require('../common');
+const assert = require('assert');
+const test = require('node:test');
+const { CompressionStream, DecompressionStream } = require('stream/web');
+
+// Verify that DecompressionStream rejects trailing data after a valid
+// compressed payload for all four supported formats (deflate, deflate-raw, gzip, brotli).
+
+const input = Buffer.from('hello');
+const trailingJunk = Buffer.from([0xDE, 0xAD]);
+
+async function compress(format, data) {
+  const cs = new CompressionStream(format);
+  const writer = cs.writable.getWriter();
+  writer.write(data);
+  writer.close();
+  const chunks = await Array.fromAsync(cs.readable);
+  return Buffer.concat(chunks.map((c) => Buffer.from(c)));
+}
+
+for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
+  test(`DecompressionStream rejects trailing junk for ${format}`, async () => {
+    const compressed = await compress(format, input);
+    const withJunk = Buffer.concat([compressed, trailingJunk]);
+
+    const ds = new DecompressionStream(format);
+    const writer = ds.writable.getWriter();
+    const reader = ds.readable.getReader();
+
+    writer.write(withJunk).catch(() => {});
+    writer.close().catch(() => {});
+
+    await assert.rejects(async () => {
+      const chunks = [];
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) break;
+        chunks.push(value);
+      }
+    }, (err) => {
+      assert(err instanceof Error, `Expected Error, got ${err?.constructor?.name}`);
+      return true;
+    });
+  });
+
+  test(`DecompressionStream accepts valid ${format} data without trailing junk`, async () => {
+    const compressed = await compress(format, input);
+
+    const ds = new DecompressionStream(format);
+    const writer = ds.writable.getWriter();
+
+    writer.write(compressed);
+    writer.close();
+
+    const chunks = await Array.fromAsync(ds.readable);
+    const result = Buffer.concat(chunks.map((c) => Buffer.from(c)));
+    assert.strictEqual(result.toString(), 'hello');
+  });
+}
+
+// Extra: Verify that trailing data is also rejected when passed as a separate
+// chunk after the valid compressed data has been fully written.
+for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
+  test(`DecompressionStream rejects trailing junk as separate chunk for ${format}`, async () => {
+    const compressed = await compress(format, input);
+
+    const ds = new DecompressionStream(format);
+    const writer = ds.writable.getWriter();
+    const reader = ds.readable.getReader();
+
+    writer.write(compressed).catch(() => {});
+    writer.write(trailingJunk).catch(() => {});
+    writer.close().catch(() => {});
+
+    await assert.rejects(async () => {
+      while (true) {
+        const { done } = await reader.read();
+        if (done) break;
+      }
+    }, (err) => {
+      assert(err instanceof Error, `Expected Error, got ${err?.constructor?.name}`);
+      return true;
+    });
+  });
+}
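
The rejectGarbageAfterEnd option these tests exercise can also be sketched directly at the zlib layer (assuming the option behaves for gzip as the commit message describes; the junk bytes are placeholders):

'use strict';
const zlib = require('node:zlib');

const gunzip = zlib.createGunzip({ rejectGarbageAfterEnd: true });
gunzip.resume(); // discard output; we only care about the error
gunzip.on('error', (err) => console.log('errored on trailing bytes:', err.message));

const valid = zlib.gzipSync(Buffer.from('hello'));
gunzip.write(Buffer.concat([valid, Buffer.from([0x00, 0x01])]));
gunzip.end();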

test/wpt/status/compression.json

Lines changed: 6 additions & 4 deletions
@@ -1,9 +1,11 @@
 {
-  "compression-bad-chunks.any.js": {
-    "skip": "Execution \"hangs\", ArrayBuffer and TypedArray is not accepted and throws, instead of rejects during writer.write"
-  },
   "decompression-bad-chunks.any.js": {
-    "skip": "Execution \"hangs\", ArrayBuffer and TypedArray is not accepted and throws, instead of rejects during writer.write"
+    "fail": {
+      "expected": [
+        "chunk of type invalid deflate bytes should error the stream for brotli",
+        "chunk of type invalid gzip bytes should error the stream for brotli"
+      ]
+    }
   },
   "compression-with-detach.window.js": {
     "requires": ["crypto"]
