Изи ишью: 5 минут, зашли и вышли
Нужно просто в тестовых клиентах отфильтровывать тех подписчиков, что еще не запущены.
Нужно подправить все места включения
https://github.com/ag2ai/faststream/issues/2053
#faststream
Нужно просто в тестовых клиентах отфильтровывать тех подписчиков, что еще не запущены.
Нужно подправить все места включения
self.broker.subscribers и добавить соответствующий тестhttps://github.com/ag2ai/faststream/issues/2053
#faststream
GitHub
Bug: Subscribers registered in runtime do not clean up after close · Issue #2053 · ag2ai/faststream
I am trying to use the new dynamic subscription from https://github.com/airtai/faststream/releases/tag/0.5.0 (see point number 9). While adding the dynamic subscription works well, I have issues cl...
🚀 New issue to ag2ai/faststream by @jsonvot
📝 Bug: The coexistence issue between URL and virtualhost (#2652)
Describe the bug
In version v0.5.33, the first method works properly, and the trailing slash (/) at the end of the URL cannot be omitted. However, in versions >=0.5.34, due to additional handling of virtualhost, only parameter-based formats like 2, 3, 4 and 5 are supported.
I believe method 5 is the most intuitive and should be handled correctly, but it is currently treated as an invalid format. Using this method will result in the following error:
🖼️Image
Environment
faststream[rabbit]>=0.5.34
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: The coexistence issue between URL and virtualhost (#2652)
Describe the bug
import asyncio
from faststream.rabbit import RabbitBroker
async def pub():
broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='/domestic-aed') # 1
#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='//domestic-aed') # 2
#broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='//domestic-aed') # 3
#broker = RabbitBroker('amqp://guest:guest@localhost:5672//domestic-aed') # 4
#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='/domestic-aed') # 5
async with broker:
await broker.publish(
"Hi!",
queue="test-queue",
exchange="test-exchange"
)
asyncio.run(pub())
In version v0.5.33, the first method works properly, and the trailing slash (/) at the end of the URL cannot be omitted. However, in versions >=0.5.34, due to additional handling of virtualhost, only parameter-based formats like 2, 3, 4 and 5 are supported.
I believe method 5 is the most intuitive and should be handled correctly, but it is currently treated as an invalid format. Using this method will result in the following error:
🖼️Image
Environment
faststream[rabbit]>=0.5.34
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @Sehat1137
📝 Skip relator notification from the dependabot (#2665)
Improve pipeline so that it is not triggered by events from dependabot.
https://github.com/ag2ai/faststream/blob/main/.github/workflows/relator.yaml#L19
#good_first_issue #github_actions #faststream #ag2ai
sent via relator
📝 Skip relator notification from the dependabot (#2665)
Improve pipeline so that it is not triggered by events from dependabot.
https://github.com/ag2ai/faststream/blob/main/.github/workflows/relator.yaml#L19
#good_first_issue #github_actions #faststream #ag2ai
sent via relator
🤔3
🚀 New issue to ag2ai/faststream by @GrigoriyKuzevanov
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)
Describe the bug
When a StreamSub has both 'group' and 'consumer', and 'min_idle_time' specified, Faststream uses 'XREADGROUP' instead of 'XAUTOCALIM'
How to reproduce
Include source code:
Redis MONITOR output shows:
Expected behavior
Observed behavior
I suppose that a root cause in
Or i just misunderstand the logic.
Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)
Describe the bug
When a StreamSub has both 'group' and 'consumer', and 'min_idle_time' specified, Faststream uses 'XREADGROUP' instead of 'XAUTOCALIM'
How to reproduce
Include source code:
from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub
broker = RedisBroker("redis://localhost:6379")
@broker.subscriber(
stream=StreamSub(
"orders",
group="processors",
consumer="claimer",
min_idle_time=10000, # Should trigger XAUTOCLAIM
)
)
async def claiming_handler(msg):
print("Should use XAUTOCLAIM, but uses XREADGROUP")
app = FastStream(broker)
Redis MONITOR output shows:
XREADGROUP GROUP processors claimer BLOCK 100 STREAMS orders >
Expected behavior
XAUTOCLAIM orders processors claimer 10000 0-0 COUNT 1
Observed behavior
I suppose that a root cause in
faststream/redis/subscriber/use_cases/stream_subscriber, method _StreamHandlerMixin.start():if stream.group and stream.consumer: # ← Checked FIRST
# Uses XREADGROUP
...
elif self.stream_sub.min_idle_time is None:
# Uses XREAD
...
else:
# Uses XAUTOCLAIM ← Never reached when group is set!
...
Or i just misunderstand the logic.
Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @gaby
📝 bug: Usage of custom logger results in no logs (#2677)
Is your feature request related to a problem? Please describe.
The built-logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 This affects systems collecting logs from faststream hosts. This makes loga generated by faststream to show in raw text as
Describe the solution you'd like
Make the
Describe alternatives you've considered
Writing a custom log parser.
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
📝 bug: Usage of custom logger results in no logs (#2677)
Is your feature request related to a problem? Please describe.
The built-logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 This affects systems collecting logs from faststream hosts. This makes loga generated by faststream to show in raw text as
"\033[36mDEBUG\033[0m" instead of DEBUG.Describe the solution you'd like
Make the
use_colors param configurable instead of a hardcoded value.Describe alternatives you've considered
Writing a custom log parser.
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @nectarindev
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)
I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:
I launched the broker as part of my application in lifespan without using the FastStream class.
For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:
But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.
I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.
⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
Running FastStream 0.6.3 with CPython 3.12.4 on Linux
#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)
I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:
from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context
broker = KafkaBroker()
@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("dependency: ", context.get("dependency")) # 42
async def lifespan(*args, **kwargs):
context.set_global("dependency", 42)
await broker.start()
try:
yield
finally:
await broker.stop()
I launched the broker as part of my application in lifespan without using the FastStream class.
For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:
broker = KafkaBroker(context=ContextRepo({"dependency": 42}))
...
async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()
But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.
from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))
@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("broker_dependency: ", context.get("broker_dependency")) # None
print("application_dependency: ", context.get("application_dependency")) # 1
async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()
asgi = FastAPI(lifespan=lifespan)
I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.
⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
Running FastStream 0.6.3 with CPython 3.12.4 on Linux
#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
❤1