"""Program ``faust send`` used to send events to agents and topics."""
import asyncio
import random
from typing import Any, Optional
from faust.types import CodecArg, K, RecordMetadata, V
from .base import AppCommand, argument, option
# Public names exported by ``from <this module> import *``: only ``send``.
__all__ = ["send"]
class send(AppCommand):
    """Send message to agent/topic.

    Command-line entry point that publishes one message (optionally
    repeated) to the entity named on the command line, printing the
    record metadata returned for every send.
    """

    # Attribute annotations kept from the original declaration.
    topic: Any
    key: K
    key_serializer: CodecArg
    value: V
    value_serializer: CodecArg
    repeat: int
    min_latency: float
    max_latency: float

    # Command-line options/arguments; their values are received by ``run``
    # as the keyword/positional parameters of the same (underscored) name.
    options = [
        option("--key-type", "-K", help="Name of model to serialize key into."),
        option("--key-serializer", help="Override default serializer for key."),
        option("--value-type", "-V", help="Name of model to serialize value into."),
        option("--value-serializer", help="Override default serializer for value."),
        option("--key", "-k", help="String value for key (use json if model)."),
        option("--partition", type=int, help="Specific partition to send to."),
        option("--repeat", "-r", type=int, default=1, help="Send message n times."),
        option(
            "--min-latency",
            type=float,
            default=0.0,
            help="Minimum delay between sending.",
        ),
        option(
            "--max-latency",
            type=float,
            default=0.0,
            help="Maximum delay between sending.",
        ),
        argument("entity"),
        argument("value", default=None, required=False),
    ]

    async def run(
        self,
        entity: str,
        value: Optional[str],
        *args: Any,
        key: Optional[str] = None,
        key_type: Optional[str] = None,
        key_serializer: Optional[str] = None,
        value_type: Optional[str] = None,
        value_serializer: Optional[str] = None,
        partition: int = 1,
        timestamp: Optional[float] = None,
        repeat: int = 1,
        min_latency: float = 0.0,
        max_latency: float = 0.0,
        **kwargs: Any,
    ) -> Any:
        """Send message to topic/agent/channel.

        Arguments:
            entity: Name of the destination, resolved with ``self.to_topic``.
            value: Raw value from the command line; when not ``None`` it is
                converted with ``self.to_value(value_type, value)``.
            *args: Extra positional arguments (ignored).
            key: Raw key from the command line; when not ``None`` it is
                converted with ``self.to_key(key_type, key)``.
            key_type: Passed to ``self.to_key`` together with ``key``.
            key_serializer: Forwarded unchanged to ``topic.send``.
            value_type: Passed to ``self.to_value`` together with ``value``.
            value_serializer: Forwarded unchanged to ``topic.send``.
            partition: Forwarded unchanged to ``topic.send``.
                NOTE(review): ``--partition`` declares no CLI default, so
                this may arrive as ``None`` rather than ``1`` when invoked
                from the command line -- confirm against the dispatcher.
            timestamp: Forwarded unchanged to ``topic.send``.
            repeat: Number of times the message is sent.
            min_latency: Lower bound, in seconds, of the random pause
                between two consecutive sends.
            max_latency: Upper bound, in seconds, of the random pause
                between two consecutive sends.  Values below
                ``min_latency`` are raised to ``min_latency``.
            **kwargs: Extra keyword arguments (ignored).
        """
        if key is not None:
            key = self.to_key(key_type, key)
        if value is not None:
            value = self.to_value(value_type, value)
        topic = self.to_topic(entity)

        # Hoisted loop invariants.  Clamping the upper bound keeps the pause
        # at or above ``min_latency`` even when only ``--min-latency`` is
        # given (``max_latency`` then still has its 0.0 default).
        upper_latency = max(min_latency, max_latency)
        final_index = repeat - 1

        for i in range(repeat):
            self.carp(f"k={key!r} v={value!r} -> {topic!r}...")
            # ``topic.send`` resolves to a second awaitable which in turn
            # resolves to the metadata of the delivered record.
            fut_send_complete = await topic.send(
                key=key,
                value=value,
                partition=partition,
                timestamp=timestamp,
                key_serializer=key_serializer,
                value_serializer=value_serializer,
            )
            meta: RecordMetadata = await fut_send_complete
            self.say(self.dumps(meta._asdict()))
            # Pause only *between* sends: skip it after the final message
            # so the command does not idle before shutting down.
            if i < final_index and upper_latency > 0:
                await asyncio.sleep(
                    random.uniform(min_latency, upper_latency)  # nosec B311
                )
        await self.app.producer.stop()