how to attach FastStream NATS OpenTelemetry middleware #174

Open
theobouwman opened this issue Jun 22, 2024 · 1 comment

theobouwman commented Jun 22, 2024

Our FastAPI instance already uses our NATSService, which works great. Up until now it has only served as a KV store for some caching. I want to add FastStream as a separate instance that uses the same shared code (services, repos, config, observability, etc.).

The problem: I initialise the broker in the NATSProvider, which is already used by other services. How can I attach that broker instance to the FastStream instance and register handlers on it?

Here is my provider:

class NATSProvider(Provider):
    @provide(scope=Scope.APP)
    async def nats_service(self) -> NATSService:
        async def error_cb(e):
            logging.error(f"Error in NATS: {e}")
            sentry_sdk.capture_exception(e)

        tracer_provider = trace.get_tracer_provider()

        _broker = NatsBroker(
            get_config().NATS_SERVER_URL,
            user_credentials="./secret.creds",
            error_cb=error_cb,
            middlewares=(
                NatsTelemetryMiddleware(
                    tracer_provider=tracer_provider,
                ),
            )
        )
    
        client = await _broker.connect()

        nats_service = NATSService(_broker)
        await nats_service.init()

        return nats_service

My container:

container = make_async_container(
    DBProvider(),
    NATSProvider(),
    ServiceProvider(),
)

Here is the observability init. It is shared code in the shared/ folder, used by both my FastAPI API and my FastStream worker:

def init_observability(
    service_name: str,
    app: FastAPI = None,
):
    resource = Resource.create({
        "service.name": service_name,
    })

    tracer_provider = TracerProvider(resource=resource)

    if get_config().ENVIRONMENT == 'development':
        OTLP_HTTP_ENDPOINT = get_config().OTLP_HTTP_ENDPOINT
        trace_exporter = OTLPSpanExporter(endpoint=OTLP_HTTP_ENDPOINT)
        tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
    else:
        tracer_provider.add_span_processor(SentrySpanProcessor())

    trace.set_tracer_provider(tracer_provider)
    set_global_textmap(SentryPropagator())

    if app:
        FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)

    if get_config().ENVIRONMENT != 'development':
        sentry_sdk.init(
            dsn=get_config().SENTRY_DSN,
            environment=get_config().ENVIRONMENT,
            traces_sample_rate=0.4,
            profiles_sample_rate=0.4,
            enable_tracing=True,
            instrumenter="otel",
        )

And here is the FastStream app.py:

from di import container

from common.config import get_config
from common.schemas.worker import NewUserSignupPayload

app = FastStream(broker)


init_observability("momo-worker")
firebase_initialization()

setup_dishka(container, app)

@broker.subscriber(
    "user.email.summary.send",
    queue="worker",
    max_workers=40,
    deliver_policy=DeliverPolicy.ALL
)
@broker.publisher(
    "user.email.summary.success",
)
@inject
async def handler(msg: NewUserSignupPayload, logger: Logger, user_service: Annotated[UserService, FromDishka()], nats_service: Annotated[NATSService, FromDishka()]):
    logger.info(msg)
    try:
        tasks = []

        for i in range(10):
            tasks.append(asyncio.create_task(nats_service.get_cache_key_organisation_event('test')))

        user = await user_service.get_user_by_email(msg.user_id)

        await asyncio.gather(*tasks)
    except Exception as e:
        pass

theobouwman changed the title from "FastStream NATS OpenTelemetry middleware" to "how to attach FastStream NATS OpenTelemetry middleware" on Jun 22, 2024

IvanKirpichnikov (Contributor) commented

Please describe your problem in more detail.
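
For reference, the wiring the question asks about can be sketched as a "one shared broker object" pattern: create the broker once, let the DI provider return a service wrapping that same object, pass the same object to the app, and register handlers on it with a decorator. The sketch below is a minimal illustration using only the standard library. `StubBroker`, `StubService`, `StubApp`, and `provide_service` are hypothetical stand-ins for `NatsBroker`, `NATSService`, `FastStream`, and the `NATSProvider.nats_service` method. They are not the FastStream or dishka APIs, and real brokers have connection and lifecycle behaviour not modelled here.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[str], Awaitable[None]]


class StubBroker:
    """Hypothetical stand-in for a broker such as NatsBroker (not the real API)."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.connected = False

    def subscriber(self, subject: str) -> Callable[[Handler], Handler]:
        # Register the handler on this broker instance and return it unchanged.
        def decorator(func: Handler) -> Handler:
            self.handlers[subject] = func
            return func

        return decorator

    async def connect(self) -> None:
        self.connected = True


class StubService:
    """Hypothetical stand-in for NATSService: wraps the broker it is given."""

    def __init__(self, broker: StubBroker) -> None:
        self.broker = broker


class StubApp:
    """Hypothetical stand-in for the application object that holds the broker."""

    def __init__(self, broker: StubBroker) -> None:
        self.broker = broker


# One broker, created once at import time (in a shared module in a real project).
broker = StubBroker()


async def provide_service() -> StubService:
    # The provider reuses the shared broker instead of constructing a new one.
    if not broker.connected:
        await broker.connect()
    return StubService(broker)


app = StubApp(broker)
received: List[str] = []


@broker.subscriber("user.email.summary.send")
async def handler(msg: str) -> None:
    received.append(msg)


async def main() -> StubService:
    service = await provide_service()
    # Dispatch by hand to show the handler lives on the shared instance.
    await service.broker.handlers["user.email.summary.send"]("hello")
    return service


service = asyncio.run(main())
```

The point of the design is that the provider, the app, and the handler decorators all reference the same module-level object, so nothing needs to be "attached" after the fact. Whether the provider or the app should own the connection step in a real FastStream setup is left open here.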
