Data Pipeline

Claw Insights collects data from five sources, stores it in SQLite, aggregates it into time-bucketed metrics, and pushes changes to clients in real time.

Collection Paths

1. sessions.json → Session State

The SessionReader watches OpenClaw's sessions.json file (via fs.watch with a polling fallback). Each reload rebuilds the in-memory session map, deriving the hierarchy from spawnedBy fields. Sessions are classified by time since last activity: ACTIVE (&lt;30 min), IDLE (&lt;24 h), or DONE (≥24 h).
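The classification rule can be sketched as a small pure function. This is a minimal illustration of the thresholds above; the function name and signature are assumptions, not the actual SessionReader API.

```ts
type SessionState = "ACTIVE" | "IDLE" | "DONE";

// Classify a session by the age of its last activity, using the documented
// thresholds: <30 min → ACTIVE, <24 h → IDLE, otherwise DONE.
function classifySession(lastActivityMs: number, nowMs: number): SessionState {
  const age = nowMs - lastActivityMs;
  if (age < 30 * 60 * 1000) return "ACTIVE";
  if (age < 24 * 60 * 60 * 1000) return "IDLE";
  return "DONE";
}
```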

2. transcripts/*.jsonl → Conversations & Tokens

The TranscriptManager scans transcript files on startup (incrementally when cached), then polls for changes every 10 seconds. Each .jsonl line is parsed for role, content, model, and token usage. Extracted data flows into the token_usage_events and message_events tables via a batched sink.
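A per-line parse might look like the sketch below. The field names (`usage.input_tokens`, etc.) are illustrative assumptions — the real transcript schema may differ — but the key behaviors match the text: one JSON object per line, malformed lines skipped rather than aborting the scan.

```ts
interface ParsedTurn {
  role: string;
  model?: string;
  inputTokens: number;
  outputTokens: number;
}

// Parse one transcript .jsonl line into the fields the sink needs.
// Returns null for malformed lines so the scan can continue.
function parseTranscriptLine(line: string): ParsedTurn | null {
  let entry: any;
  try {
    entry = JSON.parse(line);
  } catch {
    return null;
  }
  const usage = entry.usage ?? {};
  return {
    role: entry.role ?? "unknown",
    model: entry.model,
    inputTokens: usage.input_tokens ?? 0,
    outputTokens: usage.output_tokens ?? 0,
  };
}
```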

3. Gateway Logs → Structured Events

The LogTailer tails daily log files (openclaw-YYYY-MM-DD.log), emitting parsed JSON entries. The LogIngester processor classifies each entry (error, warning, API call, tool call, restart) and writes to metric_events.
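A classification step along these lines could feed metric_events. The level and message heuristics here are assumptions for illustration; the actual LogIngester rules are not documented in this section.

```ts
type EventKind = "error" | "warning" | "api_call" | "tool_call" | "restart" | "other";

// Map a parsed log entry to one of the documented event kinds.
// Checks the log level first, then falls back to message keywords.
function classifyLogEntry(entry: { level?: string; message?: string }): EventKind {
  const msg = (entry.message ?? "").toLowerCase();
  if (entry.level === "error") return "error";
  if (entry.level === "warn") return "warning";
  if (msg.includes("restart")) return "restart";
  if (msg.includes("tool call")) return "tool_call";
  if (msg.includes("api")) return "api_call";
  return "other";
}
```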

4. cron.json → Scheduled Jobs

The CronReader watches OpenClaw's cron.json for job definitions (id, schedule, enabled state, last run status). Data is held in memory — no database table needed.
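Since jobs live only in memory, each cron.json reload can simply rebuild a keyed map. The `CronJob` shape below is an assumption based on the fields listed above.

```ts
interface CronJob {
  id: string;
  schedule: string;
  enabled: boolean;
  lastRunStatus?: string;
}

// Rebuild the in-memory job map from a cron.json payload (assumed to be a
// JSON array of job definitions), keyed by job id for O(1) lookup.
function loadCronJobs(json: string): Map<string, CronJob> {
  const jobs: CronJob[] = JSON.parse(json);
  return new Map(jobs.map((j) => [j.id, j]));
}
```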

5. openclaw CLI → Gateway Status

The GatewayClient shells out to openclaw status --json and openclaw --version with short-lived caches (10s for status, 60s for version). Responses are deduped per in-flight request.
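The cache-plus-dedupe pattern described above can be sketched generically: results are reused until a TTL expires, and concurrent callers during a fetch share the single in-flight promise. Names and structure are illustrative, not the actual GatewayClient API.

```ts
// Wrap an async fetcher with a short-lived cache and in-flight deduplication.
function cachedFetcher<T>(fetch: () => Promise<T>, ttlMs: number): () => Promise<T> {
  let value: T | undefined;
  let fetchedAt = 0;
  let inFlight: Promise<T> | null = null;

  return async () => {
    if (value !== undefined && Date.now() - fetchedAt < ttlMs) {
      return value; // fresh cached result
    }
    if (inFlight) {
      return inFlight; // dedupe: share the pending request
    }
    inFlight = fetch()
      .then((v) => {
        value = v;
        fetchedAt = Date.now();
        return v;
      })
      .finally(() => {
        inFlight = null;
      });
    return inFlight;
  };
}
```

A status fetcher would then be created once, e.g. with a 10 s TTL, and called freely by every consumer.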

Storage

SQLite with four core tables:

| Table | Purpose |
| --- | --- |
| metric_events | Classified log events (errors, warnings, API/tool calls, restarts) |
| token_usage_events | Per-turn token consumption by model (input, output, cache read/write) |
| message_events | Conversation messages with role, session key, and line hash |
| system_samples | Periodic CPU, memory, disk, and active session count snapshots |

Aggregation

The Aggregator queries raw tables and buckets data into time windows:

| Range | Bucket Size | Buckets |
| --- | --- | --- |
| 30 min | 2 min | 15 |
| 1 hour | 5 min | 12 |
| 6 hours | 15 min | 24 |
| 12 hours | 30 min | 24 |
| 24 hours | 60 min | 24 |

Each bucket includes session count, token usage (with per-model breakdown), API calls, tool calls, errors, warnings, turns, and gateway uptime. Results are cached for 60 seconds.
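The core bucketing step reduces to index arithmetic: each event timestamp maps to `floor((ts - rangeStart) / bucketSize)`. This sketch counts events per bucket under that rule (function name is an assumption); e.g. a 1-hour range with 5-minute buckets yields 12 counters, matching the table.

```ts
// Count events per fixed-size time bucket. Events outside the range are dropped.
function bucketCounts(
  timestamps: number[],
  rangeStart: number,
  bucketMs: number,
  bucketCount: number,
): number[] {
  const counts = new Array(bucketCount).fill(0);
  for (const ts of timestamps) {
    const idx = Math.floor((ts - rangeStart) / bucketMs);
    if (idx >= 0 && idx < bucketCount) counts[idx]++;
  }
  return counts;
}
```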

The DataRetention service aggregates system_samples into hourly rollups and prunes raw data beyond the configured retention period (default: 7 days).
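An hourly rollup amounts to grouping samples by their hour bucket and averaging. The sketch below does this for a single metric; the `Sample` shape and function name are assumptions, and the real service presumably rolls up all sampled metrics and then deletes raw rows past the retention cutoff.

```ts
interface Sample {
  ts: number; // epoch ms
  cpu: number;
}

// Average raw samples into hourly buckets keyed by the hour's start timestamp.
function hourlyRollup(samples: Sample[]): Map<number, number> {
  const HOUR = 60 * 60 * 1000;
  const acc = new Map<number, { total: number; n: number }>();
  for (const s of samples) {
    const bucket = Math.floor(s.ts / HOUR) * HOUR;
    const entry = acc.get(bucket) ?? { total: 0, n: 0 };
    entry.total += s.cpu;
    entry.n++;
    acc.set(bucket, entry);
  }
  const out = new Map<number, number>();
  for (const [bucket, { total, n }] of acc) out.set(bucket, total / n);
  return out;
}
```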

Real-time Updates

When data changes, the pipeline emits a dataChanged signal via GraphQL SSE subscription:

Source data changes → dataBus emit → GraphQL subscription → SSE push
    → Client receives { source, ts } → 500ms debounce → refetch affected queries

The signal is lightweight — it tells the client what changed, not the data itself. The client decides which queries to refetch. No historical events are replayed on reconnect.
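The client-side debounce step can be sketched as follows: signals arriving within the window collapse into one refetch covering the union of changed sources. Function names are illustrative, not the actual client code.

```ts
// Collapse bursts of dataChanged signals into a single refetch call.
// Each signal resets the timer; when it fires, all accumulated sources
// are handed to the refetch callback at once.
function makeDebouncedRefetch(
  refetch: (sources: string[]) => void,
  delayMs = 500,
): (source: string) => void {
  let pending = new Set<string>();
  let timer: ReturnType<typeof setTimeout> | null = null;

  return (source: string) => {
    pending.add(source);
    if (timer) clearTimeout(timer);
    timer = setTimeout(() => {
      refetch([...pending]);
      pending = new Set();
      timer = null;
    }, delayMs);
  };
}
```

Because only `{ source, ts }` crosses the wire, this stays cheap even when many sources change at once.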

Released under the MIT License.