faust.livecheck.app

LiveCheck - Faust Application.

class faust.livecheck.app.LiveCheck(id: str, *, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any)[source]

LiveCheck application.

SCAN_CATEGORIES: ClassVar[List[str]] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task', 'livecheck.case']
class Signal(name: str = '', case: Optional[_Case] = None, index: int = -1)

Signal for test case using Kafka.

Used to wait for something to happen elsewhere.

async send(value: Optional[VT] = None, *, key: Optional[Any] = None, force: bool = False) None

Notify test that this signal is now complete.

Return type:

None

async wait(*, key: Optional[Any] = None, timeout: Optional[Union[timedelta, float, str]] = None) VT

Wait for signal to be completed.

Return type:

~VT

name: str
case: _Case
index: int
class Case(*, app: _LiveCheck, name: str, probability: Optional[float] = None, warn_stalled_after: Optional[Union[timedelta, float, str]] = None, active: Optional[bool] = None, signals: Optional[Iterable[BaseSignal]] = None, test_expires: Optional[Union[timedelta, float, str]] = None, frequency: Optional[Union[timedelta, float, str]] = None, realtime_logs: Optional[bool] = None, max_history: Optional[int] = None, max_consecutive_failures: Optional[int] = None, url_timeout_total: Optional[float] = None, url_timeout_connect: Optional[float] = None, url_error_retries: Optional[int] = None, url_error_delay_min: Optional[float] = None, url_error_delay_backoff: Optional[float] = None, url_error_delay_max: Optional[float] = None, **kwargs: Any)

LiveCheck test case.

Runner

alias of TestRunner

active: bool = True
consecutive_failures: int = 0
property current_execution: Optional[TestRunner]

Return the currently executing TestRunner in this task. :rtype: _GenericAlias[TestRunner, None]

property current_test: Optional[TestExecution]

Return the currently active test in this task (if any). :rtype: _GenericAlias[TestExecution, None]

async execute(test: TestExecution) None

Execute test using TestRunner.

Return type:

None

frequency: Optional[float] = None

How often we execute the test using fake data (define Case.make_fake_request()).

Set to None if production traffic is frequent enough to satisfy warn_stalled_after.

frequency_avg: Optional[float] = None
async get_url(url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform GET request using HTTP client.

Return type:

_GenericAlias[bytes, None]

property label: str

Return human-readable label for this test case. :rtype: str

last_fail: Optional[float] = None

Timestamp of when the suite last failed.

last_test_received: Optional[float] = None

The warn_stalled_after timer uses this to keep track of either when a test was last received, or the last time the timer timed out.

latency_avg: Optional[float] = None
logger: logging.Logger = <Logger faust.livecheck.case (WARNING)>
async make_fake_request() None
Return type:

None

max_consecutive_failures: int = 30
max_history: int = 100

Max items to store in latency_history and runtime_history.

maybe_trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) AsyncGenerator[Optional[TestExecution], None]

Schedule test execution, or not, based on probability setting.

Return type:

_GenericAlias[_GenericAlias[TestExecution, None], None]

async on_suite_fail(exc: SuiteFailed, new_state: State = State.FAIL) None

Call when the suite fails.

Return type:

None

async on_test_error(runner: TestRunner, exc: BaseException) None

Call when a test execution raises an exception.

Return type:

None

async on_test_failed(runner: TestRunner, exc: BaseException) None

Call when invariant in test execution fails.

Return type:

None

async on_test_pass(runner: TestRunner) None

Call when a test execution passes.

Return type:

None

async on_test_skipped(runner: TestRunner) None

Call when a test is skipped.

Return type:

None

async on_test_start(runner: TestRunner) None

Call when a test starts executing.

Return type:

None

async on_test_timeout(runner: TestRunner, exc: BaseException) None

Call when a test execution times out.

Return type:

None

async post_report(report: TestReport) None

Publish test report.

Return type:

None

async post_url(url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform POST request using HTTP client.

Return type:

_GenericAlias[bytes, None]

probability: float = 0.5

Probability of test running when live traffic is going through.

realtime_logs = False
async resolve_signal(key: str, event: SignalEvent) None

Mark test execution signal as resolved.

Return type:

None

async run(*test_args: Any, **test_kwargs: Any) None

Override this to define your test case.

Return type:

None

runtime_avg: Optional[float] = None
property seconds_since_last_fail: Optional[float]

Return number of seconds since any test failed. :rtype: _GenericAlias[float, None]

state_transition_delay: float = 60.0
status: State = 'INIT'

Current state of this test.

test_expires: timedelta = datetime.timedelta(seconds=10800)
total_failures: int = 0
async trigger(id: Optional[str] = None, *args: Any, **kwargs: Any) TestExecution

Schedule test execution ASAP.

Return type:

TestExecution

url_error_delay_backoff: float = 1.5
url_error_delay_max: float = 5.0
url_error_delay_min: float = 0.5
url_error_retries: int = 10
async url_request(method: str, url: Union[str, URL], **kwargs: Any) Optional[bytes]

Perform URL request using HTTP client.

Return type:

_GenericAlias[bytes, None]

url_timeout_connect: Optional[float] = None
url_timeout_total: Optional[float] = 300.0
warn_stalled_after: float = 1800.0

Timeout in seconds for when after we warn that nothing is processing.

app: _LiveCheck
name: str

Name of the test If not set this will be generated out of the subclass name.

latency_history: Deque[float]
frequency_history: Deque[float]
runtime_history: Deque[float]
signals: Dict[str, BaseSignal]
total_signals: int
total_by_state: Counter[State]
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
classmethod for_app(app: AppT, *, prefix: str = 'livecheck-', web_port: int = 9999, test_topic_name: Optional[str] = None, bus_topic_name: Optional[str] = None, report_topic_name: Optional[str] = None, bus_concurrency: Optional[int] = None, test_concurrency: Optional[int] = None, send_reports: Optional[bool] = None, **kwargs: Any) LiveCheck[source]

Create LiveCheck application targeting specific app.

The target app will be used to configure the LiveCheck app.

Return type:

LiveCheck

test_topic_name: str = 'livecheck'
bus_topic_name: str = 'livecheck-bus'
report_topic_name: str = 'livecheck-report'
bus_concurrency: int = 30

Number of concurrent actors processing signal events.

test_concurrency: int = 100

Number of concurrent actors executing test cases.

send_reports: bool = True

Unset this if you don’t want reports to be sent to the report_topic_name topic.

cases: Dict[str, Case]
property current_test: Optional[TestExecution]

Return the current test context (if any). :rtype: _GenericAlias[TestExecution, None]

on_produce_attach_test_headers(sender: AppT, key: Optional[bytes] = None, value: Optional[bytes] = None, partition: Optional[int] = None, timestamp: Optional[float] = None, headers: Optional[List[Tuple[str, bytes]]] = None, signal: Optional[BaseSignalT] = None, **kwargs: Any) None[source]

Attach test headers to Kafka produce requests.

Return type:

None

case(*, name: ~typing.Optional[str] = None, probability: ~typing.Optional[float] = None, warn_stalled_after: ~typing.Union[~datetime.timedelta, float, str] = datetime.timedelta(seconds=1800), active: ~typing.Optional[bool] = None, test_expires: ~typing.Optional[~typing.Union[~datetime.timedelta, float, str]] = None, frequency: ~typing.Optional[~typing.Union[~datetime.timedelta, float, str]] = None, max_history: ~typing.Optional[int] = None, max_consecutive_failures: ~typing.Optional[int] = None, url_timeout_total: ~typing.Optional[float] = None, url_timeout_connect: ~typing.Optional[float] = None, url_error_retries: ~typing.Optional[float] = None, url_error_delay_min: ~typing.Optional[float] = None, url_error_delay_backoff: ~typing.Optional[float] = None, url_error_delay_max: ~typing.Optional[float] = None, base: ~typing.Type[~faust.livecheck.case.Case] = <class 'faust.livecheck.case.Case'>) Callable[[Type], Case][source]

Decorate class to be used as a test case.

Return type:

_GenericAlias[_GenericAlias[+CT_co], Case]

Returns:

faust.livecheck.Case.

add_case(case: Case) Case[source]

Add and register new test case.

Return type:

Case

async post_report(report: TestReport) None[source]

Publish test report to reporting topic.

Return type:

None

logger: logging.Logger = <Logger faust.livecheck.app (WARNING)>
async on_start() None[source]

Call when LiveCheck application starts.

Return type:

None

on_startup_finished: Optional[Callable]
boot_strategy: BootStrategyT
agents: AgentManagerT
sensors: SensorDelegateT
fixups: MutableSequence[FixupT]
log: CompositeLogger
diag: DiagT
async_exit_stack: AsyncExitStack
exit_stack: ExitStack
async on_started() None[source]

Call when LiveCheck application is fully started.

Return type:

None

bus[source]

Topic used for signal communication.

pending_tests[source]

Topic used to keep pending test executions.

reports[source]

Topic used to log test reports.