Change history for Faust 1.5
This document contains change notes for bugfix releases in the Faust 1.5.x series. If you’re looking for changes in the latest series, please visit the latest Changelog.
For even older releases you can visit the History section.
1.5.4
- release-date:
2019-04-09 2:09 P.M PST
- release-by:
Ask Solem (@ask)
New producer_api_version setting.
This can be set to the value “0.10” to remove headers from all messages produced.
Use this if you have downstream consumers that do not support the new Kafka protocol format yet.
The stream_recovery_delay setting has been disabled by default.
After rebalancing, the worker would sleep a bit before starting recovery, the idea being that another recovery may be waiting just behind it, but this has proven less effective than intended.
Web: Cache can now be configured to take headers into account.
Create the cache manager for your blueprint with the include_headers argument:
    cache = blueprint.cache(timeout=300.0, include_headers=True)
Contributed by Sanyam Satia (@ssatia).
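To show why include_headers matters, here is a minimal sketch of how a response cache might derive its key. The function cache_key and its key layout are hypothetical and are not Faust's internal format.

```python
import hashlib
from typing import Mapping, Optional


def cache_key(method: str, path: str,
              headers: Optional[Mapping[str, str]] = None,
              include_headers: bool = False) -> str:
    """Build a response-cache key (hypothetical layout, not Faust's)."""
    parts = [method.upper(), path]
    if include_headers and headers:
        # Header names are case-insensitive and their order should not
        # matter, so lowercase the names and sort before hashing.
        for name, value in sorted((k.lower(), v) for k, v in headers.items()):
            parts.append(f'{name}={value}')
    return hashlib.sha256('|'.join(parts).encode()).hexdigest()
```

With include_headers=False, a JSON request and an HTML request to the same path produce the same key and could be served each other's cached response; enabling it keeps them apart.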
1.5.3
- release-date:
2019-04-06 11:25 P.M PST
- release-by:
Ask Solem (@ask)
Requirements:
Now depends on robinhood-aiokafka 1.0.2
This version disables the “LeaveGroup” timeout added in 1.0.0, as it was causing problems.
Documentation: Fixed spelling.
Tests: Fixed flaky regression test.
1.5.2
- release-date:
2019-03-28 11:00 A.M PST
- release-by:
Ask Solem (@ask)
Requirements
Now depends on Mode 3.1.1.
Timers: Prevent drift + add some tiny drift.
Thanks to Bob Haddleton (@bobh66).
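One common way to keep a periodic timer from drifting is to compute each deadline from the original start time instead of sleeping a fixed interval after every run. The sketch below assumes that approach; run_every is a hypothetical name and this is not Mode's actual Timer implementation.

```python
import asyncio
from typing import Awaitable, Callable


async def run_every(interval: float,
                    callback: Callable[[], Awaitable[None]],
                    ticks: int) -> None:
    """Call callback every interval seconds without accumulating drift."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    for n in range(1, ticks + 1):
        await callback()
        # The next deadline is derived from the start time, so time spent
        # in the callback (or waking up late) is not added to later ticks.
        deadline = start + n * interval
        await asyncio.sleep(max(0.0, deadline - loop.time()))
```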
App: Autodiscovery now avoids importing __main__.py (Issue #324).
Added regression test.
The stream_ack_exceptions setting has been deprecated.
It was not having any effect, and we have no current use for it.
The stream_ack_cancelled_tasks setting has been deprecated.
It was not having any effect, and we have no current use for it.
App: Autodiscovery failed to load when using app.main() in some cases (Issue #323).
Added regression test.
Worker: Fixed error during agent shutdown.
Monitor: Now monitors assignment latency and the number of completed/failed assignments.
Implemented in the default monitor, but also for statsd and datadog.
CLI: The faust program had the wrong help description.
Docs: Fixes typo in web_cors_options example.
App: Do not wait for the table recovery finished signal if the app is not starting the recovery service.
1.5.1
- release-date:
2019-03-24 09:45 P.M PST
- release-by:
Ask Solem (@ask)
Fixed hanging in partition assignment introduced in Faust 1.5 (Issue #320).
Contributed by Bob Haddleton (@bobh66).
1.5.0
- release-date:
2019-03-22 02:18 P.M PST
- release-by:
Ask Solem (@ask)
Requirements
Now depends on robinhood-aiokafka 1.0.1
Now depends on Mode 3.1.
Exactly-Once semantics: New processing_guarantee setting.
Experimental support for “exactly-once” semantics.
This mode ensures tables and counts in tables/windows are consistent even as nodes in the cluster are abruptly terminated.
To enable this mode set the processing_guarantee setting:
    App(processing_guarantee='exactly_once')
Note
If you do enable “exactly_once” for an existing app, you must make sure all workers are running the latest version and possibly starting from a clean set of intermediate topics.
You can accomplish this by bumping up the app version number:
    App(version=2, processing_guarantee='exactly_once')
The new processing guarantee requires a new version of the assignor protocol, so an “exactly_once” worker will not work with older versions of Faust running in the same consumer group. To roll out this change you will have to stop all the workers, deploy the new version, and only then restart the workers.
New optimizations for stream processing and windows.
If Cython is available during installation, Faust will be installed with compiled extensions.
You can set the NO_CYTHON environment variable to disable the use of these extensions even if compiled.
New topic_allow_declare setting.
If disabled, your Faust worker instances will never actually declare topics.
Use this if your Kafka administrator does not allow you to create topics.
New ConsumerScheduler setting.
This class can override how events are delivered to agents. The default alternates round-robin between topics and between partitions, so that every topic partition gets a chance to be processed.
Contributed by Miha Troha (@mihatroha).
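The default delivery order described above can be pictured as nested round-robin: rotate over topics, and within each topic rotate over its partitions, skipping empty ones. This is a minimal sketch under that reading; the function name and the dict-of-deques input shape are assumptions, not the ConsumerScheduler interface.

```python
from collections import deque
from typing import Deque, Dict, Iterator, Tuple

Buffers = Dict[str, Dict[int, Deque[str]]]


def round_robin(buffers: Buffers) -> Iterator[Tuple[str, int, str]]:
    """Yield (topic, partition, event), alternating topics, then partitions.

    Consumes the buffers in place; a topic leaves the rotation once all
    of its partitions are empty.
    """
    partition_order = {topic: deque(parts) for topic, parts in buffers.items()}
    topics = deque(buffers)
    while topics:
        topic = topics.popleft()
        parts = partition_order[topic]
        # Look for the next partition of this topic that still has events.
        for _ in range(len(parts)):
            partition = parts[0]
            parts.rotate(-1)
            if buffers[topic][partition]:
                yield topic, partition, buffers[topic][partition].popleft()
                topics.append(topic)  # topic goes to the back of the line
                break
```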
Authentication: Support for GSSAPI authentication.
See documentation for the broker_credentials setting.
Contributed by Julien Surloppe (@jsurloppe).
Authentication: Support for SASL authentication.
See documentation for the broker_credentials setting.
The new broker_credentials setting can also be used to configure SSL authentication.
Models: Records can now use comparison operators.
Comparison of models using the >, <, >= and <= operators now works similarly to dataclasses.
Models: Now raise an error if non-default fields follow default fields.
The following model will now raise an error:
    class Account(faust.Record):
        name: str
        amount: int = 3
        userid: str
This is because a non-default field is defined after a default field, and this would mess up argument ordering.
To define the model without error, make sure you move default fields below any non-default fields:
    class Account(faust.Record):
        name: str
        userid: str
        amount: int = 3
Note
Remember that when adding fields to an already existing model you should always add new fields as optional fields.
This will help your application stay backward compatible.
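Both Models changes above line up with rules the standard library's dataclasses already follow. The sketch uses plain dataclasses, not faust.Record, so it only illustrates the reference behaviour: with order=True instances compare field by field like tuples, and a non-default field after a default one is rejected when the class is created. Faust's own error type and message for Records may differ.

```python
from dataclasses import dataclass


@dataclass(order=True)
class Account:
    name: str
    userid: str
    amount: int = 3  # default fields come after non-default ones


def define_misordered_account() -> type:
    """Try to define the misordered model from the example above."""
    @dataclass
    class BadAccount:
        name: str
        amount: int = 3
        userid: str  # non-default field after a default field
    return BadAccount


# Comparison works like comparing (name, userid, amount) tuples.
assert Account('alice', 'u2') < Account('bob', 'u1')
assert Account('alice', 'u1', 1) < Account('alice', 'u1', 5)

try:
    define_misordered_account()
except TypeError as exc:
    print('rejected:', exc)
```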
App: Sending messages API now supports a headers argument.
When sending messages you can now attach arbitrary headers as a dict or as a list of tuples, where the values are bytes:
    await topic.send(key=key, value=value, headers={'x': b'foo'})
Supported transports
Headers are currently only supported by the default aiokafka transport, and require Kafka server 0.11 and later.
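Because headers may be passed either as a dict or as a list of tuples, code that builds them might normalize to one form and check the value types up front. A minimal sketch; normalize_headers is a hypothetical helper, not part of Faust, and it assumes str keys with bytes values as described above.

```python
import collections.abc
from typing import List, Mapping, Sequence, Tuple, Union

Headers = Union[Mapping[str, bytes], Sequence[Tuple[str, bytes]]]


def normalize_headers(headers: Headers) -> List[Tuple[str, bytes]]:
    """Return headers as a list of (key, value) tuples with bytes values."""
    if isinstance(headers, collections.abc.Mapping):
        items = list(headers.items())
    else:
        items = list(headers)
    for key, value in items:
        if not isinstance(value, bytes):
            raise TypeError(
                f'header {key!r} must be bytes, not {type(value).__name__}')
    return items


print(normalize_headers({'x': b'foo'}))  # [('x', b'foo')]
```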
Agent: RPC operations can now take advantage of message headers.
The default way to attach metadata to values, such as the reply-to address and the correlation id, is to wrap the value in an envelope.
With headers support now landed we can use message headers for this:
    @app.agent(use_reply_headers=True)
    async def x(stream):
        async for item in stream:
            yield item ** 2
Faust will be using headers by default in version 2.0.
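To make the difference between the two styles concrete, the sketch below shows how a request value and its reply metadata might be laid out in each case. The field and header names (reply_to, correlation_id, reply-to, correlation-id) and the JSON encoding are illustrative assumptions, not the exact format Faust uses on the wire.

```python
import json
import uuid
from typing import Any, List, Tuple


def as_envelope(value: Any, reply_to: str) -> bytes:
    """Envelope style: reply metadata is wrapped together with the value."""
    return json.dumps({
        'value': value,
        'reply_to': reply_to,
        'correlation_id': str(uuid.uuid4()),
    }).encode()


def as_headers(value: Any,
               reply_to: str) -> Tuple[bytes, List[Tuple[str, bytes]]]:
    """Header style: the value is sent unchanged; metadata goes in headers."""
    headers = [
        ('reply-to', reply_to.encode()),
        ('correlation-id', str(uuid.uuid4()).encode()),
    ]
    return json.dumps(value).encode(), headers
```

With the header style the message body is just the value, so a consumer that knows nothing about the reply protocol can still decode it.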
App: Sending messages API now supports a timestamp argument (Issue #276).
When sending messages you can now specify the timestamp of the message:
    await topic.send(key=key, value=value, timestamp=custom_timestamp)
If no timestamp is provided the current time will be used (time.time()).
Contributed by Miha Troha (@mihatroha).
App: New consumer_auto_offset_reset setting (Issue #267).
Contributed by Ryan Whitten (@rwhitten577).
Stream: group_by repartitioned topic name now includes the agent name (Issue #284).
App: Web server is no longer running in a separate thread by default.
Running the web server in a separate thread is beneficial as it will not be affected by back pressure in the main thread event loop, but it also makes programming harder when it cannot share the loop of the parent.
If you want to run the web server in a separate thread, use the new web_in_thread setting.
App: New web_in_thread setting controls whether the web server runs in a separate thread.
App: New logging_config setting.
App: Autodiscovery now ignores modules matching “test” (Issue #242).
Contributed by Chris Seto (@chrisseto).
Transport: aiokafka transport now supports headers when using Kafka server versions 0.11 and later.
Tables: New flags can be used to check if actives/standbys are up to date.
app.tables.actives_ready
    Set to True when tables have synced all active partitions.
app.tables.standbys_ready
    Set to True when standby partitions are up-to-date.
RocksDB: Now crash with ConsistencyError if the persisted offset is greater than the current highwater.
This means the changelog topic has been modified in Kafka and the recorded offset no longer exists. We crash as we believe this requires human intervention, but should some projects have less strict durability requirements we may make this an option.
RocksDB: len(table) now only counts databases for active partitions (Issue #270).
Agent: Fixes crash when the worker is assigned no partitions and has the isolated_partitions flag enabled (Issue #181).
Table: Fixes KeyError crash for already removed key.
Table: WindowRange is no longer a NamedTuple.
This will make it easier to avoid hashing mistakes, so that window ranges are never represented as both normal tuple and named tuple variants in the table.
Transports: Adds experimental confluent:// transport.
This transport uses the confluent-kafka client.
It is not feature complete, and notably is missing sticky partition assignment, so you should not use this transport for tables.
Warning
The confluent:// transport is not recommended for production use at this time as it has several limitations.
Stream: Fixed deadlock when using Stream.take to buffer events (Issue #262).
Contributed by Nimi Wariboko Jr (@nemosupremo).
Web: Views can now define an options method to implement a handler for the HTTP OPTIONS method (Issue #304).
Contributed by Perk Lim (@perklun).
Stream: Fixed acking behavior of Stream.take (Issue #266).
When take is buffering, the events should be acked after processing of the buffer is complete; instead it was acking when adding them into the buffer.
Fix contributed by Amit Ripshtos (@amitripshtos).
Transport: aiokafka was not limiting how many messages to read in a fetch request (Issue #292).
Fix contributed by Miha Troha (@mihatroha).
Typing: Added type stubs for faust.web.Request.
Typing: Fixed type stubs for @app.agent decorator.
Web: Added support for Cross-Origin Resource Sharing headers (CORS).
See new web_cors_options setting.
Debugging: Added OpenTracing hooks to streams/tasks/timers/Crontabs and rebalancing process.
To enable, you have to define a custom Tracer class that will record and publish the traces to systems such as Jaeger or Zipkin.
This class needs to have a .trace(name, **extra_context) context manager:
    import random
    from functools import cached_property  # requires Python 3.8+
    from typing import Any, Dict, Optional

    import opentracing
    from opentracing.ext.tags import SAMPLING_PRIORITY


    class FaustTracer:
        _tracers: Dict[str, opentracing.Tracer]

        def __init__(self) -> None:
            self._tracers = {}

        @cached_property
        def default_tracer(self) -> opentracing.Tracer:
            # 'APP_NAME' is a placeholder: use your own service name.
            return self.get_tracer('APP_NAME')

        def trace(self, name: str,
                  sample_rate: Optional[float] = None,
                  **extra_context: Any) -> opentracing.Span:
            # opentracing spans are context managers that finish on exit.
            span = self.default_tracer.start_span(
                operation_name=name,
                tags=extra_context,
            )
            if sample_rate is not None:
                priority = 1 if random.uniform(0, 1) < sample_rate else 0
                span.set_tag(SAMPLING_PRIORITY, priority)
            return span

        def get_tracer(self, service_name: str) -> opentracing.Tracer:
            tracer = self._tracers.get(service_name)
            if tracer is None:
                # CREATE_TRACER is a placeholder for your own factory that
                # returns an opentracing.Tracer (e.g. a Jaeger tracer).
                tracer = self._tracers[service_name] = CREATE_TRACER(service_name)
            return tracer
After implementing the interface you need to set the app.tracer attribute:
    app = faust.App(...)
    app.tracer = FaustTracer()
That’s it! Now traces will go through your custom tracing implementation.
CLI: Commands --help output now always shows the default for every parameter.
Channels: Fixed bug in channel.send that caused a memory leak.
This bug was not present when using app.topic().
Documentation: Improvements by:
Amit Rip (@amitripshtos).
Sebastian Roll (@SebastianRoll).
Mousse (@zibuyu1995).
Zhanzhao (Deo) Liang (@DeoLeung).
Testing:
99% total unit test coverage
New script, run for every git commit, verifies that documentation defaults are up to date.