Находки в опенсорсе: Python
950 subscribers
4 photos
138 links
Легкие задачки в опенсорсе из мира Python

Чат: @opensource_findings_chat
Download Telegram
🚀 New issue to ag2ai/faststream by @melenudo
📝 Bug: auto_commit is always True in Confluent Kafka (#2610)

Describe the bug

Even if you use the deprecated autocommit subscriber parameter, or if you use the ack_policy parameter with a value other than AckPolicy.ACK_FIRST, the consumer will set enable.auto.commit to True.

How to reproduce

from asyncio import sleep
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.confluent import KafkaBroker


class DataBasic(BaseModel):
data: NonNegativeFloat = Field(
..., examples=[0.5], description="Float data example"
)


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data", group_id="my-group", auto_commit=False)
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
logger.info(msg)
await sleep(20)
return DataBasic(data=msg.data + 1.0)


You can run this snippet (using the deprecated
auto_commit; the same behavior can be observed if you use ack_policy=AckPolicy.ACK).

In Kafka, you will notice that the message is automatically committed before
on_input_data finishes.

You can also debug the code and observe that in the consumer:

https://github.com/ag2ai/faststream/blob/8a4c60bdae02c7632c15ff1a1d15b268da6e095d/faststream/confluent/helpers/client.py#L236

self.config always has the property enable.auto.commit to True


Expected behavior
When use
@subscriber(...,ack_policy=AckPolicy.ACK) the autocommit must be disabled (same behavior for a policy different than AckPolicy.ACK_FIRST)

Observed behavior
enable.auto.commit is always True ignoring subscriber parameters.

Environment

Running FastStream 0.6.2 with CPython 3.12.9 on Darwin


#goodfirstissue #bug

sent via relator
🚀 New issue to ag2ai/faststream by @HelgeKrueger
📝 Bug: additional "Subscribe" in docs (#2617)

Describe the bug

In the sidebar, the subscriber is called "my_subscriberSubscribe" instead of "my_subscriber"

How to reproduce

from faststream import FastStream
from faststream.rabbit import RabbitBroker


broker = RabbitBroker()

@broker.subscriber("queue", title="my_subscriber")
async def test(): ...

app = FastStream(broker)

then run uv run faststream docs serve main:app

Screenshots

🖼️Image#good_first_issue #bug

sent via relator
1