🚀 New issue to ag2ai/faststream by @melenudo
📝 Bug: auto_commit is always True in Confluent Kafka (#2610)
Describe the bug
Even if you use the deprecated autocommit subscriber parameter, or if you use the
How to reproduce
You can run this snippet (using the deprecated
In Kafka, you will notice that the message is automatically committed before
You can also debug the code and observe that in the consumer:
https://github.com/ag2ai/faststream/blob/8a4c60bdae02c7632c15ff1a1d15b268da6e095d/faststream/confluent/helpers/client.py#L236
Expected behavior
When use
Observed behavior
Environment
#goodfirstissue #bug
sent via relator
📝 Bug: auto_commit is always True in Confluent Kafka (#2610)
Describe the bug
Even if you use the deprecated autocommit subscriber parameter, or if you use the
ack_policy parameter with a value other than AckPolicy.ACK_FIRST, the consumer will set enable.auto.commit to True.How to reproduce
from asyncio import sleep
from pydantic import BaseModel, Field, NonNegativeFloat
from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker
class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.publisher("output_data")
@broker.subscriber("input_data", group_id="my-group", auto_commit=False)
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
await sleep(20)
return DataBasic(data=msg.data + 1.0)
You can run this snippet (using the deprecated
auto_commit; the same behavior can be observed if you use ack_policy=AckPolicy.ACK).In Kafka, you will notice that the message is automatically committed before
on_input_data finishes.You can also debug the code and observe that in the consumer:
https://github.com/ag2ai/faststream/blob/8a4c60bdae02c7632c15ff1a1d15b268da6e095d/faststream/confluent/helpers/client.py#L236
self.config always has the property enable.auto.commit to TrueExpected behavior
When use
@subscriber(...,ack_policy=AckPolicy.ACK) the autocommit must be disabled (same behavior for a policy different than AckPolicy.ACK_FIRST)Observed behavior
enable.auto.commit is always True ignoring subscriber parameters.Environment
Running FastStream 0.6.2 with CPython 3.12.9 on Darwin
#goodfirstissue #bug
sent via relator
🚀 New issue to ag2ai/faststream by @HelgeKrueger
📝 Bug: additional "Subscribe" in docs (#2617)
Describe the bug
In the sidebar, the subscriber is called "my_subscriberSubscribe" instead of "my_subscriber"
How to reproduce
then run
Screenshots
🖼️Image#good_first_issue #bug
sent via relator
📝 Bug: additional "Subscribe" in docs (#2617)
Describe the bug
In the sidebar, the subscriber is called "my_subscriberSubscribe" instead of "my_subscriber"
How to reproduce
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker()
@broker.subscriber("queue", title="my_subscriber")
async def test(): ...
app = FastStream(broker)
then run
uv run faststream docs serve main:app
Screenshots
🖼️Image#good_first_issue #bug
sent via relator
❤1