Sanic
Sanic server exposing an OpenRPC server over HTTP and Websocket.
import asyncio
from openrpc import RPCServer
from sanic import HTTPResponse, Request, Sanic, text, Websocket
app = Sanic("DemoServer")
rpc = RPCServer(title="DemoServer", version="1.0.0")
@rpc.method()
async def add(a: int, b: int) -> int:
    return a + b
@app.websocket("/api/v1/")
async def ws_process_rpc(_request: Request, ws: Websocket) -> None:
    """Process RPC requests through websocket."""
    async def _process_rpc(request: str) -> None:
        json_rpc_response = await rpc.process_request_async(request)
        if json_rpc_response is not None:
            await ws.send(json_rpc_response)
    async for msg in ws:
        asyncio.create_task(_process_rpc(msg))
@app.post("/api/v1/")
async def http_process_rpc(request: Request) -> HTTPResponse:
    """Process RPC request through HTTP server."""
    json_rpc_response = await rpc.process_request_async(request.body)
    return text(json_rpc_response, headers={"Content-Type": "application/json"})
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)