web3.py Patterns: Bloom Filters

Have you ever queried an Ethereum block and wondered what that "logsBloom" was? Have you gone looking for the most efficient way to find an event within a block? If you answered "yes" to either of the above, then you're in for a good time. This post will quickly describe Bloom filters, how they're leveraged in Ethereum, and how to filter blocks for events in Python.

What is a Bloom filter?

Bloom filters are efficient data structures that can be used to detect the likely presence of some element in a set. False positives can occur, but – more importantly – Bloom filters can guarantee that an item does not appear in a set. That's as deep as we need to go for the purposes of this post; see the Wikipedia entry for the details of the algorithm.

How are Bloom filters used in Ethereum?

A Bloom filter is included along with other metadata within each block. The name, "logsBloom", leads us in the right direction: it's a 256-byte Bloom filter that can be used to detect the presence of event logs within that block.

Why is this valuable? If you're looking for the history of a smart contract's usage, one very inefficient way to do it would be to query every block and its transactions to detect the presence of an event you're interested in. One alternative is to check a block's logsBloom for the event, then move on to the next block when no hits are detected.

Note: another Bloom filter is available in each transaction receipt, so that you can quickly detect the presence of an event at that scope, but we'll stick to the block-level Bloom filter for this example.

How can I detect an event in a block?

In short, you'll need to encode the event you're looking for, then check for its presence within the logsBloom. A utility library, eth-bloom, can do a lot of the heavy lifting.

# pseudocode

bloom = BloomFilter(logsBloom)
event_signature = encode("MyEvent(uint256)")

if event_signature in bloom:
    print("event (likely) found!")

The other important component is getting an event signature in the correct format. We'll cover that next.

How do I get an event signature?

Within smart contracts, events are declared once, then emitted at appropriate points within the execution logic. Let's consider one of the most frequently emitted events, the Transfer event of the ERC-20 token standard. Within Solidity, the Transfer event is defined and emitted like so:

# Solidity:

# declare an event
event Transfer(address indexed from, address indexed to, uint256 value);

# emit an event
emit Transfer(sender, recipient, amount);

The same idea in Vyper:

# Vyper:

# declare an event
event Transfer:
    from: indexed(address)
    to: indexed(address)
    value: uint256

# emit an event
emit Transfer(sender, recipient, amount)

Regardless of the contract language and whether the topics are indexed, the event signature is the string representation of the event name and argument types, with whitespace removed. In this example of the Transfer event, that works out to "Transfer(address,address,uint256)".

Finally, what gets represented in the logsBloom is the keccak256 hash of that event signature string.

event_signature = Web3.keccak(text="Transfer(address,address,uint256)")

if event_signature in bloom:
    print("event (likely) found!")
    # fetch and handle those logs

Putting it all together

Below is a brief working example that stitches together the concepts we've discussed.

from web3 import Web3, HTTPProvider
from eth_bloom import BloomFilter

w3 = Web3(HTTPProvider("..."))

block = w3.eth.get_block("latest")
bloom = BloomFilter(int.from_bytes(block["logsBloom"]))

event_signature = Web3.keccak(text="Transfer(address,address,uint256)")

if event_signature in bloom:
    print(f"Event (likely) found")
    # fetch and process logs here
else:
    print(f"Event not found")

Again, the Bloom filter only informs you of the likely presence of a log you're interested in. You've still got to fetch the logs if you want to process that data. Within the happy path code block, you can include a get_logs call, for example:

logs = w3.eth.get_logs(
  {
    "fromBlock": block["number"],
    "toBlock": block["number"],
    "address": contract_address,
    "topics": [event_signature],
  }
)

print(f"Found {len(logs)} events in {block['number']}")

What about realtime data?

We'll wander away from Bloom filters briefly here, but if your goal is to listen for events as they happen, the WebSocketProvider and AsyncIPCProvider offer eth_subscribe support; subscriptions can be established for specific event logs as they occur. You can set filter criteria to listen for events emitted by a particular contract address, or containing specific input values, but this example listens for all Transfer events:

import asyncio
from web3 import AsyncWeb3, WebSocketProvider
from eth_bloom import BloomFilter
from eth_abi import decode


async def log_sub():
    async with AsyncWeb3(WebSocketProvider(f"wss://...")) as w3:
        event_signature = w3.keccak(text="Transfer(address,address,uint256)")
        subscription_id = await w3.eth.subscribe("logs", [event_signature])

        async for response in w3.socket.process_subscriptions():
            log = response["result"]
            
            src = decode(["address"], log["topics"][1])[0]
            dst = decode(["address"], log["topics"][2])[0]
            wad = decode(["uint256"], log["data"])[0]

            print(
                f"\nTransfer from {src} to {dst} of {wad}\n"
                f"tx hash: {w3.to_hex(log['transactionHash'])}\n"
                f"token contract address: {log['address']}\n"
            )

                
asyncio.run(log_sub())

Let's return to Bloom filters and take a look at one more contrived example. In this script, we'll create a subscription to new block headers, then reference the logsBloom for each block, conditionally fetching logs if they are detected.

import asyncio
from web3 import AsyncWeb3, WebSocketProvider
from eth_bloom import BloomFilter
from eth_abi import decode


async def subscription_example():
    async with AsyncWeb3(WebSocketProvider(f"wss://...")) as w3:
        subscription_id = await w3.eth.subscribe("newHeads")

        async for response in w3.socket.process_subscriptions():
            block = response["result"]
            block_num = block["number"]
            print(f"New block: {block_num}")
            
            bloom = BloomFilter(int.from_bytes(block["logsBloom"]))

            event_signature = w3.keccak(text="Transfer(address,address,uint256)")

            if event_signature in bloom:
                logs = await w3.eth.get_logs(
                    {
                        "fromBlock": block_num,
                        "toBlock": block_num,
                        "topics": [event_signature],
                    }
                )
                print(f"Found {len(logs)} events in block {block_num}\n")
                
                for log in logs:
                    # process the logs here

                
asyncio.run(subscription_example())

Wrapping up

The Transfer example was used in this post, because it's virtually guaranteed to give you a successful query result in any given block. For that reason, its not especially useful to check for the presence of those events, unless scoped further.

Another good question is when to use the logsBloom versus get_logs or subscriptions. The answer, of course, is It Depends™ on your context. If you're looking for a rule of thumb:

  • If you want events as soon as they occur, create a subscription via the WebSocketProvider or AsyncIPCProvider.
  • If you've already collected a database of block headers, you can quickly iterate through them – without making any network requests – to find relevant events via the logsBloom.
  • If you want a historical record of events from scratch, scope your filters appropriately and iterate through small batches of blocks using get_logs.

Hopefully you learned something and this is enough to get you going on your next project! Join the Ethereum Python Community Discord to give and get support along the way, and follow @EthereumPython for more tutorials and release updates as they come out. Happy building! 🐍