
Data layer

Source: Notion | Last edited: 2026-02-07 | ID: 3002d2dc-3ef...


Data Layer - Project Handover Documentation


Date: February 5, 2026


  1. Executive Summary
  2. Project Overview
  3. Technology Stack
  4. Architecture
  5. Project Structure
  6. Key Components
  7. Configuration
  8. Deployment
  9. Data Flow
  10. Recent Development
  11. Known Issues & TODOs
  12. Operational Runbook

Data Layer is a cryptocurrency market data ingestion system that fetches, processes, and stores market data from multiple exchanges (Binance, Bybit, OKX). The system consists of three main components:

  1. Temporal Worker - Batch/scheduled data processing workflows
  2. WebSocket Tunnel - Real-time market data streaming
  3. CLI - Workflow management and scheduling

The system is production-ready with proper error handling, connection pooling, and graceful shutdown mechanisms.

The system collects and stores cryptocurrency market data, including:

Data types:

  • OHLCV Bars - Open, High, Low, Close, Volume data at multiple timeframes (1m, 5m, 15m, 1h)
  • Funding Rates - Perpetual contract funding rates (8h interval)
  • Fear & Greed Index - Market sentiment indicator (24h interval)

Supported exchanges:

  • Binance - Spot & Futures (with CSV fetcher for fast backfill)
  • Bybit - Spot & Futures (CCXT)
  • OKX - Spot & Futures (CCXT)

Key features:

  • Historical backfill with date range selection
  • Scheduled incremental updates
  • Real-time WebSocket streaming
  • Multi-exchange support via CCXT
  • Connection pooling for high throughput
  • Instrument filtering (venue, symbol, exchange type)
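The interval strings above (1m through 1h bars, 8h funding, 24h sentiment) can be made concrete with a small parser. This is a hypothetical helper, not part of the codebase:

```python
# Hypothetical helper: convert timeframe strings like "1m" or "8h" to
# seconds. Shown only to make the interval notation concrete.
_UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}

def timeframe_to_seconds(tf: str) -> int:
    value, unit = int(tf[:-1]), tf[-1]
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unsupported timeframe unit: {unit!r}")
    return value * _UNIT_SECONDS[unit]
```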


┌─────────────────────────────────────────────────────────────────────────┐
│ Data Sources │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────────────┐ │
│ │ Binance │ │ Bybit │ │ OKX │ │ Fear & Greed │ │
│ │ (WS + CSV) │ │ (CCXT) │ │ (CCXT) │ │ API │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └───────┬────────┘ │
└─────────┼────────────────┼────────────────┼─────────────────┼───────────┘
│ │ │ │
└────────────────┴────────────────┴─────────────────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Temporal │ │ WebSocket│ │ CLI │
│ Worker │ │ Tunnel │ │ │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ Redis │ │
│ │ Pub/Sub │ │
│ └─────────┘ │
│ │
└───────────────┬───────────────┘
┌─────────────┐
│ ClickHouse │
│ Database │
└─────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Temporal Server (Port 7233) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Schedules Task Queue: "default" │
│ ├── bars-1m (every 1m) ┌─────────────────────────────────┐ │
│ ├── bars-5m (every 5m) │ Worker Process │ │
│ ├── bars-15m (every 15m) │ ┌─────────────────────────────┐│ │
│ ├── bars-1h (every 1h) │ │ Activities ││ │
│ ├── funding-rates (8h) │ │ ├── FetcherActivity ││ │
│ └── fear-greed (24h) │ │ ├── ClickhouseActivity ││ │
│ │ │ ├── BackfillActivity ││ │
│ Workflows │ │ └── DataQualityActivity ││ │
│ ├── BackfillWorkflow │ └─────────────────────────────┘│ │
│ ├── BackfillTaskWorkflow │ ┌─────────────────────────────┐│ │
│ ├── IncrementalUpdater │ │ Workflows ││ │
│ ├── IncrementalTask │ │ ├── Backfill* ││ │
│ └── DataQualityWorkflow │ │ └── Incremental* ││ │
│ │ └─────────────────────────────┘│ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ TunnelServer │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Config (tunnel.yaml) SubscriberFactory │
│ ├── sources: ├── Routes by SubscriptionSource │
│ │ ├── binance: binance_ws └── Creates subscribers: │
│ │ └── bybit: ccxt ├── BinanceWebSocketSubscriber │
│ ├── streams: [ohlcv] └── CCXTWebSocketSubscriber │
│ └── timeframes: [1m,5m,1h] │
│ │
│ Subscription Format: source:exchange:stream:timeframe:symbol │
│ Example: binance_ws:binance:ohlcv:1m:BTC/USDT │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Subscriber │ │ Subscriber │ │ Subscriber │ ... │
│ │ BTC/USDT 1m │ │ ETH/USDT 1m │ │ BTC/USDT 5m │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┴──────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Redis Pub/Sub │ │
│ └───────┬───────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ WS Bridge │ (Optional - Browser clients) │
│ │ Port 8765 │ │
│ └───────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

data-layer/
├── app/ # Temporal worker package
│ ├── activities/ # Activity implementations
│ │ ├── clickhouse.py # Database operations
│ │ ├── fetcher.py # Data fetching (CCXT, CSV)
│ │ ├── data_quality.py # Data validation
│ │ ├── backfill.py # Backfill validation
│ │ └── abstract_activity.py # Base class
│ ├── workflows/ # Workflow definitions
│ │ ├── backfill/ # Historical backfill
│ │ └── incremental/ # Scheduled updates
│ ├── models/ # Pydantic models
│ ├── client.py # Temporal client
│ ├── worker.py # Worker entry point
│ └── pyproject.toml # Package config
├── tunnel/ # WebSocket streaming package
│ ├── core/ # Core architecture
│ │ ├── base.py # WebSocketSubscriber ABC
│ │ ├── models.py # Subscription models
│ │ ├── enums.py # Source/stream enums
│ │ └── factory.py # SubscriberFactory
│ ├── subscribers/ # WebSocket implementations
│ │ ├── ccxt_ws.py # CCXT Pro subscriber
│ │ └── binance_ws.py # Native Binance WS
│ ├── publishers/ # Message publishers
│ │ └── redis_pub.py # Redis Pub/Sub
│ ├── websocket/ # Browser bridge
│ │ └── server.py # WS → Browser
│ ├── server.py # Main orchestrator
│ ├── config.py # Config loader
│ └── pyproject.toml # Package config
├── cli/ # CLI commands
│ ├── run.py # Click commands
│ ├── config.py # Config models
│ └── helpers.py # Utilities
├── shared/ # Shared utilities
│ ├── base_model.py # Pydantic base models
│ ├── config.py # Instrument filtering
│ ├── settings.py # Environment config
│ ├── datetime_utils.py # Date/time utils
│ └── network.py # HTTP client
├── services/ # External integrations
│ ├── clickhouse/ # ClickHouse service
│ │ ├── connector.py # Connection pool
│ │ └── service.py # Operations
│ ├── fetchers/ # Data fetchers
│ │ ├── ccxt/ # CCXT fetcher
│ │ ├── fear_greed/ # F&G API
│ │ └── csv_fetcher.py # CSV fetcher
│ └── models.py # Data models
├── config/ # YAML configurations
│ ├── instruments.yaml # Master instruments
│ ├── incremental/ # Schedule configs
│ │ ├── bars.yaml
│ │ ├── funding_rates.yaml
│ │ └── fear_greed.yaml
│ ├── backfill/
│ │ └── backfill.yaml
│ └── tunnel/
│ └── tunnel.yaml
├── infra/ # Infrastructure
│ ├── composes/ # Docker Compose files
│ ├── Dockerfiles/ # Container images
│ ├── migrations/ # ClickHouse schemas
│ └── scripts/ # Infra scripts
├── tests/ # Test suite
├── .taskfiles/ # Task definitions
├── pyproject.toml # Root workspace config
├── Taskfile.yml # Task runner
└── uv.lock # Dependency lock

Entry point for batch processing. Initializes:

  • Temporal client (with optional API key for Temporal Cloud)
  • Activity instances with ClickHouse service pool
  • Workflow registrations
  • Graceful shutdown handlers

Key Configuration:

# Service pool for parallel ClickHouse inserts
SERVICE_POOL_SIZE = 8 # Configurable via env
# Activity distribution (avoids contention)
pool_index = hash((workflow_id, activity_id)) % pool_size
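The distribution rule above can be exercised as a runnable sketch. Identifiers here are illustrative; in the real worker the index selects a ClickHouse service instance from the pool:

```python
# Sketch of the pool-index distribution described above: each
# (workflow_id, activity_id) pair maps deterministically (within one
# process) to one of pool_size service slots, spreading parallel
# inserts across the pool instead of contending on one connection.
def pick_pool_index(workflow_id: str, activity_id: str, pool_size: int = 8) -> int:
    return hash((workflow_id, activity_id)) % pool_size

# Many workflows end up spread over the available slots.
indices = {pick_pool_index(f"backfill-{i}", "insert_ohlcv") for i in range(100)}
```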

BackfillWorkflow - Historical data loading

  1. Validate parameters (dates, instruments)
  2. Generate snapshot metadata
  3. Spawn parallel BackfillTaskWorkflow (max_concurrency=5)
  4. Aggregate results
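The fan-out in step 3 can be sketched with plain asyncio as a stand-in for Temporal child workflows (a simplification: Temporal, not asyncio, schedules the real children; names are illustrative):

```python
import asyncio

# Simplified stand-in for spawning parallel BackfillTaskWorkflow
# children with bounded concurrency. The fan-out / gather shape matches
# the workflow, even though Temporal does the actual scheduling.
async def run_backfill(instruments: list[str], max_concurrency: int = 5) -> list[str]:
    sem = asyncio.Semaphore(max_concurrency)

    async def backfill_task(instrument: str) -> str:
        async with sem:  # at most max_concurrency tasks run at once
            await asyncio.sleep(0)  # placeholder for fetch + insert
            return f"done:{instrument}"

    return await asyncio.gather(*(backfill_task(i) for i in instruments))

results = asyncio.run(run_backfill(["BTC/USDT", "ETH/USDT", "SOL/USDT"]))
```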

IncrementalUpdaterWorkflow - Scheduled updates

  1. Query ClickHouse for latest timestamp
  2. Calculate delta since last update
  3. Spawn parallel IncrementalTaskWorkflow (max_concurrency=10)
  4. Insert new data
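Steps 1–2 boil down to date arithmetic: resume from the bar after the latest stored timestamp. A minimal sketch (the function name is illustrative, not the actual activity API):

```python
from datetime import datetime, timedelta, timezone

# Sketch of the incremental delta: given the latest stored bar
# timestamp, fetch everything from the next bar boundary up to now.
def compute_fetch_window(last_ts: datetime, timeframe_seconds: int,
                         now: datetime) -> tuple[datetime, datetime]:
    since = last_ts + timedelta(seconds=timeframe_seconds)  # next expected bar
    return since, now

now = datetime(2026, 2, 5, 12, 0, tzinfo=timezone.utc)
last = datetime(2026, 2, 5, 9, 0, tzinfo=timezone.utc)
since, until = compute_fetch_window(last, 3600, now)  # 1h bars
```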

Real-time WebSocket streaming:

  1. Load config (instruments, sources, timeframes)
  2. Build subscriptions (cartesian product)
  3. Create subscribers via factory pattern
  4. Monitor health (5s interval)
  5. Auto-restart failed connections
  6. Publish to Redis Pub/Sub
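Step 2's cartesian product can be sketched as follows, assuming a sources mapping like the one in tunnel.yaml and the subscription format source:exchange:stream:timeframe:symbol (the function name is illustrative):

```python
from itertools import product

# Sketch of the subscription build: the cartesian product of configured
# sources, streams, timeframes, and symbols, rendered in the
# "source:exchange:stream:timeframe:symbol" format.
def build_subscriptions(sources: dict[str, str], streams: list[str],
                        timeframes: list[str], symbols: list[str]) -> list[str]:
    return [
        f"{src}:{exchange}:{stream}:{tf}:{sym}"
        for (exchange, src), stream, tf, sym in product(
            sources.items(), streams, timeframes, symbols
        )
    ]

subs = build_subscriptions(
    {"binance": "binance_ws"}, ["ohlcv"], ["1m", "5m"], ["BTC/USDT"]
)
```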

Commands:

  • register-schedules - Register Temporal schedules
  • list-schedules - Show active schedules
  • delete-schedule - Remove schedule
  • backfill yaml <file> - Load backfill from YAML
  • backfill full - Backfill all instruments

Create .env file from .env.example:

# Temporal
TEMPORAL__HOST=localhost
TEMPORAL__NAMESPACE=default
TEMPORAL__API_KEY= # For Temporal Cloud
# ClickHouse
CLICKHOUSE__HOST=localhost
CLICKHOUSE__DATABASE=analytics
CLICKHOUSE__USERNAME=default
CLICKHOUSE__PASSWORD=
CLICKHOUSE__CONNECTION_POOL_MAXSIZE=50
CLICKHOUSE__SERVICE_POOL_SIZE=8
# Redis
REDIS__HOST=localhost
REDIS__PASSWORD=
# Worker
WORKER__MAX_CONCURRENT_ACTIVITIES=20
WORKER__MAX_CONCURRENT_WORKFLOW_TASKS=10
WORKER__TASK_QUEUE=default
# Network
NETWORK_CLIENT__LIMIT=100
NETWORK_CLIENT__DOWNLOAD_CONCURRENCY=200

config/instruments.yaml - Single source of truth:

instruments:
  - venue: binance
    symbol: BTC/USDT
    exchange_type: spot
  - venue: binance
    symbol: BTC/USDT
    exchange_type: future
  - venue: bybit
    symbol: ETH/USDT
    exchange_type: spot

config/incremental/bars.yaml:

data_type: bars
timeframes: [1m, 5m, 15m, 1h]
max_concurrency: 10
instruments:
  all: true

config/incremental/funding_rates.yaml:

data_type: funding_rates
interval: 8h
instruments:
  exchange_types: [future]

config/tunnel/tunnel.yaml:

streams: [ohlcv]
timeframes: [1m, 5m, 1h, 15m]
sources:
  binance: binance_ws # Native WebSocket
  bybit: ccxt # CCXT Pro
instruments:
  all: true
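The `all: true` versus filtered selection used across these configs can be sketched as a predicate over instruments.yaml entries. The real logic lives in shared/config.py; the exact semantics shown here are an assumption:

```python
# Sketch of instrument filtering (shared/config.py provides the real
# implementation). A None filter means "match everything", mirroring
# the `all: true` configs; field names follow instruments.yaml.
def filter_instruments(instruments: list[dict], *, venues=None,
                       symbols=None, exchange_types=None) -> list[dict]:
    def keep(inst: dict) -> bool:
        return ((venues is None or inst["venue"] in venues)
                and (symbols is None or inst["symbol"] in symbols)
                and (exchange_types is None or inst["exchange_type"] in exchange_types))
    return [i for i in instruments if keep(i)]

instruments = [
    {"venue": "binance", "symbol": "BTC/USDT", "exchange_type": "spot"},
    {"venue": "binance", "symbol": "BTC/USDT", "exchange_type": "future"},
    {"venue": "bybit", "symbol": "ETH/USDT", "exchange_type": "spot"},
]
futures_only = filter_instruments(instruments, exchange_types=["future"])
```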

# 1. Install dependencies
task setup:install
# 2. Start infrastructure (ClickHouse, Redis, Temporal)
task docker:infra
# 3. Run database migrations
task db:migrate
# 4. Start worker (new terminal)
task worker:dev
# 5. Register schedules
task cli:register-schedules
# 6. (Optional) Start tunnel
task tunnel:dev
# Infrastructure
task docker:infra # Start infrastructure
task docker:up # Start all services
task docker:down # Stop all
# Worker
task worker:dev # Development (auto-reload)
task worker:run # Production in Docker
# Tunnel
task tunnel:dev # Development
task tunnel:run # Production
# CLI
task cli:register-schedules
task cli:list-schedules
task cli:run -- backfill yaml config/backfill/backfill.yaml --wait
# Development
task dev:fmt # Format code
task dev:lint # Lint code
task dev:test # Run tests
# Database
task db:migrate # Run migrations

For Temporal Cloud:

  1. Set TEMPORAL__HOST to your Temporal Cloud endpoint
  2. Set TEMPORAL__API_KEY with your API key
  3. Set TEMPORAL__NAMESPACE to your namespace

For self-hosted:

  1. Use infra/composes/infra.yml for full stack
  2. Scale workers horizontally (multiple instances OK)
  3. Configure ClickHouse replication for HA

CLI: task cli:run -- backfill yaml config/backfill/backfill.yaml
Load YAML + instruments.yaml
Filter instruments (venue, symbol, exchange_type)
Start BackfillWorkflow
├── Validate parameters
├── Generate snapshot metadata
├── Insert snapshot to ClickHouse
└── For each instrument (parallel, max=5):
    └── BackfillTaskWorkflow
        ├── FetcherActivity.fetch_bar_source()
        │   ├── Try CSV fetcher (Binance only)
        │   └── Fallback to CCXT
        └── ClickhouseActivity.insert_ohlcv()
Temporal Schedule (e.g., every 1h for bars)
IncrementalUpdaterWorkflow
├── Query ClickHouse: MAX(timestamp) per symbol
├── Calculate delta: now - last_timestamp
└── For each instrument (parallel, max=10):
    └── IncrementalTaskWorkflow
        ├── FetcherActivity.fetch_bar_source(since=max_timestamp)
        └── ClickhouseActivity.insert_ohlcv()
TunnelServer starts
Load tunnel.yaml + instruments.yaml
Build subscriptions (sources × symbols × timeframes)
For each subscription:
└── SubscriberFactory.create()
    ├── binance_ws → BinanceWebSocketSubscriber
    └── ccxt → CCXTWebSocketSubscriber
Connect → Subscribe → Receive messages
Publish to Redis channel
(Optional) WS Bridge to browsers

  • app/activities/fetcher.py - CSV fetcher improvements
  • app/worker.py - Service pool implementation
  • cli/config.py - Config updates
  • cli/run.py - CLI enhancements
  • config/incremental/fear_greed.yaml - Config changes
  • config/incremental/funding_rates.yaml - Config changes
  • app/publish.py - New publishing functionality

Planned fetchers:

  1. Bybit CSV Fetcher - Faster backfill for Bybit (similar to Binance)
  2. OKX CSV Fetcher - Faster backfill for OKX

Known gaps:

  1. Test Coverage - Tests exist but coverage unclear
  2. Data Quality Workflows - Started but not fully integrated
  3. Monitoring - No external monitoring integration
  4. Circuit Breakers - Limited error recovery patterns

Future work:

  1. External alerting (Slack, PagerDuty)
  2. ClickHouse clustering/sharding
  3. Redis clustering
  4. Optimize Data Quality check

# 1. Start infrastructure
task docker:infra
# 2. Wait for services to be healthy
# Check Temporal UI: http://localhost:8233
# 3. Run migrations
task db:migrate
# 4. Start worker
task worker:dev
# 5. Register schedules (first time only)
task cli:register-schedules
# Graceful shutdown
Ctrl+C on worker process
# Stop infrastructure
task docker:down
# Full backfill for specific timeframe
task cli:run -- backfill full \
--timeframe 1h \
--start 2025-01-01 \
--end 2025-02-01 \
--wait
# From YAML config
task cli:run -- backfill yaml config/backfill/backfill.yaml --wait
# List all schedules
task cli:list-schedules
# View in Temporal UI
open http://localhost:8233

Worker not processing workflows:

  1. Check Temporal UI for worker connection
  2. Verify task queue matches: "default"
  3. Check worker logs: task worker:logs

ClickHouse connection issues:

  1. Verify ClickHouse is running: curl http://localhost:8123/ping
  2. Check credentials in .env
  3. Verify migrations ran: task db:migrate

WebSocket tunnel disconnections:

  1. Check Redis is running
  2. Review tunnel logs
  3. Verify exchange API status

For questions about this handover documentation, refer to the codebase’s git history or contact the development team.


The system uses a 3-layer data architecture in ClickHouse:

┌─────────────────────────────────────────────────────────────────────┐
│ DATA FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LANDING LAYER STAGING LAYER DATA MART │
│ (Raw ingestion) (Deduplicated) (Query layer) │
│ │
│ landing_bars ─MV─▶ staging_bars ─MV─▶ bars_current │
│ │ │ bars_snapshots │
│ │ │ │
│ landing_funding_rates ─MV─▶ staging_funding ─MV─▶ funding_current │
│ │ │ funding_snap │
│ │ │ │
│ landing_fear_greed ─MV─▶ staging_fear_greed─MV─▶ fear_greed_* │
│ │
│ MV = Materialized View (automatic data flow + validation) │
└─────────────────────────────────────────────────────────────────────┘

Pre-populated values:

  • 1: binance
  • 2: okx
  • 3: bybit

Instrument types:

  • spot - Spot market
  • future - Futures contract
  • swap - Perpetual swap (e.g., BTC/USDT:USDT)

Engine: MergeTree
Partition: toDate(timestamp)
Order: (instrument_id, timeframe, timestamp_unix)

Engine: MergeTree
Partition: toYYYYMM(timestamp)
Order: (instrument_id, timestamp_unix)

Engine: MergeTree
Partition: toYYYYMM(timestamp)
Order: timestamp_unix

Uses the ReplacingMergeTree engine to deduplicate rows based on the ingested_at column.

Same schema as landing_bars. Engine: ReplacingMergeTree(ingested_at)

Same schema as landing_funding_rates. Engine: ReplacingMergeTree(ingested_at)

Same schema as landing_fear_greed_index. Engine: ReplacingMergeTree(ingested_at)

  • analytics.bars_current - ReplacingMergeTree, latest bar per instrument/timeframe/timestamp
  • analytics.funding_rates_current - ReplacingMergeTree, latest funding rate
  • analytics.fear_greed_index_current - ReplacingMergeTree, latest index value
  • analytics.bars_snapshots - MergeTree, keeps all ingestion versions
  • analytics.funding_rates_snapshots - MergeTree, keeps all versions
  • analytics.fear_greed_index_snapshots - MergeTree, keeps all versions

Validates OHLCV data before staging:

WHERE open > 0 AND high > 0 AND low > 0 AND close > 0
AND high >= low
AND high >= open AND high >= close
AND low <= open AND low <= close
AND volume >= 0
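The same predicate expressed as a plain Python check, mirroring the materialized-view WHERE clause above:

```python
# Python mirror of the OHLCV validation above: a bar passes only if all
# prices are positive, high/low bracket both open and close, and volume
# is non-negative.
def is_valid_bar(open_: float, high: float, low: float,
                 close: float, volume: float) -> bool:
    return (min(open_, high, low, close) > 0
            and high >= low
            and high >= open_ and high >= close
            and low <= open_ and low <= close
            and volume >= 0)
```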

Direct copy (no validation required).

Validates index range:

WHERE value BETWEEN 0 AND 100

Get latest OHLCV bars:

SELECT
    i.symbol,
    v.name AS venue,
    b.timeframe,
    b.timestamp,
    b.open, b.high, b.low, b.close, b.volume
FROM analytics.bars_current b FINAL
JOIN analytics.instruments i ON b.instrument_id = i.id
JOIN analytics.venues v ON i.venue_id = v.id
WHERE i.type = 'spot'
  AND b.timeframe = '1h'
ORDER BY b.timestamp DESC
LIMIT 100

Get funding rate history:

SELECT
    i.symbol,
    v.name AS venue,
    f.timestamp,
    f.rate,
    f.mark_price
FROM analytics.funding_rates_current f FINAL
JOIN analytics.instruments i ON f.instrument_id = i.id
JOIN analytics.venues v ON i.venue_id = v.id
WHERE i.type = 'swap'
ORDER BY f.timestamp DESC
LIMIT 100

Get Fear & Greed trend:

SELECT
    timestamp,
    value,
    classification
FROM analytics.fear_greed_index_current FINAL
ORDER BY timestamp DESC
LIMIT 30

Input: BackfillInput

{
  "data_type": "bars",               # bars, funding_rates, fear_greed
  "timeframes": ["1m", "5m", "1h"],  # For bars only
  "start_time": "2025-01-01",        # ISO date
  "end_time": "2025-02-01",          # ISO date
  "max_concurrency": 5,              # Parallel tasks
  "instruments": [                   # Filtered instruments
    {"venue": "binance", "symbol": "BTC/USDT", "exchange_type": "spot"}
  ]
}
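A dataclass sketch of this input shape (the real models are Pydantic; the validation shown here is an assumption):

```python
from dataclasses import dataclass, field
from datetime import date

# Dataclass sketch of BackfillInput. Field names follow the JSON above;
# the __post_init__ checks are illustrative, not the real validators.
@dataclass
class BackfillInput:
    data_type: str                  # bars, funding_rates, fear_greed
    start_time: date
    end_time: date
    timeframes: list[str] = field(default_factory=list)  # bars only
    max_concurrency: int = 5
    instruments: list[dict] = field(default_factory=list)

    def __post_init__(self) -> None:
        if self.end_time <= self.start_time:
            raise ValueError("end_time must be after start_time")
        if self.data_type == "bars" and not self.timeframes:
            raise ValueError("timeframes required for bars")

job = BackfillInput("bars", date(2025, 1, 1), date(2025, 2, 1), ["1m", "5m", "1h"])
```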

Input: IncrementalInput

{
  "data_type": "bars",      # bars, funding_rates, fear_greed
  "timeframes": ["1m"],     # For bars only
  "max_concurrency": 10,    # Parallel tasks
  "instruments": [...]      # Filtered instruments
}
# Register all schedules from config/incremental/*.yaml
task cli:register-schedules
# List active Temporal schedules
task cli:list-schedules
# Delete a specific schedule
task cli:run -- delete-schedule <schedule-id>
# Run backfill from YAML config
task cli:run -- backfill yaml <config-file> [--wait]
# Run full backfill for all DB instruments
task cli:run -- backfill full \
--timeframe <tf> \
--start <date> \
--end <date> \
[--wait]

Format: {exchange}:{exchange_type}:{stream}:{timeframe}:{symbol}

Examples:

binance:spot:ohlcv:1m:BTC-USDT
binance:future:ohlcv:5m:ETH-USDT
bybit:swap:ohlcv:1h:SOL-USDT
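A small parser for these channel names (an illustrative helper, not part of the codebase):

```python
from typing import NamedTuple

# Illustrative parser for the Redis channel format above:
# {exchange}:{exchange_type}:{stream}:{timeframe}:{symbol}
class Channel(NamedTuple):
    exchange: str
    exchange_type: str
    stream: str
    timeframe: str
    symbol: str

def parse_channel(name: str) -> Channel:
    parts = name.split(":")
    if len(parts) != 5:
        raise ValueError(f"expected 5 ':'-separated fields, got {len(parts)}")
    return Channel(*parts)

ch = parse_channel("binance:spot:ohlcv:1m:BTC-USDT")
```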

Message Format (JSON):

{
  "timestamp": 1706745600000,
  "open": 42500.00,
  "high": 42600.00,
  "low": 42400.00,
  "close": 42550.00,
  "volume": 1234.56
}

  1. CCXT (ccxt.pro) - Exchange WebSocket/REST APIs
     • No API keys required for public market data
     • Exchanges: Binance, Bybit, OKX
  2. Fear & Greed Index API - alternative.me
     • Public API, no authentication
     • Rate limited
  3. Binance CSV Data - data.binance.vision
     • Public, no authentication
     • Used for fast historical backfills