# ============================================================
# app/core/observability.py - OpenTelemetry Setup
# ============================================================
import os
import logging
import time
import uuid
from typing import Optional, Dict, Any
from functools import wraps
from contextlib import contextmanager
import asyncio

from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

logger = logging.getLogger(__name__)


# ============================================================
# Helper Functions
# ============================================================

def _parse_otlp_headers(headers_str: str) -> Dict[str, str]:
    """Parse OTEL_EXPORTER_OTLP_HEADERS (format: key1=val1,key2=val2)"""
    headers = {}
    try:
        if not headers_str:
            return headers
        for pair in headers_str.split(","):
            if "=" in pair:
                key, val = pair.split("=", 1)
                headers[key.strip()] = val.strip()
    except Exception as e:
        logger.warning(f"Failed to parse OTLP headers: {e}")
    return headers


# ============================================================
# Configuration
# ============================================================

class ObservabilityConfig:
    """Observability settings from environment"""

    ENABLED: bool = os.getenv("OTEL_ENABLED", "true").lower() == "true"
    ENVIRONMENT: str = os.getenv("ENVIRONMENT", "development")
    SERVICE_NAME: str = os.getenv("OTEL_SERVICE_NAME", "lojiz-aida")
    SERVICE_VERSION: str = os.getenv("OTEL_SERVICE_VERSION", "1.0.0")

    # Exporters
    OTLP_ENDPOINT: Optional[str] = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
    OTLP_HEADERS: Dict[str, str] = _parse_otlp_headers(
        os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "")
    )

    # Sampling
    TRACE_SAMPLE_RATE: float = float(os.getenv("OTEL_TRACE_SAMPLE_RATE", "0.1"))

    # Console export for development
    CONSOLE_EXPORT: bool = os.getenv("OTEL_CONSOLE_EXPORT", "false").lower() == "true"


# ============================================================
# Initialize Tracing
# ============================================================

def init_tracing():
    """Initialize OpenTelemetry tracing"""
    if not ObservabilityConfig.ENABLED:
        logger.info("⚠️ OpenTelemetry disabled (OTEL_ENABLED=false)")
        return

    try:
        # Create resource
        resource = Resource.create({
            "service.name": ObservabilityConfig.SERVICE_NAME,
            "service.version": ObservabilityConfig.SERVICE_VERSION,
            "deployment.environment": ObservabilityConfig.ENVIRONMENT,
        })

        # Create tracer provider
        tracer_provider = TracerProvider(resource=resource)

        # Add exporter if configured
        if ObservabilityConfig.OTLP_ENDPOINT:
            try:
                exporter = OTLPSpanExporter(
                    endpoint=ObservabilityConfig.OTLP_ENDPOINT,
                    headers=ObservabilityConfig.OTLP_HEADERS,
                )
                tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
                logger.info(f"✅ OTLP exporter configured: {ObservabilityConfig.OTLP_ENDPOINT}")
            except Exception as e:
                logger.warning(f"⚠️ OTLP exporter failed: {e}")

        # Console export for development
        if ObservabilityConfig.CONSOLE_EXPORT or ObservabilityConfig.ENVIRONMENT == "development":
            from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
            tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
            logger.info("✅ Console span exporter enabled")

        # Set global tracer provider
        trace.set_tracer_provider(tracer_provider)
        logger.info("✅ OpenTelemetry tracing initialized")

    except Exception as e:
        logger.error(f"❌ Failed to initialize tracing: {e}")
        raise


# ============================================================
# Initialize Metrics
# ============================================================

def init_metrics():
    """Initialize OpenTelemetry metrics"""
    if not ObservabilityConfig.ENABLED:
        return

    try:
        resource = Resource.create({
            "service.name": ObservabilityConfig.SERVICE_NAME,
            "service.version": ObservabilityConfig.SERVICE_VERSION,
        })

        metric_readers = []

        # Add OTLP exporter if configured
        if ObservabilityConfig.OTLP_ENDPOINT:
            try:
                exporter = OTLPMetricExporter(
                    endpoint=ObservabilityConfig.OTLP_ENDPOINT,
                    headers=ObservabilityConfig.OTLP_HEADERS,
                )
                metric_readers.append(
                    PeriodicExportingMetricReader(exporter, export_interval_millis=5000)
                )
                logger.info("✅ OTLP metrics exporter configured")
            except Exception as e:
                logger.warning(f"⚠️ OTLP metrics exporter failed: {e}")

        meter_provider = MeterProvider(
            resource=resource,
            metric_readers=metric_readers,
        )
        metrics.set_meter_provider(meter_provider)
        logger.info("✅ OpenTelemetry metrics initialized")

    except Exception as e:
        logger.error(f"❌ Failed to initialize metrics: {e}")


# ============================================================
# Instrumentation
# ============================================================

def instrument_fastapi(app):
    """Auto-instrument FastAPI app"""
    if not ObservabilityConfig.ENABLED:
        return

    try:
        FastAPIInstrumentor.instrument_app(
            app,
            excluded_urls="docs,openapi.json,health",
        )
        logger.info("✅ FastAPI instrumented")
    except Exception as e:
        logger.warning(f"⚠️ FastAPI instrumentation failed: {e}")


def instrument_libraries():
    """Auto-instrument external libraries"""
    if not ObservabilityConfig.ENABLED:
        return

    try:
        HTTPXClientInstrumentor().instrument()
        RequestsInstrumentor().instrument()
        RedisInstrumentor().instrument()
        logger.info("✅ External libraries instrumented")
    except Exception as e:
        logger.warning(f"⚠️ Library instrumentation failed: {e}")


# ============================================================
# Custom Tracer & Metrics
# ============================================================

def get_tracer(name: str = __name__):
    """Get a tracer instance"""
    return trace.get_tracer(name)


def get_meter(name: str = __name__):
    """Get a meter instance"""
    return metrics.get_meter(name)


# ============================================================
# Span Context Manager
# ============================================================

@contextmanager
def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None):
    """Context manager for tracing operations"""
    tracer = get_tracer(__name__)
    with tracer.start_as_current_span(operation_name) as span:
        if attributes:
            for key, value in attributes.items():
                if value is not None:
                    try:
                        span.set_attribute(key, value)
                    except Exception:
                        pass  # Skip invalid attributes
        try:
            yield span
        except Exception as e:
            span.record_exception(e)
            raise
# ============================================================
# Decorators for tracing functions
# ============================================================

def trace_function(func):
    """Decorator to trace function execution"""

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        with trace_operation(f"{func.__module__}.{func.__name__}"):
            return await func(*args, **kwargs)

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        with trace_operation(f"{func.__module__}.{func.__name__}"):
            return func(*args, **kwargs)

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper


# ============================================================
# Middleware for Request Tracing
# ============================================================

class RequestContextMiddleware(BaseHTTPMiddleware):
    """Add request context to all spans"""

    async def dispatch(self, request: Request, call_next):
        # Generate or get request ID
        request_id = request.headers.get("x-request-id", str(uuid.uuid4()))

        # Get current span and set request attributes
        current_span = trace.get_current_span()
        if current_span and current_span.is_recording():
            current_span.set_attributes({
                "http.request.id": request_id,
                "http.request.method": request.method,
                "http.request.path": request.url.path,
                "http.request.query": str(request.url.query),
            })

        # Store in request state for use downstream
        request.state.request_id = request_id

        # Call next middleware
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time

        # Update span with response
        if current_span and current_span.is_recording():
            current_span.set_attributes({
                "http.response.status_code": response.status_code,
                "http.duration_ms": int(duration * 1000),
            })

        # Add request ID to response headers
        response.headers["x-request-id"] = request_id

        return response


# ============================================================
# Token Counter for LLM usage tracking
# ============================================================

class TokenUsageTracker:
    """Track token usage for LLM calls"""

    def __init__(self):
        self.meter = get_meter(__name__)
        self.token_counter = self.meter.create_counter(
            name="llm.tokens.used",
            unit="1",
            description="Total tokens used in LLM calls",
        )
        self.cost_counter = self.meter.create_counter(
            name="llm.cost",
            unit="USD",
            description="Total cost of LLM calls",
        )

    def record_tokens(self, model: str, prompt_tokens: int, completion_tokens: int, cost: float = 0.0):
        """Record token usage"""
        total_tokens = prompt_tokens + completion_tokens

        self.token_counter.add(
            total_tokens,
            {"model": model, "type": "total"}
        )
        self.token_counter.add(
            prompt_tokens,
            {"model": model, "type": "prompt"}
        )
        self.token_counter.add(
            completion_tokens,
            {"model": model, "type": "completion"}
        )

        if cost > 0:
            self.cost_counter.add(cost, {"model": model})


# ============================================================
# Export for global use
# ============================================================

_token_tracker = None


def get_token_tracker() -> TokenUsageTracker:
    """Get or create token usage tracker"""
    global _token_tracker
    if _token_tracker is None:
        _token_tracker = TokenUsageTracker()
    return _token_tracker
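

# ============================================================
# Example wiring (illustrative sketch)
# ============================================================
# A minimal sketch of how this module could be wired into an application
# at startup. The route name, span name, model name, and token numbers
# below are illustrative assumptions, not part of this module's contract;
# serving the app (e.g. via uvicorn) is omitted.
if __name__ == "__main__":
    from fastapi import FastAPI

    # Configure providers before instrumenting so spans and metrics
    # are picked up by the exporters configured above.
    init_tracing()
    init_metrics()
    instrument_libraries()

    app = FastAPI()
    app.add_middleware(RequestContextMiddleware)
    instrument_fastapi(app)

    @app.get("/ping")
    async def ping(request: Request):
        # Wrap handler logic in a custom span carrying the request ID
        # that RequestContextMiddleware stored on request.state.
        with trace_operation("ping.handler", {"http.request.id": request.state.request_id}):
            return {"ok": True}

    # Record a hypothetical LLM call against the shared token tracker.
    get_token_tracker().record_tokens(
        model="example-model", prompt_tokens=120, completion_tokens=30, cost=0.0004
    )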