Testing¶
Basics¶
To test an agent in unit tests or functional tests, use the special
Agent.test_context() mode, which lets you send items to the stream
while the agent processes them locally:
import faust

app = faust.App('test-example')

class Order(faust.Record, serializer='json'):
    account_id: str
    product_id: str
    amount: int
    price: float

orders_topic = app.topic('orders', value_type=Order)
orders_for_account = app.Table('order-count-by-account', default=int)

@app.agent(orders_topic)
async def order(orders):
    # repartition by account id so each account's count lives in one place
    async for order in orders.group_by(Order.account_id):
        orders_for_account[order.account_id] += 1
        yield order
Our agent reads a stream of orders and keeps a count of them by account id in a distributed table also partitioned by the account id.
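The counting logic itself can be modelled without Faust. The following is a minimal sketch, assuming the table treats missing keys the way a collections.defaultdict(int) does (which is what default=int suggests). The count_orders helper and the plain list of account ids are hypothetical stand-ins, not Faust APIs:

```python
from collections import defaultdict

def count_orders(account_ids):
    """Count orders per account, mirroring how the agent updates its table."""
    counts = defaultdict(int)  # missing keys start at int() == 0
    for account_id in account_ids:
        counts[account_id] += 1
    return dict(counts)

result = count_orders(['1', '2', '1'])
print(result)
```

Because unseen keys start at zero, the agent can increment a count without first checking whether the account already exists.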
To test this agent we use order.test_context():
async def test_order():
    # start and stop the agent in this block
    async with order.test_context() as agent:
        # named new_order so it does not shadow the `order` agent
        new_order = Order(account_id='1', product_id='2', amount=1, price=300)
        # send the order to the test agent's local channel, and wait
        # for the agent to process it.
        await agent.put(new_order)
        # at this point the agent has already updated the table
        assert orders_for_account[new_order.account_id] == 1
        await agent.put(new_order)
        assert orders_for_account[new_order.account_id] == 2

async def run_tests():
    app.conf.store = 'memory://'   # tables must be in-memory
    await test_order()

if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_tests())
For the rest of this guide we’ll be using pytest (https://pypi.org/project/pytest/) and pytest-asyncio (https://pypi.org/project/pytest-asyncio/) for our examples. If you’re using a different testing framework you may need to adapt the examples.
Testing with pytest¶
Testing that an agent sends to topic/calls another agent.¶
When unit testing you should mock any dependencies of the agent being tested:
If your agent calls another function: mock that function to verify it was called.
If your agent sends a message to a topic: mock that topic to verify a message was sent.
If your agent calls another agent: mock the other agent to verify it was called.
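The mocking pattern in the list above can be sketched with the standard library alone. This is a minimal illustration, assuming Python 3.8 or later for unittest.mock.AsyncMock. The forward coroutine and its topic argument are hypothetical stand-ins for an agent body and a topic, not Faust objects:

```python
import asyncio
from unittest.mock import AsyncMock

async def forward(value, topic):
    """Hypothetical stand-in for an agent body that sends to a topic."""
    await topic.send(value=value)
    return value

async def check_forward():
    topic = AsyncMock()  # attributes such as send are awaitable mocks
    result = await forward('hey', topic)
    # Raises AssertionError unless send was awaited exactly once with this call.
    topic.send.assert_awaited_once_with(value='hey')
    return result

result = asyncio.run(check_forward())
```

The dependency is replaced by a mock, the code under test runs, and the assertion checks how the mock was awaited rather than what the real dependency would have done.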
Here’s an example agent that calls another agent:
import faust

app = faust.App('example-test-agent-call')

@app.agent()
async def foo(stream):
    async for value in stream:
        await bar.send(value)
        yield value

@app.agent()
async def bar(stream):
    async for value in stream:
        yield value + 'YOLO'
To test these two agents you have to test them in isolation from each other:
first test foo with bar mocked, then test bar in a separate test:
import pytest
from unittest.mock import Mock, patch

from example import app, foo, bar

@pytest.fixture(scope="function")
def test_app(event_loop):
    """passing in event_loop helps avoid 'attached to a different loop' error"""
    app.loop = event_loop
    app.finalize()
    app.conf.store = 'memory://'
    app.flow_control.resume()
    return app

def mock_coro(return_value=None, **kwargs):
    """Create mock coroutine function."""
    async def wrapped(*args, **kwargs):
        return return_value
    return Mock(wraps=wrapped, **kwargs)

@pytest.mark.asyncio()
async def test_foo(test_app):
    # patch bar where foo looks it up: in the example module
    with patch('example.bar') as mocked_bar:
        mocked_bar.send = mock_coro()
        async with foo.test_context() as agent:
            await agent.put('hey')
            mocked_bar.send.assert_called_with('hey')

@pytest.mark.asyncio()
async def test_bar(test_app):
    async with bar.test_context() as agent:
        event = await agent.put('hey')
        # results are keyed by the offset of the message that produced them
        assert agent.results[event.message.offset] == 'heyYOLO'
You can put the test_app fixture into a [conftest.py file](https://docs.pytest.org/en/6.2.x/fixture.html#scope-sharing-fixtures-across-classes-modules-packages-or-session). If the fixture is not in the same file as the app’s definition (which should be the case), you must import the app into the module that defines the fixture, for example with from example import app.
Note
The pytest-asyncio extension (https://pypi.org/project/pytest-asyncio/) must be installed to run these tests. If you don’t have it, use pip to install it:
$ pip install -U pytest-asyncio
Testing and windowed tables¶
If your table is windowed and you want to verify that the value for a key is
correctly set, use table[k].current(event) to get the value placed within
the window of the current event:
import faust
import pytest

class Order(faust.Record, serializer='json'):
    account_id: str
    product_id: str
    amount: int
    price: float

app = faust.App('test-example')
orders_topic = app.topic('orders', value_type=Order)

# order count within the last hour (window is a 1-hour TumblingWindow).
orders_for_account = app.Table(
    'order-count-by-account', default=int,
).tumbling(3600).relative_to_stream()

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders.group_by(Order.account_id):
        orders_for_account[order.account_id] += 1
        yield order

@pytest.mark.asyncio()
async def test_process_order():
    app.conf.store = 'memory://'
    async with process_order.test_context() as agent:
        order = Order(account_id='1', product_id='2', amount=1, price=300)
        event = await agent.put(order)

        # windowed table: we select window relative to the current event
        assert orders_for_account['1'].current(event) == 1

        # in the window 3 hours ago there were no orders:
        assert orders_for_account['1'].delta(3600 * 3, event) == 0
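To see why the two assertions in the test select different windows, it can help to model tumbling windows by hand. The sketch below is a simplified model that assumes fixed, non-overlapping windows aligned to multiples of the window size. It is not Faust's implementation, and window_start and count_in_window are hypothetical helpers:

```python
def window_start(timestamp, size):
    """Start of the tumbling window that contains `timestamp`."""
    return timestamp - (timestamp % size)

def count_in_window(timestamps, size, at):
    """Number of events that fall in the same window as the time `at`."""
    target = window_start(at, size)
    return sum(1 for ts in timestamps if window_start(ts, size) == target)

HOUR = 3600
event_time = 10 * HOUR + 120   # two minutes into a one-hour window
timestamps = [event_time]      # a single order has been processed

# Window of the event itself, analogous to .current(event)
current = count_in_window(timestamps, HOUR, event_time)
# Window three hours earlier, analogous to .delta(3600 * 3, event)
three_hours_ago = count_in_window(timestamps, HOUR, event_time - 3 * HOUR)
```

In this model the single order is counted in the event's own window, while the window three hours earlier holds no orders.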