[Feat] Add Galileo Logging Callback #4567

Merged · 4 commits · Jul 6, 2024
64 changes: 64 additions & 0 deletions docs/my-website/docs/proxy/logging.md
@@ -7,10 +7,13 @@ import TabItem from '@theme/TabItem';

Log Proxy Input, Output, and Exceptions using Langfuse, OpenTelemetry, Custom Callbacks, DataDog, DynamoDB, and s3 Buckets

## Table of Contents

- [Logging to Langfuse](#logging-proxy-inputoutput---langfuse)
- [Logging with OpenTelemetry](#logging-proxy-inputoutput-in-opentelemetry-format)
- [Async Custom Callbacks](#custom-callback-class-async)
- [Async Custom Callback APIs](#custom-callback-apis-async)
- [Logging to Galileo](#beta-logging-llm-io-to-galileo)
- [Logging to OpenMeter](#logging-proxy-cost--usage---openmeter)
- [Logging to s3 Buckets](#logging-proxy-inputoutput---s3-buckets)
- [Logging to DataDog](#logging-proxy-inputoutput---datadog)
@@ -1056,6 +1059,67 @@ litellm_settings:

Start the LiteLLM Proxy and make a test request to verify the logs reached your callback API


## [Beta] Logging LLM I/O to Galileo

Log LLM I/O on [www.rungalileo.io](https://www.rungalileo.io/)

:::info

Beta Integration

:::

**Required Env Variables**

```bash
export GALILEO_BASE_URL=""  # for most users, this is your console URL with 'console' replaced by 'api' (e.g. http://www.console.galileo.myenterprise.com -> http://www.api.galileo.myenterprise.com)
export GALILEO_PROJECT_ID=""
export GALILEO_USERNAME=""
export GALILEO_PASSWORD=""
```
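
To sanity-check these credentials before starting the proxy, you can reproduce the login call the integration makes internally. A minimal sketch using the `requests` library (an assumption; any HTTP client works); the `/login` endpoint and form-encoded payload follow the Galileo REST logging docs referenced in the integration code:

```python
# sketch: verify GALILEO_* credentials by requesting an access token,
# mirroring the login call the LiteLLM integration performs on its side
import os

import requests  # assumes `requests` is installed

resp = requests.post(
    f"{os.environ['GALILEO_BASE_URL']}/login",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    data={
        "username": os.environ["GALILEO_USERNAME"],
        "password": os.environ["GALILEO_PASSWORD"],
    },
)
resp.raise_for_status()
print("got access_token:", bool(resp.json().get("access_token")))
```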

### Quick Start

1. Add to config.yaml
```yaml
model_list:
  - litellm_params:
      api_base: https://exampleopenaiendpoint-production.up.railway.app/
      api_key: my-fake-key
      model: openai/my-fake-model
    model_name: fake-openai-endpoint

litellm_settings:
  success_callback: ["galileo"] # πŸ‘ˆ KEY CHANGE
```

2. Start Proxy

```bash
litellm --config /path/to/config.yaml
```

3. Test it!

```bash
curl --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--data '{
    "model": "fake-openai-endpoint",
    "messages": [
      {
        "role": "user",
        "content": "what llm are you"
      }
    ]
}'
```
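
Since the proxy is OpenAI-compatible, the same test can be run with the OpenAI Python SDK (a sketch; assumes `openai>=1.0` and the proxy running on port 4000 as above):

```python
import openai

# any placeholder key works against the local proxy unless you configured auth
client = openai.OpenAI(api_key="anything", base_url="http://0.0.0.0:4000")

response = client.chat.completions.create(
    model="fake-openai-endpoint",
    messages=[{"role": "user", "content": "what llm are you"}],
)
print(response.choices[0].message.content)
```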


πŸŽ‰ That's it! Expect to see your logs on your Galileo dashboard.

## Logging Proxy Cost + Usage - OpenMeter

Bill customers according to their LLM API usage with [OpenMeter](../observability/openmeter.md)
134 changes: 134 additions & 0 deletions litellm/integrations/galileo.py
@@ -0,0 +1,134 @@
import os
from typing import List

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.types.galileo import LLMResponse
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler


class GalileoObserve(CustomLogger):
    def __init__(self) -> None:
        self.in_memory_records: List[dict] = []
        self.batch_size = 1  # flush after every record for now
        self.base_url = os.getenv("GALILEO_BASE_URL", None)
        self.project_id = os.getenv("GALILEO_PROJECT_ID", None)
        self.headers = None
        self.async_httpx_handler = AsyncHTTPHandler()

    def set_galileo_headers(self):
        # following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records

        headers = {
            "accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        # this method is called synchronously (outside the event loop), so use
        # the blocking HTTPHandler here rather than the async handler
        galileo_login_response = HTTPHandler().post(
            url=f"{self.base_url}/login",
            headers=headers,
            data={
                "username": os.getenv("GALILEO_USERNAME"),
                "password": os.getenv("GALILEO_PASSWORD"),
            },
        )

        access_token = galileo_login_response.json()["access_token"]

        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

    def get_output_str_from_response(self, response_obj, kwargs):
        """Extract a plain-text output from the various litellm response types; embeddings produce no text output."""
        output = None
        if response_obj is not None and (
            kwargs.get("call_type", None) == "embedding"
            or isinstance(response_obj, litellm.EmbeddingResponse)
        ):
            output = None
        elif response_obj is not None and isinstance(
            response_obj, litellm.ModelResponse
        ):
            output = response_obj["choices"][0]["message"].json()
        elif response_obj is not None and isinstance(
            response_obj, litellm.TextCompletionResponse
        ):
            output = response_obj.choices[0].text
        elif response_obj is not None and isinstance(
            response_obj, litellm.ImageResponse
        ):
            output = response_obj["data"]

        return output

    async def async_log_success_event(
        self,
        kwargs,
        start_time,
        end_time,
        response_obj,
    ):
        verbose_logger.debug("On Async Success")

        _latency_ms = int((end_time - start_time).total_seconds() * 1000)
        _call_type = kwargs.get("call_type", "litellm")
        input_text = litellm.utils.get_formatted_prompt(
            data=kwargs, call_type=_call_type
        )

        _usage = response_obj.get("usage", {}) or {}
        num_input_tokens = _usage.get("prompt_tokens", 0)
        num_output_tokens = _usage.get("completion_tokens", 0)

        output_text = self.get_output_str_from_response(
            response_obj=response_obj, kwargs=kwargs
        )

        request_record = LLMResponse(
            latency_ms=_latency_ms,
            status_code=200,
            input_text=input_text,
            output_text=output_text or "",  # may be None (e.g. embeddings); the schema expects a string
            node_type=_call_type,
            model=kwargs.get("model", "-"),
            num_input_tokens=num_input_tokens,
            num_output_tokens=num_output_tokens,
            created_at=start_time.strftime(
                "%Y-%m-%dT%H:%M:%S"
            ),  # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format
        )

        # dump to dict
        request_dict = request_record.model_dump()
        self.in_memory_records.append(request_dict)

        if len(self.in_memory_records) >= self.batch_size:
            await self.flush_in_memory_records()

    async def flush_in_memory_records(self):
        verbose_logger.debug("flushing in memory records")
        response = await self.async_httpx_handler.post(
            url=f"{self.base_url}/projects/{self.project_id}/observe/ingest",
            headers=self.headers,
            json={"records": self.in_memory_records},
        )

        if response.status_code == 200:
            verbose_logger.debug(
                "Galileo Logger: successfully flushed in memory records"
            )
            self.in_memory_records = []
        else:
            verbose_logger.debug("Galileo Logger: failed to flush in memory records")
            verbose_logger.debug(
                "Galileo Logger error=%s, status code=%s",
                response.text,
                response.status_code,
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        # failure events are not forwarded to Galileo yet
        verbose_logger.debug("On Async Failure")
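
For reference, the same logger can be exercised from the Python SDK without the proxy: the `"galileo"` callback string resolves to this class via `_init_custom_logger_compatible_class` (wired up in `litellm_logging.py` below). A minimal sketch, assuming the `GALILEO_*` env vars are already exported:

```python
import asyncio

import litellm

# resolves to GalileoObserve via _init_custom_logger_compatible_class
litellm.success_callback = ["galileo"]


async def main():
    # the integration implements async_log_success_event, so use the async API
    response = await litellm.acompletion(
        model="gpt-3.5-turbo",  # illustrative; any model you have credentials for
        messages=[{"role": "user", "content": "what llm are you"}],
    )
    print(response.choices[0].message.content)


asyncio.run(main())
```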
25 changes: 25 additions & 0 deletions litellm/integrations/types/galileo.py
@@ -0,0 +1,25 @@
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


# from here: https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#structuring-your-records
class LLMResponse(BaseModel):
    latency_ms: int
    status_code: int
    input_text: str
    output_text: str
    node_type: str
    model: str
    num_input_tokens: int
    num_output_tokens: int
    output_logprobs: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Optional. When available, logprobs are used to compute Uncertainty.",
    )
    created_at: str = Field(
        ..., description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format'
    )
    tags: Optional[List[str]] = None
    user_metadata: Optional[Dict[str, Any]] = None
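
For illustration, a record built with this model (with made-up values) matches the payload shape that `flush_in_memory_records` posts to `/projects/{project_id}/observe/ingest`:

```python
from litellm.integrations.types.galileo import LLMResponse

# illustrative values only
record = LLMResponse(
    latency_ms=250,
    status_code=200,
    input_text="what llm are you",
    output_text="I'm a language model.",
    node_type="completion",
    model="gpt-3.5-turbo",
    num_input_tokens=5,
    num_output_tokens=7,
    created_at="2024-07-06T12:00:00",
)

payload = {"records": [record.model_dump()]}  # body sent to .../observe/ingest
```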
14 changes: 14 additions & 0 deletions litellm/litellm_core_utils/litellm_logging.py
@@ -56,6 +56,7 @@
from ..integrations.custom_logger import CustomLogger
from ..integrations.datadog import DataDogLogger
from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
from ..integrations.greenscale import GreenscaleLogger
from ..integrations.helicone import HeliconeLogger
from ..integrations.lago import LagoLogger
@@ -1929,6 +1930,15 @@ def _init_custom_logger_compatible_class(
        _openmeter_logger = OpenMeterLogger()
        _in_memory_loggers.append(_openmeter_logger)
        return _openmeter_logger  # type: ignore

    elif logging_integration == "galileo":
        for callback in _in_memory_loggers:
            if isinstance(callback, GalileoObserve):
                return callback  # type: ignore

        galileo_logger = GalileoObserve()
        _in_memory_loggers.append(galileo_logger)
        return galileo_logger  # type: ignore
    elif logging_integration == "logfire":
        if "LOGFIRE_TOKEN" not in os.environ:
            raise ValueError("LOGFIRE_TOKEN not found in environment variables")
@@ -1985,6 +1995,10 @@ def get_custom_logger_compatible_class(
        for callback in _in_memory_loggers:
            if isinstance(callback, OpenMeterLogger):
                return callback
    elif logging_integration == "galileo":
        for callback in _in_memory_loggers:
            if isinstance(callback, GalileoObserve):
                return callback
    elif logging_integration == "logfire":
        if "LOGFIRE_TOKEN" not in os.environ:
            raise ValueError("LOGFIRE_TOKEN not found in environment variables")