hypercorn.protocol.h2 module#

exception hypercorn.protocol.h2.BufferCompleteError#

Bases: Exception

class hypercorn.protocol.h2.H2Protocol(app: Union[Type[ASGI2Protocol], Callable[[Union[HTTPScope, WebsocketScope, LifespanScope], Callable[[], Awaitable[Union[HTTPRequestEvent, HTTPDisconnectEvent, WebsocketConnectEvent, WebsocketReceiveEvent, WebsocketDisconnectEvent, LifespanStartupEvent, LifespanShutdownEvent]]], Callable[[Union[HTTPResponseStartEvent, HTTPResponseBodyEvent, HTTPServerPushEvent, HTTPEarlyHintEvent, HTTPDisconnectEvent, WebsocketAcceptEvent, WebsocketSendEvent, WebsocketResponseStartEvent, WebsocketResponseBodyEvent, WebsocketCloseEvent, LifespanStartupCompleteEvent, LifespanStartupFailedEvent, LifespanShutdownCompleteEvent, LifespanShutdownFailedEvent]], Awaitable[None]]], Awaitable[None]]], config: Config, context: WorkerContext, task_group: TaskGroup, ssl: bool, client: Optional[Tuple[str, int]], server: Optional[Tuple[str, int]], send: Callable[[Event], Awaitable[None]])#

Bases: object

async handle(event: Event) None#
property idle: bool#
async initiate(headers: Optional[List[Tuple[bytes, bytes]]] = None, settings: Optional[str] = None) None#
async send_task() None#
async stream_send(event: Event) None#
class hypercorn.protocol.h2.StreamBuffer(event_class: Type[Event])#

Bases: object

async close() None#
property complete: bool#
async drain() None#
async pop(max_length: int) bytes#
async push(data: bytes) None#
set_complete() None#