Находки в опенсорсе: Python
1.01K subscribers
4 photos
148 links
Легкие задачки в опенсорсе из мира Python

Чат: @opensource_findings_chat
Download Telegram
Изи ишью: 5 минут, зашли и вышли

Нужно просто в тестовых клиентах отфильтровывать тех подписчиков, что еще не запущены.

Нужно подправить все места включения self.broker.subscribers и добавить соответствующий тест

https://github.com/ag2ai/faststream/issues/2053

#faststream
🚀 New issue to ag2ai/faststream by @jsonvot
📝 Bug: The coexistence issue between URL and virtualhost (#2652)


Describe the bug

import asyncio
from faststream.rabbit import RabbitBroker

async def pub():
broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='/domestic-aed') # 1

#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='//domestic-aed') # 2
#broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='//domestic-aed') # 3
#broker = RabbitBroker('amqp://guest:guest@localhost:5672//domestic-aed') # 4

#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='/domestic-aed') # 5
async with broker:
await broker.publish(
"Hi!",
queue="test-queue",
exchange="test-exchange"
)
asyncio.run(pub())

In version v0.5.33, the first method works properly, and the trailing slash (/) at the end of the URL cannot be omitted. However, in versions >=0.5.34, due to additional handling of virtualhost, only parameter-based formats like 2, 3, 4 and 5 are supported.
I believe method 5 is the most intuitive and should be handled correctly, but it is currently treated as an invalid format. Using this method will result in the following error:
🖼️Image

Environment
faststream[rabbit]>=0.5.34


#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @Sehat1137
📝 Skip relator notification from the dependabot (#2665)


Improve pipeline so that it is not triggered by events from dependabot.

https://github.com/ag2ai/faststream/blob/main/.github/workflows/relator.yaml#L19


#good_first_issue #github_actions #faststream #ag2ai
sent via relator
🤔3
🚀 New issue to ag2ai/faststream by @GrigoriyKuzevanov
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)


Describe the bug
When a StreamSub has both 'group' and 'consumer', and 'min_idle_time' specified, Faststream uses 'XREADGROUP' instead of 'XAUTOCALIM'

How to reproduce
Include source code:

from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")

@broker.subscriber(
stream=StreamSub(
"orders",
group="processors",
consumer="claimer",
min_idle_time=10000, # Should trigger XAUTOCLAIM
)
)
async def claiming_handler(msg):
print("Should use XAUTOCLAIM, but uses XREADGROUP")


app = FastStream(broker)

Redis MONITOR output shows:

XREADGROUP GROUP processors claimer BLOCK 100 STREAMS orders >

Expected behavior

XAUTOCLAIM orders processors claimer 10000 0-0 COUNT 1

Observed behavior
I suppose that a root cause in faststream/redis/subscriber/use_cases/stream_subscriber, method _StreamHandlerMixin.start():

if stream.group and stream.consumer:  # ← Checked FIRST
# Uses XREADGROUP
...
elif self.stream_sub.min_idle_time is None:
# Uses XREAD
...
else:
# Uses XAUTOCLAIM ← Never reached when group is set!
...

Or i just misunderstand the logic.

Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux


#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @gaby
📝 bug: Usage of custom logger results in no logs (#2677)


Is your feature request related to a problem? Please describe.
The built-logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 This affects systems collecting logs from faststream hosts. This makes loga generated by faststream to show in raw text as "\033[36mDEBUG\033[0m" instead of DEBUG.

Describe the solution you'd like
Make the use_colors param configurable instead of a hardcoded value.

Describe alternatives you've considered
Writing a custom log parser.


#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @nectarindev
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)


I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:

from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context

broker = KafkaBroker()


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("dependency: ", context.get("dependency")) # 42


async def lifespan(*args, **kwargs):
context.set_global("dependency", 42)

await broker.start()
try:
yield
finally:
await broker.stop()

I launched the broker as part of my application in lifespan without using the FastStream class.

For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:

broker = KafkaBroker(context=ContextRepo({"dependency": 42}))

...

async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()

But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.

from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("broker_dependency: ", context.get("broker_dependency")) # None
print("application_dependency: ", context.get("application_dependency")) # 1


async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()


asgi = FastAPI(lifespan=lifespan)

I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.

⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯

Running FastStream 0.6.3 with CPython 3.12.4 on Linux


#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
1