faust.cli.faust

Program faust (umbrella command).

class faust.cli.faust.agents(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

List agents.

title = 'Agents'
headers = ['name', 'topic', 'help']
sortkey = operator.attrgetter('name')
options: Optional[OptionList] = [option('--local/--no-local', help='Include agents using a local channel')]
async run(local: bool) None[source]

Dump list of available agents in this application.

Return type:

None

agents(*, local: bool = False) Sequence[AgentT][source]

Convert list of agents to terminal table rows.

Return type:

_GenericAlias[AgentT]

agent_to_row(agent: AgentT) Sequence[str][source]

Convert agent fields to terminal table row.

Return type:

_GenericAlias[str]

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
faust.cli.faust.call_command(command: str, args: Optional[List[str]] = None, stdout: Optional[IO] = None, stderr: Optional[IO] = None, side_effects: bool = False, **kwargs: Any) Tuple[int, IO, IO][source]
Return type:

_GenericAlias[int, IO[AnyStr], IO[AnyStr]]

class faust.cli.faust.clean_versions(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Delete old version directories.

Warning

This command will result in the destruction of the following files:

  1. Table data for previous versions of the app.

async run() None[source]

Execute command.

Return type:

None

remove_old_versiondirs() None[source]

Remove data from old application versions from data directory.

Return type:

None

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.completion(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Output shell completion to be evaluated by the shell.

require_app = False
async run() None[source]

Dump click completion script for Faust CLI.

Return type:

None

shell() str[source]

Return the current shell used in this environment.

Return type:

str

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.livecheck(*args: Any, **kwargs: Any)[source]

Manage LiveCheck instances.

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.model(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Show model detail.

headers = ['field', 'type', 'default']
options: Optional[OptionList] = [argument('name')]
async run(name: str) None[source]

Dump list of registered models to terminal.

Return type:

None

model_fields(model: Type[ModelT]) Sequence[Sequence[str]][source]

Convert model fields to terminal table rows.

Return type:

_GenericAlias[_GenericAlias[str]]

field(field: FieldDescriptorT) Sequence[str][source]

Convert model field model to terminal table columns.

Return type:

_GenericAlias[str]

model_to_row(model: Type[ModelT]) Sequence[str][source]

Convert model to terminal table row.

Return type:

_GenericAlias[str]

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.models(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

List all available models as a tabulated list.

title = 'Models'
headers = ['name', 'help']
sortkey = operator.attrgetter('_options.namespace')
options: Optional[OptionList] = [option('--builtins/--no-builtins', default=False)]
async run(*, builtins: bool) None[source]

Dump list of available models in this application.

Return type:

None

models(builtins: bool) Sequence[Type[ModelT]][source]

Convert list of models to terminal table rows.

Return type:

_GenericAlias[_GenericAlias[ModelT]]

model_to_row(model: Type[ModelT]) Sequence[str][source]

Convert model fields to terminal table columns.

Return type:

_GenericAlias[str]

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.reset(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Delete local table state.

Warning

This command will result in the destruction of the following files:

  1. The local database directories/files backing tables (does not apply if an in-memory store like memory:// is used).

Notes

This data is technically recoverable from the Kafka cluster (if intact), but it’ll take a long time to get the data back as you need to consume each changelog topic in total.

It’d be faster to copy the data from any standbys that happen to have the topic partitions you require.

async run() None[source]

Execute command.

Return type:

None

async reset_tables() None[source]

Reset local state for all tables.

Return type:

None

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.send(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Send message to agent/topic.

topic: Any
key: Optional[Union[bytes, _ModelT, Any]]
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value: Union[bytes, _ModelT, Any]
value_serializer: Optional[Union[CodecT, str]]
repeat: int
min_latency: float
max_latency: float
options: Optional[OptionList] = [option('--key-type', '-K', help='Name of model to serialize key into.'), option('--key-serializer', help='Override default serializer for key.'), option('--value-type', '-V', help='Name of model to serialize value into.'), option('--value-serializer', help='Override default serializer for value.'), option('--key', '-k', help='String value for key (use json if model).'), option('--partition', type=<class 'int'>, help='Specific partition to send to.'), option('--repeat', '-r', type=<class 'int'>, default=1, help='Send message n times.'), option('--min-latency', type=<class 'float'>, default=0.0, help='Minimum delay between sending.'), option('--max-latency', type=<class 'float'>, default=0.0, help='Maximum delay between sending.'), argument('entity'), argument('value', default=None, required=False)]
async run(entity: str, value: str, *args: Any, key: Optional[str] = None, key_type: Optional[str] = None, key_serializer: Optional[str] = None, value_type: Optional[str] = None, value_serializer: Optional[str] = None, partition: int = 1, timestamp: Optional[float] = None, repeat: int = 1, min_latency: float = 0.0, max_latency: float = 0.0, **kwargs: Any) Any[source]

Send message to topic/agent/channel.

Return type:

Any

class faust.cli.faust.tables(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

List available tables.

title = 'Tables'
async run() None[source]

Dump list of application tables to terminal.

Return type:

None

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict
class faust.cli.faust.worker(ctx: Context, *args: Any, key_serializer: Optional[Union[CodecT, str]] = None, value_serializer: Optional[Union[CodecT, str]] = None, **kwargs: Any)[source]

Start worker instance for given app.

daemon: bool = True
redirect_stdouts: Optional[bool] = True
worker_options = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=IntRange(1, 65535), help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default=None, type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)')]
options: Optional[OptionList] = [option('--with-web/--without-web', default=True, help='Enable/disable web server and related components.'), option('--web-port', '-p', default=None, type=IntRange(1, 65535), help='Port to run web server on (default: 6066)'), option('--web-transport', default=None, type=URL, help='Web server transport (default: tcp:)'), option('--web-bind', '-b', type=<class 'str'>), option('--web-host', '-h', default=None, type=<class 'str'>, help='Canonical host name for the web server (default: 0.0.0.0)'), option('--logfile', '-f', callback=<function compat_option.<locals>._callback>, expose_value=False, default=None, type=<click.types.Path object>, help='Path to logfile (default is <stderr>).'), option('--loglevel', '-l', callback=<function compat_option.<locals>._callback>, expose_value=False, default='WARN', type=Choice(['crit', 'error', 'warn', 'info', 'debug']), help='Logging level to use.'), option('--blocking-timeout', callback=<function compat_option.<locals>._callback>, expose_value=False, default=None, type=<class 'float'>, help='when --debug: Blocking detector timeout.'), option('--console-port', callback=<function compat_option.<locals>._callback>, expose_value=False, default=50101, type=IntRange(1, 65535), help='when --debug: Port to run debugger console on.')]
on_worker_created(worker: Worker) None[source]

Print banner when worker starts.

Return type:

None

as_service(loop: AbstractEventLoop, *args: Any, **kwargs: Any) ServiceT[source]

Return the service this command should execute.

For the worker we simply start the application itself.

Note

The application will be started using a faust.Worker.

Return type:

ServiceT

banner(worker: Worker) str[source]

Generate the text banner emitted before the worker starts.

Return type:

str

faust_ident() str[source]

Return Faust version information as ANSI string.

Return type:

str

platform() str[source]

Return platform identifier as ANSI string.

Return type:

str

app: AppT
key_serializer: Optional[Union[CodecT, str]]

The codec used to serialize keys. Taken from instance parameters or key_serializer.

value_serialier: Optional[Union[CodecT, str]]

The codec used to serialize values. Taken from instance parameters or value_serializer.

debug: bool
quiet: bool
workdir: str
datadir: str
json: bool
logfile: str
stdout: IO
stderr: IO
args: Tuple
kwargs: Dict