- SDK: client with BatchTransport, trace decorator/context manager, log_decision, thread-local context stack, nested trace→span support
- API: POST /api/traces (batch ingest), GET /api/traces (paginated list), GET /api/traces/[id] (full trace with relations), GET /api/health
- Tests: 8 unit tests for SDK (all passing)
- Transport: thread-safe buffer with background flush thread
93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
"""Batch transport for sending trace data to AgentLens API."""
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import httpx
|
|
|
|
from agentlens.models import TraceData
|
|
|
|
# Module-level logger, registered under the "agentlens" name.
logger = logging.getLogger("agentlens")
|
|
|
|
|
|
class BatchTransport:
    """Buffer completed traces and POST them to the API in batches.

    Traces are appended to an in-memory list guarded by a lock. The list is
    sent when it reaches ``max_batch_size`` entries (synchronously, on the
    thread that called :meth:`add`), every ``flush_interval`` seconds by a
    daemon background thread, or on demand via :meth:`flush` and
    :meth:`shutdown`.

    Delivery is best-effort: a batch that fails to send (an exception from the
    HTTP client, or an HTTP status >= 400) is logged as a warning and dropped;
    it is never put back in the buffer.
    """

    def __init__(
        self,
        api_key: str,
        endpoint: str,
        max_batch_size: int = 10,
        flush_interval: float = 5.0,
    ) -> None:
        """Create the transport and start its background flush thread.

        Args:
            api_key: Sent as a ``Bearer`` token in the ``Authorization``
                header of every batch request.
            endpoint: Base URL of the API; trailing slashes are stripped and
                ``/api/traces`` is appended when sending.
            max_batch_size: Buffer length at which :meth:`add` flushes
                immediately.
            flush_interval: Seconds the background thread waits between
                periodic flushes.
        """
        self._api_key = api_key
        self._endpoint = endpoint.rstrip("/")
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._buffer: List[Dict[str, Any]] = []
        self._lock = threading.Lock()
        self._shutdown_event = threading.Event()

        # Build the HTTP client *before* the worker thread exists. If client
        # construction raises, no orphaned thread is left looping, and the
        # worker can never observe an instance without ``_client``.
        self._client = httpx.Client(timeout=30.0)

        self._flush_thread = threading.Thread(
            target=self._flush_loop, daemon=True, name="agentlens-flush"
        )
        # Started last so every attribute the loop touches is already set.
        self._flush_thread.start()

    def add(self, trace: "TraceData") -> None:
        """Add a completed trace to the send buffer.

        The trace is converted with ``trace.to_dict()`` outside the lock. If
        the buffer has reached ``max_batch_size`` after the append, the batch
        is flushed on the calling thread.
        """
        trace_dict = trace.to_dict()
        with self._lock:
            self._buffer.append(trace_dict)
            should_flush = len(self._buffer) >= self._max_batch_size
        # Flush after releasing the lock: ``_do_flush`` takes it again itself.
        if should_flush:
            self._do_flush()

    def _flush_loop(self) -> None:
        """Background loop: flush every ``flush_interval`` seconds until shutdown."""
        # ``Event.wait`` returns False on timeout (time to flush) and True
        # once ``shutdown`` has set the event (time to exit).
        while not self._shutdown_event.wait(timeout=self._flush_interval):
            self._do_flush()

    def _do_flush(self) -> None:
        """Send every buffered trace to the API in a single POST request.

        Returns immediately when the buffer is empty. Failures are logged and
        never raised to the caller.
        """
        with self._lock:
            if not self._buffer:
                return
            # Take ownership of the current list and leave a fresh one behind,
            # so the network call below runs without holding the lock.
            batch, self._buffer = self._buffer, []

        try:
            response = self._client.post(
                f"{self._endpoint}/api/traces",
                json={"traces": batch},
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "Content-Type": "application/json",
                },
            )
            if response.status_code >= 400:
                logger.warning(
                    "AgentLens: Failed to send traces (HTTP %d): %s",
                    response.status_code,
                    response.text[:200],
                )
        except Exception as e:
            # Deliberate best-effort policy: the failed batch is dropped
            # rather than re-queued, so a persistently unreachable API cannot
            # make the buffer grow without bound.
            logger.warning("AgentLens: Failed to send traces: %s", e)

    def flush(self) -> None:
        """Manually flush all buffered traces."""
        self._do_flush()

    def shutdown(self) -> None:
        """Stop the background thread, flush remaining traces, close the client.

        Waits up to 5 seconds for the flush thread to exit before performing
        the final flush.
        """
        self._shutdown_event.set()
        self._flush_thread.join(timeout=5.0)
        try:
            self._do_flush()  # Final flush
        finally:
            # Release the HTTP client even if the final flush is interrupted.
            self._client.close()
|