diff --git a/langfuse/client.py b/langfuse/client.py index ee5753dd..bfe3887b 100644 --- a/langfuse/client.py +++ b/langfuse/client.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import datetime as dt import logging import os @@ -2557,6 +2558,85 @@ def get_langchain_handler( return trace.get_langchain_handler(update_parent=True) + @contextmanager + def observe_llama_index( + self, + *, + run_name: str, + run_description: Optional[str] = None, + run_metadata: Optional[Any] = None, + llama_index_integration_constructor_kwargs: Optional[Dict[str, Any]] = {}, + ): + """Context manager for observing LlamaIndex operations linked to this dataset item. + + This method sets up a LlamaIndex callback handler that integrates with Langfuse, allowing detailed logging + and tracing of LlamaIndex operations within the context of a specific dataset run. It ensures that all + operations performed within the context are linked to the appropriate dataset item and run in Langfuse. + + Args: + run_name (str): The name of the dataset run. + run_description (Optional[str]): Description of the dataset run. Defaults to None. + run_metadata (Optional[Any]): Additional metadata for the dataset run. Defaults to None. + llama_index_integration_constructor_kwargs (Optional[Dict[str, Any]]): Keyword arguments to pass + to the LlamaIndex integration constructor. Defaults to an empty dictionary. + + Yields: + LlamaIndexCallbackHandler: The callback handler for LlamaIndex operations. + + Example: + ```python + dataset_item = dataset.items[0] + + with dataset_item.observe_llama_index(run_name="example-run", run_description="Example LlamaIndex run") as handler: + # Perform LlamaIndex operations here + some_llama_index_operation() + ``` + + Raises: + ImportError: If required modules for LlamaIndex integration are not available. + """ + metadata = { + "dataset_item_id": self.id, + "run_name": run_name, + "dataset_id": self.dataset_id, + } + trace = self.langfuse.trace(name="dataset-run", metadata=metadata) + self.link( + trace, run_name, run_metadata=run_metadata, run_description=run_description + ) + + try: + import llama_index.core + from llama_index.core import Settings + from llama_index.core.callbacks import CallbackManager + + from langfuse.llama_index import LlamaIndexCallbackHandler + + callback_handler = LlamaIndexCallbackHandler( + **llama_index_integration_constructor_kwargs, + ) + callback_handler.set_root(trace, update_root=True) + + # Temporarily set the global handler to the new handler if previous handler is a LlamaIndexCallbackHandler + # LlamaIndex does not adding two errors of same type, so if global handler is already a LlamaIndexCallbackHandler, we need to remove it + prev_global_handler = llama_index.core.global_handler + prev_callback_manager = Settings.callback_manager + + if isinstance(prev_global_handler, LlamaIndexCallbackHandler): + llama_index.core.global_handler = None + + Settings.callback_manager = CallbackManager([callback_handler]) + + except Exception as e: + self.log.exception(e) + + try: + yield callback_handler + finally: + # Reset the handlers + Settings.callback_manager = prev_callback_manager + llama_index.core.global_handler = prev_global_handler + def get_llama_index_handler( self, *, diff --git a/tests/test_datasets.py b/tests/test_datasets.py index 46cc0925..2c2b8ce3 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -344,13 +344,13 @@ def test_llama_index_dataset(): dataset_item_id = None for item in dataset.items: - handler = item.get_llama_index_handler(run_name=run_name) - dataset_item_id = item.id + with item.observe_llama_index(run_name=run_name) as handler: + dataset_item_id = item.id - index = get_llama_index_index(handler) - index.as_query_engine().query( - "What did the speaker achieve in the past twelve months?" - ) + index = get_llama_index_index(handler) + index.as_query_engine().query( + "What did the speaker achieve in the past twelve months?" + ) langfuse.flush() run = langfuse.get_dataset_run(dataset_name, run_name)