hypercorn.utils module#

exception hypercorn.utils.FrameTooLargeError#

Bases: Exception

exception hypercorn.utils.LifespanFailureError(stage: str, message: str)#

Bases: Exception

exception hypercorn.utils.LifespanTimeoutError(stage: str)#

Bases: Exception

exception hypercorn.utils.MustReloadError#

Bases: Exception

exception hypercorn.utils.NoAppError#

Bases: Exception

exception hypercorn.utils.ShutdownError#

Bases: Exception

exception hypercorn.utils.UnexpectedMessageError(state: Enum, message_type: str)#

Bases: Exception

hypercorn.utils.build_and_validate_headers(headers: Iterable[Tuple[bytes, bytes]]) List[Tuple[bytes, bytes]]#
async hypercorn.utils.check_multiprocess_shutdown_event(shutdown_event: Event, sleep: Callable[[float], Awaitable[Any]]) None#
hypercorn.utils.filter_pseudo_headers(headers: List[Tuple[bytes, bytes]]) List[Tuple[bytes, bytes]]#
async hypercorn.utils.invoke_asgi(app: Union[Type[ASGI2Protocol], Callable[[Union[HTTPScope, WebsocketScope, LifespanScope], Callable[[], Awaitable[Union[HTTPRequestEvent, HTTPDisconnectEvent, WebsocketConnectEvent, WebsocketReceiveEvent, WebsocketDisconnectEvent, LifespanStartupEvent, LifespanShutdownEvent]]], Callable[[Union[HTTPResponseStartEvent, HTTPResponseBodyEvent, HTTPServerPushEvent, HTTPEarlyHintEvent, HTTPDisconnectEvent, WebsocketAcceptEvent, WebsocketSendEvent, WebsocketResponseStartEvent, WebsocketResponseBodyEvent, WebsocketCloseEvent, LifespanStartupCompleteEvent, LifespanStartupFailedEvent, LifespanShutdownCompleteEvent, LifespanShutdownFailedEvent]], Awaitable[None]]], Awaitable[None]]], scope: Union[HTTPScope, WebsocketScope, LifespanScope], receive: Callable[[], Awaitable[Union[HTTPRequestEvent, HTTPDisconnectEvent, WebsocketConnectEvent, WebsocketReceiveEvent, WebsocketDisconnectEvent, LifespanStartupEvent, LifespanShutdownEvent]]], send: Callable[[Union[HTTPResponseStartEvent, HTTPResponseBodyEvent, HTTPServerPushEvent, HTTPEarlyHintEvent, HTTPDisconnectEvent, WebsocketAcceptEvent, WebsocketSendEvent, WebsocketResponseStartEvent, WebsocketResponseBodyEvent, WebsocketCloseEvent, LifespanStartupCompleteEvent, LifespanStartupFailedEvent, LifespanShutdownCompleteEvent, LifespanShutdownFailedEvent]], Awaitable[None]]) None#
hypercorn.utils.load_application(path: str) Union[Type[ASGI2Protocol], Callable[[Union[HTTPScope, WebsocketScope, LifespanScope], Callable[[], Awaitable[Union[HTTPRequestEvent, HTTPDisconnectEvent, WebsocketConnectEvent, WebsocketReceiveEvent, WebsocketDisconnectEvent, LifespanStartupEvent, LifespanShutdownEvent]]], Callable[[Union[HTTPResponseStartEvent, HTTPResponseBodyEvent, HTTPServerPushEvent, HTTPEarlyHintEvent, HTTPDisconnectEvent, WebsocketAcceptEvent, WebsocketSendEvent, WebsocketResponseStartEvent, WebsocketResponseBodyEvent, WebsocketCloseEvent, LifespanStartupCompleteEvent, LifespanStartupFailedEvent, LifespanShutdownCompleteEvent, LifespanShutdownFailedEvent]], Awaitable[None]]], Awaitable[None]]]#
async hypercorn.utils.observe_changes(sleep: Callable[[float], Awaitable[Any]]) None#
hypercorn.utils.parse_socket_addr(family: int, address: tuple) Optional[Tuple[str, int]]#
async hypercorn.utils.raise_shutdown(shutdown_event: Callable[[...], Awaitable[None]]) None#
hypercorn.utils.repr_socket_addr(family: int, address: tuple) str#
hypercorn.utils.restart() None#
hypercorn.utils.suppress_body(method: str, status_code: int) bool#
hypercorn.utils.valid_server_name(config: Config, request: Request) bool#
hypercorn.utils.write_pid_file(pid_path: str) None#