Находки в опенсорсе: Python
1.01K subscribers
4 photos
148 links
Легкие задачки в опенсорсе из мира Python

Чат: @opensource_findings_chat
Download Telegram
🚀 New issue to ag2ai/faststream by @nectarindev
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)


I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:

from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context

broker = KafkaBroker()


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("dependency: ", context.get("dependency")) # 42


async def lifespan(*args, **kwargs):
context.set_global("dependency", 42)

await broker.start()
try:
yield
finally:
await broker.stop()

I launched the broker as part of my application in lifespan without using the FastStream class.

For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:

broker = KafkaBroker(context=ContextRepo({"dependency": 42}))

...

async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()

But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.

from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("broker_dependency: ", context.get("broker_dependency")) # None
print("application_dependency: ", context.get("application_dependency")) # 1


async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()


asgi = FastAPI(lifespan=lifespan)

I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.

⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯

Running FastStream 0.6.3 with CPython 3.12.4 on Linux


#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
1