Change history for Faust 1.1
This document contains change notes for bugfix releases in the Faust 1.1.x series. If you’re looking for changes in the latest series, please visit the latest Changelog.
For even older releases you can visit the History section.
1.1.3
- release-date: 2018-09-21 4:23 P.M PDT
- release-by: Ask Solem (@ask)
- Producer: Producing messages is now 8x to 20x faster.
- Stream: The stream_publish_on_commit setting is now disabled by default.
Some agents produce data into topics: they forward data after processing or modify tables requiring changelog events to be sent.
Kafka’s at-least-once delivery guarantee means we will never lose a message, and we can be certain any event sent to the source topic will be processed. It also means any source event can be processed multiple times.
If the source event is processed many times and part of the agents processing includes forwarding that event, or producing a new kind of event, then that will also happen as many times as the source event is reprocessed.
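The effect of at-least-once delivery on a forwarding agent can be sketched with plain Python. This is a toy simulation, not Faust code: run_worker, crash_after and the list-based "topics" are hypothetical names used only for illustration.

```python
from typing import List


def run_worker(source: List[str], committed: int,
               crash_after: int, out: List[str]) -> int:
    """Forward events starting at the committed offset.

    Simulates a crash after `crash_after` events, before the new offset
    is committed. Returns the offset that ended up committed.
    """
    processed = 0
    for offset in range(committed, len(source)):
        if processed == crash_after:
            return committed  # crashed: the offset was never committed
        out.append(source[offset])  # forward the event
        processed += 1
    return len(source)  # everything processed, commit succeeds


source = ['a', 'b', 'c']
destination: List[str] = []

# First run crashes after forwarding two events, without committing.
committed = run_worker(source, committed=0, crash_after=2, out=destination)
# The restarted worker resumes from the last committed offset (still 0).
committed = run_worker(source, committed, crash_after=len(source),
                       out=destination)

print(destination)  # ['a', 'b', 'a', 'b', 'c']
```

No source event is lost, but 'a' and 'b' are forwarded twice because they were reprocessed.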
The stream_publish_on_commit setting attempts to minimize the chances of duplicate messages being produced, by buffering up any events sent in the agent and holding on to them until the offset of the source event is committed.
Here’s an agent forwarding values to another topic:
    @app.agent(source_topic)
    async def forward(stream):
        async for value in stream:
            await destination_topic.send(value=value)
If we execute this with stream_publish_on_commit enabled, then the send operation will be delayed until we have committed the offset for the source event.
This works well when we commit often, but completely falls apart if the buffer grows too large and we have too much to do during commit.
The commit operation works like this (in pseudo code) when stream_publish_on_commit is enabled:
    async def commit(self):
        committable_offsets: Dict[TopicPartition, int] = ...
        # Operation A (send buffered messages related to source offsets)
        for tp, offset in committable_offsets.items():
            send_messages_buffered_up_until_offset(tp, offset)
        # Operation B (actually tell Kafka the new offsets)
        consumer.commit(committable_offsets)
This is not an atomic operation - the worker could crash between completing Operation A and Operation B. If there are 1000 messages to send, it could send 500 of them then crash without committing.
In this case we end up with 500 duplicate messages when the source offsets are reprocessed. Is this safer than producing messages one at a time and committing quickly? Probably not.
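The crash scenario above can be reproduced with a small simulation. This is a sketch under stated assumptions: commit, WorkerCrashed and the in-memory lists are hypothetical stand-ins, and reprocessing is modeled simply by reusing the same buffer after the crash.

```python
from typing import List


class WorkerCrashed(Exception):
    """Raised to simulate the worker dying in the middle of a commit."""


def commit(buffered: List[str], sent: List[str],
           crash_after: int = -1) -> None:
    """Hypothetical commit: flush the buffer (A), then commit offsets (B)."""
    # Operation A: send buffered messages one by one.
    for i, message in enumerate(buffered):
        if i == crash_after:
            raise WorkerCrashed()  # crash before reaching Operation B
        sent.append(message)
    # Operation B: the source offsets would be committed here; only now
    # is it safe to forget the buffered messages.
    buffered.clear()


sent: List[str] = []
buffered = [f'msg-{i}' for i in range(1000)]

try:
    commit(buffered, sent, crash_after=500)
except WorkerCrashed:
    pass

# After a restart the uncommitted source events are reprocessed, which
# produces the same 1000 messages again; this time the commit completes.
commit(buffered, sent)

duplicates = len(sent) - len(set(sent))
print(len(sent), duplicates)  # 1500 500
```

The smaller the buffer is at commit time, the fewer messages can be duplicated by a crash between the two operations.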
That said, if you make sure the buffer never grows too large then you can take advantage of this setting to actually reduce the number of duplicate messages sent when a source topic is reprocessed.
If you want to experiment with this, tweak the broker_commit_every and broker_commit_interval settings:
    app = faust.App('name',
                    broker_commit_every=100,
                    broker_commit_interval=1.0,
                    stream_publish_on_commit=True)
The good news is that Kafka transactions are on the horizon. As soon as we have support in a Python client, we can perform this atomically, and without the overhead of buffering up messages until commit time (note from future: “exactly-once” was implemented in Faust 1.5).
1.1.2
- release-date: 2018-09-19 5:09 P.M PDT
- release-by: Ask Solem (@ask)
Requirements
Now depends on Mode 1.17.3.
Agent: Agents with concurrency=n were executing each event n times.
An unrelated change caused these additional actors to have separate channels when they should share the same channel.
The only tests verifying this were using mocks, so we’ve added a new functional test in t/functional/agents to be sure it won’t happen again.
This test also demonstrated a case of starvation when using concurrency: a single concurrency slot could starve others from doing work. To fix this, a sleep(0) was added to Stream.__aiter__. This could also improve performance in general for workers with many agents.
Huge thanks to Zhy on the Faust slack channel for testing and identifying this issue.
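Both points can be illustrated with plain asyncio, without claiming this is how Faust implements them. In the sketch below, three hypothetical actors share one asyncio.Queue, so each event is handled exactly once, and each actor calls sleep(0) after handling an event so that the others get a turn.

```python
import asyncio
from typing import List, Tuple


async def actor(name: str, channel: asyncio.Queue,
                seen: List[Tuple[str, int]]) -> None:
    while True:
        event = await channel.get()
        seen.append((name, event))
        # Queue.get() returns immediately while items are available, so
        # without this explicit yield one actor could drain the queue.
        await asyncio.sleep(0)
        channel.task_done()


async def main() -> List[Tuple[str, int]]:
    channel: asyncio.Queue = asyncio.Queue()
    seen: List[Tuple[str, int]] = []
    for event in range(6):
        channel.put_nowait(event)
    # Simulates concurrency=3: three actors sharing ONE channel.
    actors = [
        asyncio.ensure_future(actor(f'actor-{i}', channel, seen))
        for i in range(3)
    ]
    await channel.join()  # wait until every event has been handled
    for task in actors:
        task.cancel()
    await asyncio.gather(*actors, return_exceptions=True)
    return seen


seen = asyncio.run(main())
print(seen)
```

If every actor had its own queue and each queue received a copy of every event, each event would be handled three times, which matches the bug described above.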
Agent: Less logging noise when using concurrency.
This removes the additionally emitted “Starting…”/“Stopping…” logs, especially noisy with @app.agent(concurrency=1000).
1.1.1
- release-date: 2018-09-17 4:06 P.M PDT
- release-by: Ask Solem (@ask)
Requirements
Now depends on Mode 1.17.2.
- Web: A blueprint registered to an app with a URL prefix would end up with a double slash.
- Documentation: Added project layout suggestions to the application user guide.
Types: annotations now passing checks on mypy 0.630.
1.1.0
- release-date: 2018-09-14 1:07 P.M PDT
- release-by: Ask Solem (@ask)
Important Notes
API: Agent.send and Channel.send now accept keyword arguments only
Users often make the mistake of doing:
    channel.send(x)
and expect that to send x as the value.
But the signature is (key, value, ...), so it ends up being channel.send(key=x, value=None).
Fixing this will come in two parts:
- Faust 1.1 (this change): Make them keyword-only arguments
This will make it an error if the names of arguments are not specified:
    channel.send(key, value)
Needs to be changed to:
    channel.send(key=key, value=value)
- Faust 1.2: We will change the signature to channel.send(value, key=key, ...)
At this stage all existing code will have changed to using keyword-only arguments.
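The difference between the two signatures can be shown with ordinary Python functions. send_1_0 and send_1_1 below are hypothetical stand-ins, not the real Faust methods; they only mirror the argument handling described above.

```python
def send_1_0(key=None, value=None):
    """Stand-in for the old positional signature (key, value, ...)."""
    return {'key': key, 'value': value}


def send_1_1(*, key=None, value=None):
    """Stand-in for the keyword-only signature described for Faust 1.1."""
    return {'key': key, 'value': value}


# The common mistake: x silently becomes the key.
mistake = send_1_0('x')

# With keyword-only arguments the same call fails loudly.
try:
    send_1_1('x')
    rejected = False
except TypeError:
    rejected = True

# Naming the argument makes the intent explicit.
explicit = send_1_1(value='x')

print(mistake, rejected, explicit)
```

Turning the silent mistake into a TypeError first means callers are already using keywords by the time the positional order changes.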
App: The default key serializer is now raw (Issue #142).
The default value serializer will still be json, but for keys it does not make as much sense to use json as the default: keys are very rarely expressed using complex structures.
If you depend on the Faust 1.0 behavior you should override the default key serializer for the app:
    app = faust.App('myapp', ..., key_serializer='json')
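To see why the choice matters for simple keys, compare the bytes produced for a plain string key. This sketch uses only the standard library and assumes that a json serializer behaves like json.dumps followed by UTF-8 encoding, and that a raw serializer leaves the key bytes unchanged; it does not call Faust itself.

```python
import json

key = 'user-42'

# Assumed json behavior: the string is quoted before being encoded.
json_key = json.dumps(key).encode('utf-8')

# Assumed raw behavior: the key bytes are used as they are.
raw_key = key.encode('utf-8')

print(json_key)  # b'"user-42"'
print(raw_key)   # b'user-42'
```

Because the two encodings differ on the wire, an application that mixes them for the same topic would see two different keys for the same logical value.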
Contributed by Allison Wang (@allisonwang)
No longer depends on click_completion
If you want to use the shell completion command, you now have to install that dependency locally first:
    $ ./examples/withdrawals.py completion
    Usage: withdrawals.py completion [OPTIONS]

    Error: Missing required dependency, but this is easy to fix.
    Run `pip install click_completion` from your virtualenv
    and try again!
Installing click_completion:
    $ pip install click_completion
    [...]
News
Requirements
Now depends on Mode 1.17.1.
No longer depends on click_completion
Now works with CPython 3.6.0.
Models: Record: Now supports decimals option to convert string decimals back to Decimal
This can be used for any model to enable “Decimal-fields”:
    class Fundamentals(faust.Record, decimals=True):
        open: Decimal
        high: Decimal
        low: Decimal
        volume: Decimal
When serialized this model will use string for decimal fields (the Javascript float type cannot be used without losing precision, it is a float after all), but when deserializing Faust will reconstruct them as Decimal objects from that string.
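The reasoning behind the string representation can be checked with the standard library alone. This is a sketch of the general round-trip idea, not of Faust's model code; the payload layout is an assumption made for illustration.

```python
import json
from decimal import Decimal

price = Decimal('1.10')

# Serialize the decimal as a string, then rebuild it when deserializing.
payload = json.dumps({'open': str(price)})
restored = Decimal(json.loads(payload)['open'])

# Going through a float instead picks up binary rounding error.
lossy_payload = json.dumps({'open': float(Decimal('0.1'))})
lossy = Decimal(json.loads(lossy_payload)['open'])

print(payload)                 # {"open": "1.10"}
print(restored == price)       # True
print(lossy == Decimal('0.1'))  # False
```

The string form also preserves the trailing zero in 1.10, which a float cannot represent.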
Model: Records now support custom coercion handlers.
Coercion converts one type into another, for example from string to datetime, or int/string to Decimal.
In models this means conversion from the serialized form back into a corresponding Python type.
To define a model where all UUID fields are serialized to string, but then converted back to UUID objects when deserialized, do this:
    from uuid import UUID
    import faust

    class Account(faust.Record, coercions={UUID: UUID}):
        id: UUID
What about non-json serializable types?
The use of UUID in this example leaves one important detail out: json doesn’t support this type so how can models serialize it?
The Faust JSON serializer adds support for UUID objects by default, but if you have a custom class you would need to add that capability by adding a __json__ handler:
    class MyType:

        def __init__(self, value: str):
            self.value = value

        def __json__(self):
            return self.value
You’d get tired writing this out for every class, so why not make an abstract model subclass:
    from uuid import UUID
    import faust

    class UUIDAwareRecord(faust.Record,
                          abstract=True,
                          coercions={UUID: UUID}):
        ...

    class Account(UUIDAwareRecord):
        id: UUID
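How a serializer might pick up a __json__ handler, and how a UUID survives the round trip as a string, can be sketched with the standard json module. The default function below is a hypothetical fallback written for this example; it is not the Faust JSON serializer.

```python
import json
from uuid import UUID


class MyType:

    def __init__(self, value: str):
        self.value = value

    def __json__(self):
        return self.value


def default(obj):
    """Fallback for objects the json module cannot serialize itself."""
    if isinstance(obj, UUID):
        return str(obj)
    handler = getattr(obj, '__json__', None)
    if callable(handler):
        return handler()
    raise TypeError(f'{type(obj).__name__} is not JSON serializable')


uid = UUID('12345678-1234-5678-1234-567812345678')
payload = json.dumps({'id': uid, 'custom': MyType('hello')}, default=default)
data = json.loads(payload)

# The coercion step: turn the serialized string back into a UUID.
restored_id = UUID(data['id'])

print(data)
print(restored_id == uid)  # True
```

Serialization and coercion are separate concerns here: the fallback controls the outgoing form, while the coercion rebuilds the Python type on the way back in.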
App: New ssl_context adds authentication support to Kafka.
Contributed by Mika Eloranta (@melor).
Monitor: New Datadog monitor (Issue #160)
Contributed by Allison Wang (@allisonwang).
- App: @app.task decorator now accepts on_leader argument (Issue #131).
Tasks created using the @app.task decorator will run once a worker is fully started.
Similar to the @app.timer decorator, you can now create one-shot tasks that run on the leader worker only:
    @app.task(on_leader=True)
    async def mytask():
        print('WORKER STARTED, AND I AM THE LEADER')
The decorated function may also accept the app as an argument:
    @app.task(on_leader=True)
    async def mytask(app):
        print(f'WORKER FOR APP {app} STARTED, AND I AM THE LEADER')
App: New app.producer_only attribute.
If set the worker will start the app without consumer/tables/agents/topics.
App: app.http_client property is now read-write.
Channel: In-memory channels were not working as expected.
- Channel.send(key=key, value=value) now works as expected.
- app.channel() accidentally set the maxsize to 1 by default, creating a deadlock.
- Channel.send() now disregards the stream_publish_on_commit setting.
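The general hazard of a queue bounded to one item can be demonstrated with asyncio.Queue. This sketch does not reproduce the exact Faust deadlock, whose details are not given here; it only shows that a second put blocks when the queue is full and nothing is consuming.

```python
import asyncio


async def main() -> bool:
    channel: asyncio.Queue = asyncio.Queue(maxsize=1)
    await channel.put('first')  # fills the queue
    try:
        # No consumer is running, so this put can never complete; the
        # timeout stands in for "would have blocked forever".
        await asyncio.wait_for(channel.put('second'), timeout=0.1)
    except asyncio.TimeoutError:
        return True
    return False


blocked = asyncio.run(main())
print(blocked)  # True
```

If the coroutine that is supposed to consume from the queue is the same one waiting on the put, neither side can make progress.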
Transport: aiokafka: Support timestamp-less messages
Fixes error when data sent with old Kafka broker not supporting timestamps:
    [2018-08-27 08:00:49,262: ERROR]: [^--Consumer]: Drain messages raised:
        TypeError("unsupported operand type(s) for /: 'NoneType' and 'float'",)
    Traceback (most recent call last):
      File "faust/transport/consumer.py", line 497, in _drain_messages
        async for tp, message in ait:
      File "faust/transport/drivers/aiokafka.py", line 449, in getmany
        record.timestamp / 1000.0,
    TypeError: unsupported operand type(s) for /: 'NoneType' and 'float'
Contributed by Mika Eloranta (@melor).
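The traceback shows a millisecond timestamp being divided by 1000.0 while it is None. A guard of the following shape avoids that error; timestamp_seconds is a hypothetical helper written for this example and is not the actual change made in Faust.

```python
from typing import Optional


def timestamp_seconds(timestamp_ms: Optional[int]) -> Optional[float]:
    """Convert a millisecond timestamp to seconds, tolerating None."""
    if timestamp_ms is None:
        # Message came from a broker that does not record timestamps.
        return None
    return timestamp_ms / 1000.0


print(timestamp_seconds(1500))  # 1.5
print(timestamp_seconds(None))  # None
```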
Distribution: pip install faust no longer installs the examples directory.
Fix contributed by Michael Seifert (@seifertm)
Web: Adds exception handling to views.
A view can now bail out early via raise self.NotFound() for example.
Web: @table_route decorator now supports taking key from the URL path.
This is now used in the examples/word_count.py example to add an endpoint /count/{word}/ that routes to the correct worker with that count:
    @app.page('/word/{word}/count/')
    @table_route(table=word_counts, match_info='word')
    async def get_count(web, request, word):
        return web.json({
            word: word_counts[word]
        })
Web: Support reverse lookup from view name via url_for
    web.url_for(view_name, **params)
Web: Adds support for Flask-like “blueprints”
A blueprint is basically just a description of a reusable app that you can add to your web application.
Blueprints are commonly used in most Flask-like web frameworks, but Flask blueprints are not compatible with e.g. Sanic blueprints.
The Faust blueprint is not directly compatible with any of them, but that should be fine.
To define a blueprint:
    from faust import web

    blueprint = web.Blueprint('user')

    @blueprint.route('/', name='list')
    class UserListView(web.View):

        async def get(self, request: web.Request) -> web.Response:
            return self.json({'hello': 'world'})

    @blueprint.route('/{username}/', name='detail')
    class UserDetailView(web.View):

        async def get(self, request: web.Request) -> web.Response:
            name = request.match_info['username']
            return self.json({'hello': name})

        async def post(self, request: web.Request) -> web.Response:
            ...

        async def delete(self, request: web.Request) -> web.Response:
            ...
Then to add the blueprint to a Faust app you register it:
blueprint.register(app, url_prefix='/users/')
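Combining a prefix such as '/users/' with a route such as '/' by simple concatenation gives '/users//', which is the kind of double slash that the 1.1.1 release notes mention fixing. One way to join the two parts safely is sketched below; join_url is a hypothetical helper and is not taken from the Faust source.

```python
def join_url(prefix: str, path: str) -> str:
    """Join a URL prefix and a route path with exactly one slash between."""
    return prefix.rstrip('/') + '/' + path.lstrip('/')


print('/users/' + '/')                       # /users//
print(join_url('/users/', '/'))              # /users/
print(join_url('/users/', '/{username}/'))   # /users/{username}/
print(join_url('/users', '{username}/'))     # /users/{username}/
```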
Note
You can also create views from functions (in this case it will only support GET):
    @blueprint.route('/', name='index')
    async def hello(self, request):
        return self.text('Hello world')
Why?
Asyncio web frameworks are moving quickly, and we want to be able to quickly experiment with different backend drivers.
Blueprints are a tiny abstraction that fits well into the already small web abstraction that we do have.
Documentation and examples improvements by
Tom Forbes (@orf).
Matthew Grossman (@matthewgrossman)
Denis Kataev (@kataev)
Allison Wang (@allisonwang)
Huyuumi (@diplozoon)
Project
CI: The following Python versions have been added to the build matrix:
CPython 3.7.0
CPython 3.6.6
CPython 3.6.0
Git:
All the version tags have been cleaned up to follow the format v1.2.3.
New active maintenance branches: 1.0 and 1.1.