faust.livecheck.app
LiveCheck - Faust Application.
- class faust.livecheck.app.LiveCheck(id: str, *, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any)[source]¶
LiveCheck application.
- SCAN_CATEGORIES: ClassVar[List[str]] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task', 'livecheck.case']¶
- class Signal(name: str = '', case: Optional[_Case] = None, index: int = -1)¶
Signal for test case using Kafka.
Used to wait for something to happen elsewhere.
- async send(value: Optional[VT] = None, *, key: Optional[Any] = None, force: bool = False) None ¶
Notify test that this signal is now complete.
- Return type:
None
- async wait(*, key: Optional[Any] = None, timeout: Optional[Union[timedelta, float, str]] = None) VT ¶
Wait for signal to be completed.
- Return type:
~VT
- case: _Case¶
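The send/wait pairing described above can be modelled in a single process. This is a minimal sketch only: `LocalSignal` is a hypothetical stand-in that uses asyncio futures instead of Kafka, and it does not reproduce how Faust routes signals between processes.

```python
import asyncio
from typing import Any, Dict, Optional


class LocalSignal:
    """Hypothetical in-process stand-in for a signal (no Kafka involved)."""

    def __init__(self, name: str = "") -> None:
        self.name = name
        self._futures: Dict[Any, asyncio.Future] = {}

    def _future_for(self, key: Any) -> asyncio.Future:
        # One future per key, created lazily so send() may come before wait().
        if key not in self._futures:
            self._futures[key] = asyncio.get_running_loop().create_future()
        return self._futures[key]

    async def send(self, value: Any = None, *, key: Any = None) -> None:
        # Mark the signal for this key as complete (first value wins).
        fut = self._future_for(key)
        if not fut.done():
            fut.set_result(value)

    async def wait(self, *, key: Any = None,
                   timeout: Optional[float] = None) -> Any:
        # shield() keeps the shared future alive if this waiter times out.
        return await asyncio.wait_for(
            asyncio.shield(self._future_for(key)), timeout)


async def demo() -> Any:
    signal = LocalSignal("order_sent")

    async def producer() -> None:
        await asyncio.sleep(0.01)
        await signal.send("order-1", key="test-42")

    task = asyncio.create_task(producer())
    value = await signal.wait(key="test-42", timeout=1.0)
    await task
    return value


result = asyncio.run(demo())
```

The test side blocks in `wait()` until some other part of the system calls `send()` for the same key, which is the role the Kafka-backed signal plays across processes.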
- class Case(*, app: _LiveCheck, name: str, probability: Optional[float] = None, warn_stalled_after: Optional[Union[timedelta, float, str]] = None, active: Optional[bool] = None, signals: Optional[Iterable[BaseSignal]] = None, test_expires: Optional[Union[timedelta, float, str]] = None, frequency: Optional[Union[timedelta, float, str]] = None, realtime_logs: Optional[bool] = None, max_history: Optional[int] = None, max_consecutive_failures: Optional[int] = None, url_timeout_total: Optional[float] = None, url_timeout_connect: Optional[float] = None, url_error_retries: Optional[int] = None, url_error_delay_min: Optional[float] = None, url_error_delay_backoff: Optional[float] = None, url_error_delay_max: Optional[float] = None, **kwargs: Any)¶
LiveCheck test case.
- Runner¶
alias of TestRunner
- property current_execution: Optional[TestRunner]¶
Return the currently executing TestRunner in this task.
- Return type:
Optional[TestRunner]
- property current_test: Optional[TestExecution]¶
Return the currently active test in this task (if any).
- Return type:
Optional[TestExecution]
- async execute(test: TestExecution) None ¶
Execute test using TestRunner.
- Return type:
None
- frequency: Optional[float] = None¶
How often we execute the test using fake data (define Case.make_fake_request()).
Set to None if production traffic is frequent enough to satisfy warn_stalled_after.
- async get_url(url: Union[str, URL], **kwargs: Any) Optional[bytes] ¶
Perform GET request using HTTP client.
- Return type:
Optional[bytes]
- last_test_received: Optional[float] = None¶
The warn_stalled_after timer uses this to keep track of either when a test was last received, or the last time the timer timed out.
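The bookkeeping described here can be sketched as a pure function. This is an illustration under stated assumptions, not Faust's timer code: `check_stalled` is a hypothetical helper, and treating a missing timestamp as "start the clock now" is a choice made for this sketch.

```python
from typing import Optional, Tuple


def check_stalled(
    last_test_received: Optional[float],
    now: float,
    warn_stalled_after: float = 1800.0,
) -> Tuple[bool, Optional[float]]:
    """Return (should_warn, updated_last_test_received)."""
    if last_test_received is None:
        # Nothing observed yet: start the window now (sketch assumption).
        return False, now
    if now - last_test_received >= warn_stalled_after:
        # Timer fired: warn, then restart the window from this timeout,
        # which is why the attribute may hold a timeout time.
        return True, now
    return False, last_test_received


first = check_stalled(None, 100.0)
quiet = check_stalled(100.0, 1000.0)
stalled = check_stalled(100.0, 1900.0)
```

Resetting the timestamp after a warning means the next warning is emitted one full `warn_stalled_after` interval later instead of on every tick.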
- logger: logging.Logger = <Logger faust.livecheck.case (WARNING)>¶
- max_history: int = 100¶
Max items to store in latency_history and runtime_history.
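A bounded history of this kind is commonly kept in a `collections.deque` with `maxlen`. The sketch below assumes that approach purely for illustration; the container Faust actually uses is not stated here, and the latency values are made up.

```python
from collections import deque

max_history = 100  # same value as the documented default

# Oldest entries are discarded automatically once the deque is full.
latency_history = deque(maxlen=max_history)
for sample_ms in range(250):
    latency_history.append(float(sample_ms))

oldest_kept = latency_history[0]
newest_kept = latency_history[-1]
```

After 250 appends only the most recent 100 samples remain, so memory use stays constant however long the case runs.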
- maybe_trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) AsyncGenerator[Optional[TestExecution], None] ¶
Schedule test execution, or not, based on probability setting.
- Return type:
AsyncGenerator[Optional[TestExecution], None]
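The "or not" in the description, together with the `Optional[TestExecution]` in the return type, suggests a probabilistic gate that yields either a test or `None`. Below is a minimal sketch of that pattern, assuming the probability is a fraction between 0.0 and 1.0 and that the method is used with `async with`; `maybe_trigger_sketch` and the `"test-id"` placeholder are hypothetical.

```python
import asyncio
import random
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional


@asynccontextmanager
async def maybe_trigger_sketch(
    probability: float, rng: random.Random
) -> AsyncGenerator[Optional[str], None]:
    # Yield a placeholder test id with the given probability, else None.
    if rng.random() < probability:
        yield "test-id"
    else:
        yield None


async def count_triggered(probability: float, attempts: int) -> int:
    rng = random.Random(0)  # seeded so the sketch is reproducible
    triggered = 0
    for _ in range(attempts):
        async with maybe_trigger_sketch(probability, rng) as test:
            if test is not None:
                triggered += 1
    return triggered


always = asyncio.run(count_triggered(1.0, 100))
never = asyncio.run(count_triggered(0.0, 100))
about_half = asyncio.run(count_triggered(0.5, 1000))
```

Callers must handle the `None` case, since production code runs the same path whether or not a test was attached to the request.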
- async on_suite_fail(exc: SuiteFailed, new_state: State = State.FAIL) None ¶
Call when the suite fails.
- Return type:
None
- async on_test_error(runner: TestRunner, exc: BaseException) None ¶
Call when a test execution raises an exception.
- Return type:
None
- async on_test_failed(runner: TestRunner, exc: BaseException) None ¶
Call when invariant in test execution fails.
- Return type:
None
- async on_test_pass(runner: TestRunner) None ¶
Call when a test execution passes.
- Return type:
None
- async on_test_skipped(runner: TestRunner) None ¶
Call when a test is skipped.
- Return type:
None
- async on_test_start(runner: TestRunner) None ¶
Call when a test starts executing.
- Return type:
None
- async on_test_timeout(runner: TestRunner, exc: BaseException) None ¶
Call when a test execution times out.
- Return type:
None
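The hooks above correspond to the distinct outcomes of a single test execution. The sketch below shows one plausible dispatch, assuming that an `AssertionError` counts as a failed invariant and any other exception as an error; `run_with_hooks` is hypothetical, records hook names in a list instead of calling a real `Case`, and leaves out the skipped and suite-failure paths.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_with_hooks(
    test: Callable[[], Awaitable[None]], timeout: float, events: List[str]
) -> None:
    events.append("on_test_start")
    try:
        await asyncio.wait_for(test(), timeout)
    except asyncio.TimeoutError:
        events.append("on_test_timeout")
    except AssertionError:
        events.append("on_test_failed")  # an invariant did not hold
    except Exception:
        events.append("on_test_error")  # unexpected exception
    else:
        events.append("on_test_pass")


async def passing() -> None:
    return None


async def failing() -> None:
    raise AssertionError("invariant violated")


async def broken() -> None:
    raise ValueError("boom")


async def slow() -> None:
    await asyncio.sleep(1.0)


def outcome(test: Callable[[], Awaitable[None]]) -> List[str]:
    events: List[str] = []
    asyncio.run(run_with_hooks(test, 0.05, events))
    return events


outcomes = {fn.__name__: outcome(fn) for fn in (passing, failing, broken, slow)}
```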
- async post_report(report: TestReport) None ¶
Publish test report.
- Return type:
None
- async post_url(url: Union[str, URL], **kwargs: Any) Optional[bytes] ¶
Perform POST request using HTTP client.
- Return type:
Optional[bytes]
- realtime_logs = False¶
- async resolve_signal(key: str, event: SignalEvent) None ¶
Mark test execution signal as resolved.
- Return type:
None
- async run(*test_args: Any, **test_kwargs: Any) None ¶
Override this to define your test case.
- Return type:
None
- property seconds_since_last_fail: Optional[float]¶
Return number of seconds since any test failed.
- Return type:
Optional[float]
- async trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) TestExecution ¶
Schedule test execution ASAP.
- Return type:
TestExecution
- async url_request(method: str, url: Union[str, URL], **kwargs: Any) Optional[bytes] ¶
Perform URL request using HTTP client.
- Return type:
Optional[bytes]
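The `url_error_retries`, `url_error_delay_min`, `url_error_delay_backoff` and `url_error_delay_max` parameters of `Case` suggest a retry loop with a growing, capped delay. The sketch below assumes a multiplicative backoff; the exact formula is not given here, `retry_delays` is a hypothetical helper, and the numbers are illustrative, not Faust's defaults.

```python
from typing import List


def retry_delays(
    retries: int, delay_min: float, backoff: float, delay_max: float
) -> List[float]:
    """Return the sleep time before each retry, capped at delay_max."""
    delays = []
    delay = delay_min
    for _ in range(retries):
        delays.append(min(delay, delay_max))
        delay *= backoff  # assumed multiplicative growth
    return delays


delays = retry_delays(retries=5, delay_min=0.5, backoff=2.0, delay_max=5.0)
```

With these illustrative values the wait doubles from 0.5 seconds until it reaches the 5 second cap.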
- warn_stalled_after: float = 1800.0¶
Time in seconds after which we warn that nothing is processing.
- app: _LiveCheck¶
- signals: Dict[str, BaseSignal]¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶
- classmethod for_app(app: AppT, *, prefix: str = 'livecheck-', web_port: int = 9999, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any) LiveCheck [source]¶
Create LiveCheck application targeting specific app.
The target app will be used to configure the LiveCheck app.
- Return type:
LiveCheck
- send_reports: bool = True¶
Unset this if you don’t want reports to be sent to the report_topic_name topic.
- property current_test: Optional[TestExecution]¶
Return the current test context (if any).
- Return type:
Optional[TestExecution]
- on_produce_attach_test_headers(sender: AppT, key: Optional[bytes] = None, value: Optional[bytes] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[List[Tuple[str, bytes]]] = None, signal: Optional[BaseSignalT] = None, **kwargs: Any) None [source]¶
Attach test headers to Kafka produce requests.
- Return type:
None
- case(*, name: Optional[str] = None, probability: Optional[float] = None, warn_stalled_after: Union[timedelta, float, str] = datetime.timedelta(seconds=1800), active: Optional[bool] = None, test_expires: Optional[Union[timedelta, float, str]] = None, frequency: Optional[Union[timedelta, float, str]] = None, max_history: Optional[int] = None, max_consecutive_failures: Optional[int] = None, url_timeout_total: Optional[float] = None, url_timeout_connect: Optional[float] = None, url_error_retries: Optional[float] = None, url_error_delay_min: Optional[float] = None, url_error_delay_backoff: Optional[float] = None, url_error_delay_max: Optional[float] = None, base: Type[Case] = <class 'faust.livecheck.case.Case'>) Callable[[Type], Case] [source]¶
Decorate class to be used as a test case.
- Return type:
Callable[[Type], Case]
- Returns:
- async post_report(report: TestReport) None [source]¶
Publish test report to reporting topic.
- Return type:
None
- logger: logging.Logger = <Logger faust.livecheck.app (WARNING)>¶
- on_startup_finished: Optional[Callable]¶
- boot_strategy: BootStrategyT¶
- agents: AgentManagerT¶
- sensors: SensorDelegateT¶
- log: CompositeLogger¶
- diag: DiagT¶
- async_exit_stack: AsyncExitStack¶
- exit_stack: ExitStack¶