Source code for faust.cli.send

"""Program ``faust send`` used to send events to agents and topics."""

import asyncio
import random
from typing import Any, Optional

from faust.types import CodecArg, K, RecordMetadata, V

from .base import AppCommand, argument, option

__all__ = ["send"]


[docs]class send(AppCommand): """Send message to agent/topic.""" topic: Any key: K key_serializer: CodecArg value: V value_serializer: CodecArg repeat: int min_latency: float max_latency: float options = [ option("--key-type", "-K", help="Name of model to serialize key into."), option("--key-serializer", help="Override default serializer for key."), option("--value-type", "-V", help="Name of model to serialize value into."), option("--value-serializer", help="Override default serializer for value."), option("--key", "-k", help="String value for key (use json if model)."), option("--partition", type=int, help="Specific partition to send to."), option("--repeat", "-r", type=int, default=1, help="Send message n times."), option( "--min-latency", type=float, default=0.0, help="Minimum delay between sending.", ), option( "--max-latency", type=float, default=0.0, help="Maximum delay between sending.", ), argument("entity"), argument("value", default=None, required=False), ]
[docs] async def run( self, entity: str, value: str, *args: Any, key: Optional[str] = None, key_type: Optional[str] = None, key_serializer: Optional[str] = None, value_type: Optional[str] = None, value_serializer: Optional[str] = None, partition: int = 1, timestamp: Optional[float] = None, repeat: int = 1, min_latency: float = 0.0, max_latency: float = 0.0, **kwargs: Any, ) -> Any: """Send message to topic/agent/channel.""" if key is not None: key = self.to_key(key_type, key) if value is not None: value = self.to_value(value_type, value) topic = self.to_topic(entity) for i in range(repeat): self.carp(f"k={key!r} v={value!r} -> {topic!r}...") fut_send_complete = await topic.send( key=key, value=value, partition=partition, timestamp=timestamp, key_serializer=key_serializer, value_serializer=value_serializer, ) meta: RecordMetadata = await fut_send_complete self.say(self.dumps(meta._asdict())) if i and max_latency: await asyncio.sleep( random.uniform(min_latency, max_latency) # nosec B311 ) await self.app.producer.stop()