src.uagents.asgi
ASGIServer Objects
class ASGIServer()
ASGI server for receiving incoming envelopes.
__init__
def __init__(port: int, loop: asyncio.AbstractEventLoop, queries: Dict[str, asyncio.Future], logger: Optional[Logger] = None)
Initialize the ASGI server.
Arguments:
port (int) - The port to listen on.
loop (asyncio.AbstractEventLoop) - The event loop to use.
queries (Dict[str, asyncio.Future]) - The dictionary of queries to resolve.
logger (Optional[Logger]) - The logger to use.
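The queries argument maps a key to an asyncio.Future that is resolved once a reply is available. The sketch below illustrates that general pattern with the standard library only. The key "query-1", the helper names, and the "pong" payload are hypothetical and are not taken from uagents.

```python
import asyncio
from typing import Dict


async def wait_for_reply(queries: Dict[str, asyncio.Future], key: str) -> str:
    """Register a pending future under `key` and wait until it is resolved."""
    loop = asyncio.get_running_loop()
    queries[key] = loop.create_future()
    try:
        return await queries[key]
    finally:
        # Drop the entry whether the wait succeeded or was cancelled.
        queries.pop(key, None)


async def demo() -> str:
    queries: Dict[str, asyncio.Future] = {}
    waiter = asyncio.create_task(wait_for_reply(queries, "query-1"))
    await asyncio.sleep(0)  # yield once so the waiter can register its future
    queries["query-1"].set_result("pong")  # some handler resolves the query
    return await waiter
```

Sharing one dictionary between the code that waits and the code that answers keeps the two sides decoupled: neither needs a reference to the other, only to the key.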
server
@property
def server()
Property to access the underlying uvicorn server.
Returns: The server.
add_rest_endpoint
def add_rest_endpoint(address: str, method: RestMethod, endpoint: str, request: Optional[Type[Model]], response: Type[Union[Model, BaseModel]])
Add a REST endpoint to the server.
has_rest_endpoint
def has_rest_endpoint(method: RestMethod, endpoint: str) -> bool
Check if the server has a REST endpoint registered.
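One way to picture how add_rest_endpoint and has_rest_endpoint relate is a lookup table keyed by method and path. The following is a minimal stand-in, not the library's implementation: the RestRegistry class, the RestMethod alias as written here, and the placeholder address are assumptions made for illustration, and the request and response model arguments are left out.

```python
from typing import Dict, Literal, Tuple

# Assumption: only GET and POST are modelled; uagents may define RestMethod differently.
RestMethod = Literal["GET", "POST"]


class RestRegistry:
    """Hypothetical stand-in for a server's REST endpoint table."""

    def __init__(self) -> None:
        # (method, endpoint) -> address of the agent that handles the route
        self._routes: Dict[Tuple[str, str], str] = {}

    def add_rest_endpoint(self, address: str, method: RestMethod, endpoint: str) -> None:
        self._routes[(method, endpoint)] = address

    def has_rest_endpoint(self, method: RestMethod, endpoint: str) -> bool:
        return (method, endpoint) in self._routes


registry = RestRegistry()
registry.add_rest_endpoint("placeholder-agent-address", "GET", "/status")
```

Keying on the (method, endpoint) pair lets the same path be registered separately for GET and POST.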
handle_readiness_probe
async def handle_readiness_probe(headers: CaseInsensitiveDict, send)
Handle a readiness probe sent via the HEAD method.
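At the ASGI level, answering a HEAD request means sending a response start message followed by an empty body. The sketch below shows only that generic shape. The function names are hypothetical, and whatever the real method derives from the request headers is not reproduced here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Send = Callable[[Dict[str, Any]], Awaitable[None]]


async def answer_head_probe(send: Send) -> None:
    """Reply to a HEAD request with an empty 200 response."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b""})


def run_probe() -> List[Dict[str, Any]]:
    """Drive the handler with a fake `send` that records each message."""
    sent: List[Dict[str, Any]] = []

    async def send(message: Dict[str, Any]) -> None:
        sent.append(message)

    asyncio.run(answer_head_probe(send))
    return sent
```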
handle_missing_content_type
async def handle_missing_content_type(headers: CaseInsensitiveDict, send)
Handle missing content type header.
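A request without a Content-Type header is typically rejected with a client error. The sketch below shows one plausible shape for such a reply in plain ASGI. The 400 status, the JSON error text, and the function names are assumptions, not confirmed details of the uagents implementation.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Send = Callable[[Dict[str, Any]], Awaitable[None]]


async def reject_missing_content_type(send: Send) -> None:
    """Send a 400 response whose JSON body names the missing header."""
    body = json.dumps({"error": "missing header: content-type"}).encode()
    await send(
        {
            "type": "http.response.start",
            "status": 400,
            "headers": [(b"content-type", b"application/json")],
        }
    )
    await send({"type": "http.response.body", "body": body})


def run_rejection() -> List[Dict[str, Any]]:
    """Drive the handler with a fake `send` that records each message."""
    sent: List[Dict[str, Any]] = []

    async def send(message: Dict[str, Any]) -> None:
        sent.append(message)

    asyncio.run(reject_missing_content_type(send))
    return sent
```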
serve
async def serve()
Start the server.
__call__
async def __call__(scope, receive, send)
Handle an incoming ASGI message, dispatching the envelope to the appropriate handler, and waiting for any queries to be resolved.
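For readers unfamiliar with the (scope, receive, send) signature, the sketch below is a small, self-contained ASGI callable that reads a chunked request body and replies with JSON. It illustrates the protocol only: the app and call_app names, the status codes, and the reply payloads are hypothetical, and envelope dispatch and query resolution are not modelled.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List

Receive = Callable[[], Awaitable[Dict[str, Any]]]
Send = Callable[[Dict[str, Any]], Awaitable[None]]


async def read_body(receive: Receive) -> bytes:
    """Collect the request body, which ASGI may deliver in several chunks."""
    body = b""
    while True:
        message = await receive()
        body += message.get("body", b"")
        if not message.get("more_body", False):
            return body


async def app(scope: Dict[str, Any], receive: Receive, send: Send) -> None:
    """Hypothetical ASGI callable: accept a JSON POST and acknowledge it."""
    if scope["type"] != "http":
        return  # lifespan and websocket scopes are ignored in this sketch
    status, reply = 200, {"status": "ok"}
    if scope["method"] != "POST":
        status, reply = 405, {"error": "method not allowed"}
    else:
        try:
            json.loads(await read_body(receive))
        except ValueError:  # covers both invalid JSON and undecodable bytes
            status, reply = 400, {"error": "invalid JSON"}
    await send(
        {
            "type": "http.response.start",
            "status": status,
            "headers": [(b"content-type", b"application/json")],
        }
    )
    await send({"type": "http.response.body", "body": json.dumps(reply).encode()})


def call_app(method: str, chunks: List[bytes]) -> List[Dict[str, Any]]:
    """Drive `app` in-process with canned body chunks; return the sent messages."""
    sent: List[Dict[str, Any]] = []
    pending = list(chunks)

    async def receive() -> Dict[str, Any]:
        chunk = pending.pop(0)
        return {"type": "http.request", "body": chunk, "more_body": bool(pending)}

    async def send(message: Dict[str, Any]) -> None:
        sent.append(message)

    asyncio.run(app({"type": "http", "method": method}, receive, send))
    return sent
```

Calling call_app("POST", [b'{"a":', b' 1}']) exercises the chunked path: the two body fragments are joined before parsing, so the callable never sees partial JSON.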