@@ -46,7 +46,8 @@ include "_headers.pxi"
4646
4747from aiohttp cimport _find_header
4848
49- ALLOWED_UPGRADES = frozenset ({" websocket" })
49+
50+ cdef frozenset ALLOWED_UPGRADES = frozenset ({" websocket" })
5051DEF DEFAULT_FREELIST_SIZE = 250
5152
5253cdef extern from " Python.h" :
@@ -69,7 +70,7 @@ cdef object CONTENT_ENCODING = hdrs.CONTENT_ENCODING
6970cdef object EMPTY_PAYLOAD = _EMPTY_PAYLOAD
7071cdef object StreamReader = _StreamReader
7172cdef object DeflateBuffer = _DeflateBuffer
72- cdef bytes EMPTY_BYTES = b" "
73+ cdef tuple EMPTY_FEED_DATA_RESULT = ((), False , b" " )
7374
7475# RFC 9110 singleton headers — duplicates are rejected in strict mode.
7576# In lax mode (response parser default), the check is skipped entirely
@@ -298,7 +299,7 @@ cdef class HttpParser:
298299 bint _has_value
299300 int _header_name_size
300301
301- object _protocol
302+ readonly object protocol
302303 object _loop
303304 object _timer
304305
@@ -309,6 +310,7 @@ cdef class HttpParser:
309310 bint _read_until_eof
310311 bint _lax
311312
313+ bytes _tail
312314 bint _started
313315 object _url
314316 bytearray _buf
@@ -319,6 +321,9 @@ cdef class HttpParser:
319321 list _raw_headers
320322 bint _upgraded
321323 list _messages
324+ bint _more_data_available
325+ bint _paused
326+ bint _eof_pending
322327 object _payload
323328 bint _payload_error
324329 object _payload_exception
@@ -359,18 +364,22 @@ cdef class HttpParser:
359364 self ._cparser.data = < void * > self
360365 self ._cparser.content_length = 0
361366
362- self ._protocol = protocol
367+ self .protocol = protocol
363368 self ._loop = loop
364369 self ._timer = timer
365370
366371 self ._buf = bytearray()
372+ self ._more_data_available = False
373+ self ._paused = False
374+ self ._eof_pending = False
367375 self ._payload = None
368376 self ._payload_error = 0
369377 self ._payload_exception = payload_exception
370378 self ._messages = []
371379
372- self ._raw_name = EMPTY_BYTES
373- self ._raw_value = EMPTY_BYTES
380+ self ._raw_name = b" "
381+ self ._raw_value = b" "
382+ self ._tail = b" "
374383 self ._has_value = False
375384 self ._header_name_size = 0
376385
@@ -401,7 +410,7 @@ cdef class HttpParser:
401410
402411 cdef _process_header(self ):
403412 cdef str value
404- if self ._raw_name is not EMPTY_BYTES :
413+ if self ._raw_name ! = b " " :
405414 name = find_header(self ._raw_name)
406415 value = self ._raw_value.decode(' utf-8' , ' surrogateescape' )
407416
@@ -426,20 +435,20 @@ cdef class HttpParser:
426435 self ._has_value = False
427436 self ._header_name_size = 0
428437 self ._raw_headers.append((self ._raw_name, self ._raw_value))
429- self ._raw_name = EMPTY_BYTES
430- self ._raw_value = EMPTY_BYTES
438+ self ._raw_name = b " "
439+ self ._raw_value = b " "
431440
432441 cdef _on_header_field(self , char * at, size_t length):
433442 if self ._has_value:
434443 self ._process_header()
435444
436- if self ._raw_name is EMPTY_BYTES :
445+ if self ._raw_name == b " " :
437446 self ._raw_name = at[:length]
438447 else :
439448 self ._raw_name += at[:length]
440449
441450 cdef _on_header_value(self , char * at, size_t length):
442- if self ._raw_value is EMPTY_BYTES :
451+ if self ._raw_value == b " " :
443452 self ._raw_value = at[:length]
444453 else :
445454 self ._raw_value += at[:length]
@@ -495,14 +504,14 @@ cdef class HttpParser:
495504 self ._read_until_eof)
496505 ):
497506 payload = StreamReader(
498- self ._protocol , timer = self ._timer, loop = self ._loop,
507+ self .protocol , timer = self ._timer, loop = self ._loop,
499508 limit = self ._limit)
500509 else :
501510 payload = EMPTY_PAYLOAD
502511
503512 self ._payload = payload
504513 if encoding is not None and self ._auto_decompress:
505- self ._payload = DeflateBuffer(payload, encoding)
514+ self ._payload = DeflateBuffer(payload, encoding, max_decompress_size = self ._limit )
506515
507516 if not self ._response_with_body:
508517 payload = EMPTY_PAYLOAD
@@ -535,6 +544,10 @@ cdef class HttpParser:
535544
536545 # ## Public API ###
537546
547+ def pause_reading (self ):
548+ assert self ._payload is not None
549+ self ._paused = True
550+
538551 def feed_eof (self ):
539552 cdef bytes desc
540553
@@ -549,18 +562,52 @@ cdef class HttpParser:
549562 desc = cparser.llhttp_get_error_reason(self ._cparser)
550563 raise PayloadEncodingError(desc.decode(' latin-1' ))
551564 else :
565+ self ._eof_pending = True
566+ while self ._more_data_available:
567+ if self ._paused:
568+ self ._paused = False
569+ return # Will resume via feed_data(b"") later
570+ self ._more_data_available = self ._payload.feed_data(b" " )
552571 self ._payload.feed_eof()
572+ self ._payload = None
573+ self ._more_data_available = False
574+ self ._eof_pending = False
553575 elif self ._started:
554576 self ._on_headers_complete()
555577 if self ._messages:
556578 return self ._messages[- 1 ][0 ]
557579
558- def feed_data (self , data ):
580+ def feed_data (self , incoming_data ):
559581 cdef:
560582 size_t data_len
561583 size_t nb
562584 char * base
563585 cdef cparser.llhttp_errno_t errno
586+ cdef bytes data
587+
588+ # Proactor loop sends bytearray.
589+ # Ensure cython sees `data` as bytes
590+ if type (incoming_data) is not bytes:
591+ data = bytes(incoming_data)
592+ else :
593+ data = incoming_data
594+
595+ if self ._tail:
596+ data, self ._tail = self ._tail + data, b" "
597+
598+ if self ._more_data_available:
599+ result = cb_on_body(self ._cparser, b" " , 0 )
600+ if result is cparser.HPE_PAUSED:
601+ self ._tail = data
602+ return EMPTY_FEED_DATA_RESULT
603+
604+ if self ._eof_pending:
605+ self ._payload.feed_eof()
606+ self ._payload = None
607+ self ._eof_pending = False
608+ # We can't have new messages here, otherwise we wouldn't have
609+ # received EOF.
610+ return EMPTY_FEED_DATA_RESULT
564611
565612 PyObject_GetBuffer(data, & self .py_buf, PyBUF_SIMPLE)
566613 # Cache buffer pointer before PyBuffer_Release to avoid use-after-release.
@@ -574,12 +621,15 @@ cdef class HttpParser:
574621
575622 if errno is cparser.HPE_PAUSED_UPGRADE:
576623 cparser.llhttp_resume_after_upgrade(self ._cparser)
577-
578624 nb = cparser.llhttp_get_error_pos(self ._cparser) - base
625+ elif errno is cparser.HPE_PAUSED:
626+ cparser.llhttp_resume(self ._cparser)
627+ pos = cparser.llhttp_get_error_pos(self ._cparser) - base
628+ self ._tail = data[pos:]
579629
580630 PyBuffer_Release(& self .py_buf)
581631
582- if errno not in (cparser.HPE_OK, cparser.HPE_PAUSED_UPGRADE):
632+ if errno not in (cparser.HPE_OK, cparser.HPE_PAUSED, cparser. HPE_PAUSED_UPGRADE):
583633 if self ._payload_error == 0 :
584634 if self ._last_error is not None :
585635 ex = self ._last_error
@@ -603,8 +653,9 @@ cdef class HttpParser:
603653
604654 if self ._upgraded:
605655 return messages, True , data[nb:]
606- else :
607- return messages, False , b" "
656+ if not messages: # Shortcut to reduce Python overhead
657+ return EMPTY_FEED_DATA_RESULT
658+ return messages, False , b" "
608659
609660 def set_upgraded (self , val ):
610661 self ._upgraded = val
@@ -799,19 +850,26 @@ cdef int cb_on_body(cparser.llhttp_t* parser,
799850 const char * at, size_t length) except - 1 :
800851 cdef HttpParser pyparser = < HttpParser> parser.data
801852 cdef bytes body = at[:length]
802- try :
803- pyparser._payload.feed_data(body)
804- except BaseException as underlying_exc:
805- reraised_exc = underlying_exc
806- if pyparser._payload_exception is not None :
807- reraised_exc = pyparser._payload_exception(str (underlying_exc))
808-
809- set_exception(pyparser._payload, reraised_exc, underlying_exc)
810-
811- pyparser._payload_error = 1
812- return - 1
813- else :
814- return 0
853+ while body or pyparser._more_data_available:
854+ try :
855+ pyparser._more_data_available = pyparser._payload.feed_data(body)
856+ except BaseException as underlying_exc:
857+ reraised_exc = underlying_exc
858+ if pyparser._payload_exception is not None :
859+ reraised_exc = pyparser._payload_exception(str (underlying_exc))
860+
861+ set_exception(pyparser._payload, reraised_exc, underlying_exc)
862+
863+ pyparser._payload_error = 1
864+ pyparser._paused = False
865+ return - 1
866+ body = b" "
867+
868+ if pyparser._paused:
869+ pyparser._paused = False
870+ return cparser.HPE_PAUSED
871+ pyparser._paused = False
872+ return 0
815873
816874
817875cdef int cb_on_message_complete(cparser.llhttp_t* parser) except - 1 :
0 commit comments