web3.py Patterns: Real-Time Event Subscriptions
Most Ethereum clients include eth_subscribe
support, allowing you to listen for specific events as they occur. This applies to a limited set of events: new block headers, the syncing status of a node, new pending transactions, and emitted logs from smart contracts. This blog post will quickly cover how to subscribe to these events with the brand new web3.py subscription manager.
Important! 🚧
Subscriptions require a persistent socket connection between you and the Ethereum client. For that reason, you must use web3.py's WebSocketProvider
or AsyncIPCProvider
to utilize subscriptions. As it is the more common of the two, all examples in this blog post will leverage the WebSocketProvider
.
This post also assumes you have a basic understanding of asynchronous programming patterns in Python. If you're new to these concepts, you may need to do some reading first, but the examples will hopefully be digestible to a wide audience.
An introduction to subscriptions
When you subscribe to an event – new block headers, for example – you'll receive a subscription ID. The Ethereum client will then maintain a connection to your application and send along any related event until you unsubscribe with that ID. That example in code:
import asyncio
from web3 import AsyncWeb3, WebSocketProvider
async def example():
# connect to a node:
async with AsyncWeb3(WebSocketProvider("wss://...")) as w3:
# subscribe to new block headers:
subscription_id = await w3.eth.subscribe("newHeads")
print(subscription_id)
# listen for events as they occur:
async for response in w3.socket.process_subscriptions():
# handle each event:
print(response)
# unsubscribe:
if response["number"] > 42012345:
await w3.eth.unsubscribe(subscription_id)
break
asyncio.run(example())
web3.py's subscription manager
The example above is the "manual" approach to managing subscriptions. It's not so complicated in the case of listening for new block headers, but things get considerably more complex once you start listening for smart contract event logs or managing multiple subscriptions.
As of v7.7.0, web3.py includes some additional convenient subscription management features. We'll step through them now.
1.) The subscription_manager
First, your w3
instance now includes a new module, subscription_manager
. While you may still use the w3.eth.subscribe
method from the previous example, the subscription_manager
offers an additional way to start one or more subscriptions. We're going to pass in a list of events we want to subscribe to within the w3.subscription_manager.subscribe
method.
await w3.subscription_manager.subscribe([sub1, sub2, ...])
2.) Subscription types
To aid in defining those subscriptions, subscription type classes have been introduced: NewHeadsSubscription
, PendingTxSubscription
, LogsSubscription
, and SyncingSubscription
. Each class is context aware, meaning it will throw an error if you provide an unexpected data type.
from web3.utils.subscriptions import (
NewHeadsSubscription,
PendingTxSubscription,
LogsSubscription,
)
sub1 = NewHeadsSubscription(
label="new-heads-mainnet", # optional label
handler=new_heads_handler
)
sub2 = PendingTxSubscription(
label="pending-tx-mainnet", # optional label
full_transactions=True,
handler=pending_tx_handler,
)
sub3 = LogsSubscription(
label="WETH transfers", # optional label
address=weth_contract.address,
topics=[weth_contract.events.Transfer().topic],
handler=log_handler,
# optional `handler_context` args to help parse a response
handler_context={"transfer_event": weth_contract.events.Transfer()},
)
3.) Handlers
In the example above, there is a handler
specified for each subscription. These are context-aware functions that you can declare separate from the subscription logic. Within each handler, parse and perform whatever logic you require.
Note that in addition to the result
being processed, the handler_context
in each handler provides access to your async_w3
instance, the subscription
instance, and any custom values declared within the handler_context
of the subscription.
from web3.utils.subscriptions import LogsSubscriptionContext
async def new_heads_handler(
handler_context: LogsSubscriptionContext,
) -> None:
log_receipt = handler_context.result
print(f"New log: {log_receipt}\n")
event_data = handler_context.transfer_event.process_log(log_receipt)
print(f"Log event data: {event_data}\n")
if log_receipt["blockNumber"] > 42012345:
await handler_context.subscription.unsubscribe()
4.) handle_subscriptions
Finally, when all your subscriptions are configured, utilize the handle_subscriptions
method to begin processing them. If you need to listen for events on multiple chains, create one w3
instance per chain.
async def sub_manager():
...
# handle subscriptions via configured handlers:
await w3.subscription_manager.handle_subscriptions()
# or, gather one w3 instance per chain:
await asyncio.gather(
w3.subscription_manager.handle_subscriptions(),
l2_w3.subscription_manager.handle_subscriptions(),
)
asyncio.run(sub_manager())
5.) Unsubscribing
If you don't want to subscribe indefinitely to an event, you can unsubscribe
at any point. The first example in this post demonstrated the manual approach:
await w3.eth.unsubscribe(subscription_id)
The new handler pattern will keep track of the subscription ID for you however, so the same can be accomplished via the handler_context
without an ID:
async def new_heads_handler(handler_context):
...
if some_condition:
await handler_context.subscription.unsubscribe()
Lastly, if you're wrapping up the whole show, you can reach for unsubscribe_all
on the subscription_manager
:
await w3.subscription_manager.unsubscribe_all()
assert subscription_manager.subscriptions == []
An example
Putting all the pieces together:
import asyncio
from web3 import AsyncWeb3, WebSocketProvider
from web3.utils.subscriptions import (
NewHeadsSubscription,
NewHeadsSubscriptionContext,
LogsSubscription,
LogsSubscriptionContext,
)
# -- declare handlers --
async def new_heads_handler(
handler_context: NewHeadsSubscriptionContext,
) -> None:
header = handler_context.result
print(f"New block header: {header}\n")
async def log_handler(
handler_context: LogsSubscriptionContext,
) -> None:
log_receipt = handler_context.result
print(f"Log receipt: {log_receipt}\n")
async def sub_manager():
# -- initialize provider --
w3 = await AsyncWeb3(WebSocketProvider("wss://..."))
# -- subscribe to event(s) --
await w3.subscription_manager.subscribe(
[
NewHeadsSubscription(
label="new-heads-mainnet",
handler=new_heads_handler
),
LogsSubscription(
label="WETH transfers",
address=w3.to_checksum_address(
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
),
topics=["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],
handler=log_handler,
),
]
)
# -- listen for events --
await w3.subscription_manager.handle_subscriptions()
asyncio.run(sub_manager())
Wrapping up
Subscribing to events in web3.py
is a powerful way to build responsive applications that interact with the Ethereum blockchain. We hope these additions to your toolbelt aid you along the way. If you'd like to get even deeper in the weeds, check out the web3.py internals documentation.
As always, feedback is welcome, file a GitHub issue if you bump into a bug or have a feature request, and join the Ethereum Python Community Discord server to give and get help. 🐍✨