Data Pipeline
Claw Insights collects data from five sources, stores it in SQLite, aggregates it into time-bucketed metrics, and pushes changes to clients in real time.
Collection Paths
1. sessions.json → Session State
The SessionReader watches OpenClaw's sessions.json file (via fs.watch + polling fallback). Each reload rebuilds the in-memory session map with hierarchy derived from spawnedBy fields. Sessions are classified as ACTIVE (<30 min), IDLE (<24 h), or DONE.
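The ACTIVE/IDLE/DONE classification can be sketched as a pure function. This is a minimal illustration assuming the cutoffs are measured from each session's last-activity timestamp; `classifySession` and the constant names are hypothetical, not the actual implementation.

```typescript
type SessionState = "ACTIVE" | "IDLE" | "DONE";

const ACTIVE_CUTOFF_MS = 30 * 60 * 1000;    // 30 minutes
const IDLE_CUTOFF_MS = 24 * 60 * 60 * 1000; // 24 hours

// Classify a session by the age of its last activity (assumption:
// thresholds are relative to last activity, per the description above).
function classifySession(lastActivityMs: number, nowMs: number = Date.now()): SessionState {
  const age = nowMs - lastActivityMs;
  if (age < ACTIVE_CUTOFF_MS) return "ACTIVE";
  if (age < IDLE_CUTOFF_MS) return "IDLE";
  return "DONE";
}
```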
2. transcripts/*.jsonl → Conversations & Tokens
The TranscriptManager scans transcript files on startup (incrementally when cached), then polls for changes every 10 seconds. Each .jsonl line is parsed for role, content, model, and token usage. Extracted data flows into the token_usage_events and message_events tables via a batched sink.
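Parsing a transcript line might look like the sketch below. The field names (`role`, `content`, `model`, `usage.input_tokens`, `usage.output_tokens`) are assumptions based on the description above, not the actual transcript schema; the key point is tolerating malformed or partially written lines.

```typescript
// Hypothetical shape of one parsed transcript line (assumed field names).
interface ParsedLine {
  role: string;
  content: string;
  model?: string;
  tokens?: { input: number; output: number };
}

function parseTranscriptLine(line: string): ParsedLine | null {
  try {
    const entry = JSON.parse(line);
    if (typeof entry.role !== "string") return null; // skip non-message lines
    return {
      role: entry.role,
      content: String(entry.content ?? ""),
      model: entry.model,
      tokens: entry.usage
        ? {
            input: entry.usage.input_tokens ?? 0,
            output: entry.usage.output_tokens ?? 0,
          }
        : undefined,
    };
  } catch {
    return null; // tolerate truncated lines while the file is being written
  }
}
```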
3. Gateway Logs → Structured Events
The LogTailer tails daily log files (openclaw-YYYY-MM-DD.log), emitting parsed JSON entries. The LogIngester processor classifies each entry (error, warning, API call, tool call, restart) and writes to metric_events.
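A sketch of the classification step, assuming entries carry a log level and a message. The matching rules here are illustrative guesses; only the category names come from the list above.

```typescript
type EventKind = "error" | "warning" | "api_call" | "tool_call" | "restart" | "other";

// Hypothetical classifier: real matching rules in LogIngester may differ.
function classifyLogEntry(entry: { level?: string; msg?: string }): EventKind {
  const msg = (entry.msg ?? "").toLowerCase();
  if (entry.level === "error") return "error";
  if (entry.level === "warn") return "warning";
  if (msg.includes("restart")) return "restart";
  if (msg.includes("tool")) return "tool_call";
  if (msg.includes("api")) return "api_call";
  return "other";
}
```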
4. cron.json → Scheduled Jobs
The CronReader watches OpenClaw's cron.json for job definitions (id, schedule, enabled state, last run status). Data is held in memory — no database table needed.
5. openclaw CLI → Gateway Status
The GatewayClient shells out to openclaw status --json and openclaw --version with short-lived caches (10 s for status, 60 s for version). Concurrent requests are deduplicated: callers share a single in-flight invocation rather than spawning duplicate subprocesses.
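The cache-plus-dedupe pattern can be sketched as a small wrapper. This is a minimal illustration, not the GatewayClient's actual code; `cached` is a hypothetical helper.

```typescript
// Wrap an async fetcher with a TTL cache that also collapses concurrent
// callers onto one in-flight promise.
function cached<T>(ttlMs: number, fetcher: () => Promise<T>): () => Promise<T> {
  let value: T | undefined;
  let expires = 0;
  let inflight: Promise<T> | null = null;

  return async () => {
    if (value !== undefined && Date.now() < expires) return value; // fresh hit
    if (inflight) return inflight; // dedupe concurrent callers
    inflight = fetcher().finally(() => { inflight = null; });
    value = await inflight;
    expires = Date.now() + ttlMs;
    return value;
  };
}
```

Usage would resemble `const getStatus = cached(10_000, runStatusCommand)`, where `runStatusCommand` (a hypothetical name) spawns the CLI and parses its JSON output.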
Storage
SQLite with four core tables:
| Table | Purpose |
|---|---|
| metric_events | Classified log events (errors, warnings, API/tool calls, restarts) |
| token_usage_events | Per-turn token consumption by model (input, output, cache read/write) |
| message_events | Conversation messages with role, session key, and line hash |
| system_samples | Periodic CPU, memory, disk, and active session count snapshots |
Aggregation
The Aggregator queries raw tables and buckets data into time windows:
| Range | Bucket Size | Buckets |
|---|---|---|
| 30 min | 2 min | 15 |
| 1 hour | 5 min | 12 |
| 6 hours | 15 min | 24 |
| 12 hours | 30 min | 24 |
| 24 hours | 60 min | 24 |
Each bucket includes session count, token usage (with per-model breakdown), API calls, tool calls, errors, warnings, turns, and gateway uptime. Results are cached for 60 seconds.
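The bucketing math from the table above can be sketched as follows; each raw event timestamp maps to the start of its bucket. The sizes and counts mirror the table (each range equals bucket size × bucket count), but `BUCKETS` and `bucketStart` are illustrative names, not the Aggregator's API.

```typescript
// Bucket sizes and counts per time range, matching the table above.
const BUCKETS: Record<string, { sizeMs: number; count: number }> = {
  "30m": { sizeMs: 2 * 60_000, count: 15 },
  "1h":  { sizeMs: 5 * 60_000, count: 12 },
  "6h":  { sizeMs: 15 * 60_000, count: 24 },
  "12h": { sizeMs: 30 * 60_000, count: 24 },
  "24h": { sizeMs: 60 * 60_000, count: 24 },
};

// Map a raw event timestamp to the start of its bucket.
function bucketStart(tsMs: number, range: keyof typeof BUCKETS): number {
  const { sizeMs } = BUCKETS[range];
  return Math.floor(tsMs / sizeMs) * sizeMs;
}
```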
The DataRetention service aggregates system_samples into hourly rollups and prunes raw data beyond the configured retention period (default: 7 days).
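The retention arithmetic amounts to two small calculations, sketched below under the assumption that rollups snap to hour boundaries and pruning uses a simple age cutoff; the function names are hypothetical.

```typescript
const HOUR_MS = 60 * 60 * 1000;

// Snap a raw sample timestamp to the start of its hour for rollup grouping.
function hourlyRollupKey(tsMs: number): number {
  return Math.floor(tsMs / HOUR_MS) * HOUR_MS;
}

// Raw rows older than this cutoff are eligible for pruning
// (default retention: 7 days, per the description above).
function pruneCutoff(nowMs: number, retentionDays = 7): number {
  return nowMs - retentionDays * 24 * HOUR_MS;
}
```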
Real-time Updates
When data changes, the pipeline emits a dataChanged signal via a GraphQL subscription delivered over SSE:
Source data changes → dataBus emit → GraphQL subscription → SSE push
→ Client receives { source, ts } → 500ms debounce → refetch affected queries

The signal is lightweight — it tells the client what changed, not the data itself. The client decides which queries to refetch. No historical events are replayed on reconnect.
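The client-side 500 ms debounce can be sketched as below: rapid dataChanged signals collapse into a single refetch that carries the set of changed sources. This is an illustrative pattern, not the actual client code; `makeDebouncedRefetch` and its callback are hypothetical.

```typescript
// Collect dataChanged sources for delayMs, then fire one refetch
// callback with the accumulated batch.
function makeDebouncedRefetch(
  refetch: (sources: Set<string>) => void,
  delayMs = 500,
) {
  let pending = new Set<string>();
  let timer: ReturnType<typeof setTimeout> | null = null;

  return (source: string) => {
    pending.add(source);
    if (timer) clearTimeout(timer); // restart the debounce window
    timer = setTimeout(() => {
      const batch = pending;
      pending = new Set();
      timer = null;
      refetch(batch);
    }, delayMs);
  };
}
```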