How to use JSON-RPC with Websockets in Python?

websockets json

We’ll use Websockets to take JSON-RPC requests. It should respond to “ping” with “pong”.

Install websockets to take requests and jsonrpcserver to process them:

pip install websockets jsonrpcserver

Create a server.py:

import asyncio

from jsonrpcserver import method, Success, Result, async_dispatch
import websockets


@method
async def ping() -> Result:
    return Success("pong")


async def main(websocket, path):
    if response := await async_dispatch(await websocket.recv()):
        await websocket.send(response)


start_server = websockets.serve(main, "localhost", 5000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Start the server:

$ python server.py

Client

Use jsonrpcclient to send requests:

pip install websockets jsonrpcclient

Create a client.py:

import asyncio
import logging

from jsonrpcclient import Ok, parse_json, request_json
import websockets


async def main():
    async with websockets.connect("ws://localhost:5000") as ws:
        await ws.send(request_json("ping"))
        response = parse_json(await ws.recv())

    if isinstance(response, Ok):
        print(response.result)
    else:
        logging.error(response.message)


asyncio.get_event_loop().run_until_complete(main())

Run the client:

$ python client.py
pong