
Orchestration Layer

Source: Notion | Last edited: 2025-12-03 | ID: 29b2d2dc-3ef...


🧠 QuantOS Orchestration Layer — Independent Architecture Design

  • Agent-native compute fabric: execute DAGs emitted by the Compiler (training, backtests, inference, feature jobs) across heterogeneous compute.
  • Cloud-neutral, elastic, cost-aware: schedule jobs to CPU/GPU/Spot pools based on policy.
  • Unified batch + stream: one orchestration plane for backfills and real-time pipelines.
  • Reproducible & observable: strong lineage, metrics, logs, traces; deterministic re-runs.
  • Isolation & safety: per-job sandboxes, secrets scoping, policy gates (stg → canary → prod).

flowchart LR
  subgraph In["Inputs (from Strategy/Compiler)"]
    DAG["DAG Spec<br/>(JSON/YAML)"]
    CFG["Runtime Params<br/>(env, resources, SLOs)"]
    ART["Artifacts<br/>(images, models, wheels)"]
    DATA["Data/Feature Refs"]
  end
  subgraph Orch["Orchestration Layer"]
    Q["Job Queue<br/>(Kafka/Argo Events)"]
    WF["Workflow Engine<br/>(Argo / Airflow)"]
    TMP["Job Templates<br/>(K8s CRDs • RayJobs • Helm)"]
    CL["Compute Pools<br/>(CPU • GPU • Spot • Ray • Spark)"]
    SCL["Autoscaling<br/>(KEDA • Cluster Autoscaler)"]
    SCH["Scheduling Policy<br/>(cost • latency • locality • quota)"]
  end
  subgraph Out["Outputs"]
    RPT["Run Reports<br/>(metrics.json • artifacts)"]
    LLG["Logs & Traces<br/>(Loki • Tempo/Jaeger)"]
    LNG["Lineage<br/>(OpenLineage/Marquez)"]
  end
  DAG --> Q --> WF --> TMP --> CL
  CFG --> TMP
  ART --> TMP
  DATA --> TMP
  WF --> SCL
  WF --> SCH
  CL --> RPT
  CL --> LLG
  CL --> LNG

Minimal example (YAML):

version: 1
pipeline_id: bt_xs_bilstm_v1
slo:
  max_runtime: "2h"
  retry: {max_attempts: 2, backoff_sec: 60}
priority: normal
qos: spot_ok
resources:
  default: {cpu: "4", mem: "16Gi"}
nodes:
  - id: fetch_data
    kind: trino_sql
    params: {sql_uri: "s3://queries/bt_fetch.sql"}
    outs: [dataset_uri]
  - id: compute_features
    kind: python_task
    image: "ghcr.io/archetype/feature-kit:0.3.2"
    env: {FEATURE_VIEW: xs_v2}
    ins: [dataset_uri]
    outs: [feature_table]
  - id: train_model
    kind: ray_job
    image: "ghcr.io/archetype/trainers:0.8.1"
    params: {trainer: bilstm, epochs: 30}
    resources: {gpu: 1, cpu: "8", mem: "32Gi"}
    ins: [feature_table]
    outs: [model_uri, metrics_json]
edges:
  - {from: fetch_data, to: compute_features}
  - {from: compute_features, to: train_model}

Submission envelope (JSON):

{
  "submission_id": "sub-2025-10-29-001",
  "env": "staging",
  "labels": {"project": "xs", "owner": "research"},
  "secrets_scope": ["binance-readonly", "s3-lake-ro"],
  "priority": "high",
  "affinity": {"compute_pool": "gpu_spot"},
  "ttl_days": 14
}
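Before dispatch, the engine must linearize the spec's `nodes`/`edges` into an execution order. A minimal sketch (function and variable names hypothetical) using Kahn's algorithm:

```python
from collections import defaultdict, deque

def topo_order(nodes, edges):
    """Return node ids in dependency order (Kahn's algorithm)."""
    ids = [n["id"] for n in nodes]
    indeg = {i: 0 for i in ids}
    succ = defaultdict(list)
    for e in edges:
        succ[e["from"]].append(e["to"])
        indeg[e["to"]] += 1
    ready = deque(i for i in ids if indeg[i] == 0)
    order = []
    while ready:
        n = ready.popleft()
        order.append(n)
        for m in succ[n]:
            indeg[m] -= 1
            if indeg[m] == 0:
                ready.append(m)
    if len(order) != len(ids):
        raise ValueError("cycle detected in DAG spec")
    return order

# Mirrors the nodes/edges from the YAML example above.
spec_nodes = [{"id": "fetch_data"}, {"id": "compute_features"}, {"id": "train_model"}]
spec_edges = [{"from": "fetch_data", "to": "compute_features"},
              {"from": "compute_features", "to": "train_model"}]
# → ['fetch_data', 'compute_features', 'train_model']
print(topo_order(spec_nodes, spec_edges))
```

Cycle detection doubles as spec validation: a submission whose edges do not topologically sort can be rejected before any compute is provisioned.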

Supported node kinds:

  • python_task (containerized Python entrypoint)
  • trino_sql / spark_sql (SQL batch transforms)
  • ray_job (distributed training/inference)
  • flink_job / materialize_view (streaming features)
  • custom_crd (extensible CRD for specialized compute)
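One plausible way to wire these kinds into the workflow engine is a handler registry keyed by `kind`; the handlers below are illustrative stubs, not the real submission logic:

```python
# Hypothetical handler registry: each node "kind" resolves to a submit
# function that turns the node spec into a concrete job description.
HANDLERS = {}

def handler(kind):
    """Decorator registering a submit function for a node kind."""
    def register(fn):
        HANDLERS[kind] = fn
        return fn
    return register

@handler("python_task")
def submit_python_task(node):
    # Real implementation would render a K8s Job from a template.
    return f"k8s Job for {node['id']} using image {node['image']}"

@handler("ray_job")
def submit_ray_job(node):
    # Real implementation would create a RayJob custom resource.
    return f"RayJob CRD for {node['id']}"

def dispatch(node):
    kind = node["kind"]
    if kind not in HANDLERS:
        raise KeyError(f"unknown node kind: {kind}")
    return HANDLERS[kind](node)
```

The registry keeps the engine closed for modification: a new `custom_crd` kind is a new decorated function, not a change to the dispatch path.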

Scheduling policy:

  • Cost-aware: prefer Spot for batch; on-demand for low-latency inference.
  • Data locality: co-locate jobs with Iceberg/ClickHouse shards when possible.
  • GPU packing: binpack by GPU slice; prevent head-of-line blocking.
  • Quotas: per-team CPU/GPU/IO budgets; burst with priority credits.
  • SLO routing: high-priority jobs preempt best-effort queues.
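The cost-aware and QoS rules above can be sketched as a pool-selection function; pool names and hourly prices are placeholders, not real quotes:

```python
def choose_pool(job, pools):
    """Pick the cheapest pool satisfying the job's QoS and resource needs.

    job:   {"qos": "spot_ok" | "on_demand", "gpu": int}
    pools: list of {"name", "spot": bool, "gpu": bool, "cost_per_hr": float}
    """
    eligible = [
        p for p in pools
        if (p["gpu"] or job.get("gpu", 0) == 0)           # GPU capability
        and (not p["spot"] or job["qos"] == "spot_ok")    # Spot only if tolerated
    ]
    if not eligible:
        raise RuntimeError("no eligible compute pool")
    return min(eligible, key=lambda p: p["cost_per_hr"])["name"]

# Placeholder pool catalog.
POOLS = [
    {"name": "cpu_spot",     "spot": True,  "gpu": False, "cost_per_hr": 0.10},
    {"name": "gpu_spot",     "spot": True,  "gpu": True,  "cost_per_hr": 0.90},
    {"name": "gpu_ondemand", "spot": False, "gpu": True,  "cost_per_hr": 3.00},
]
```

A spot-tolerant training job lands on `gpu_spot`; the same job marked `on_demand` (e.g. latency-sensitive inference) falls through to `gpu_ondemand`.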

Autoscaling:

  • KEDA: scale deployments/Jobs from Redpanda/Kafka lag, queue length, or custom metrics.
  • Cluster Autoscaler: add/remove nodes (CPU/GPU/Spot) per node group.
  • Ray Autoscaler: scale Ray clusters by pending tasks/placement groups.

Security & governance:

  • Secrets: HashiCorp Vault + External Secrets; short-lived tokens; per-job scope.
  • Network: namespace-level policies; egress allow-lists to data and registries only.
  • Runtime: read-only rootFS, non-root UIDs, seccomp/apparmor profiles.
  • Governance: promotion gates (staging → canary → prod) via OPA/Conftest policies:
    • Model card present, metrics ≥ thresholds, max DD within bounds, reproducibility proof.
  • Audit: every submission emits an immutable audit record (ClickHouse table), linked by submission_id.
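In production these gates live in OPA/Conftest policies (Rego); as an illustration only, the same checks expressed in Python, with placeholder thresholds rather than the real policy values:

```python
def promotion_gate(run):
    """Return the list of gate failures; an empty list means promote.

    Thresholds below are placeholders for illustration, not production values.
    """
    failures = []
    if not run.get("model_card"):
        failures.append("missing model card")
    if run.get("sharpe", 0.0) < 1.0:
        failures.append("sharpe below threshold")
    if run.get("max_drawdown", 1.0) > 0.20:
        failures.append("max drawdown out of bounds")
    if not run.get("artifact_hash"):
        failures.append("no reproducibility proof")
    return failures
```

Returning all failures at once (rather than failing fast) gives researchers one actionable report per promotion attempt.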

Observability & lineage:

  • Metrics: Prometheus (job runtime, queue lag, GPU/CPU utilization, spot interruption rate).
  • Logs: Loki, indexed by submission_id, pipeline_id, node_id.
  • Tracing: OpenTelemetry → Tempo/Jaeger. Parent span = submission; child spans = nodes.
  • Lineage: OpenLineage/Marquez; edges map 1:1 to DAG; outputs record Iceberg snapshot IDs and artifact hashes.
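A node-completion lineage event might be assembled as below; the payload shape is a simplification inspired by OpenLineage, not the full spec:

```python
from datetime import datetime, timezone

def lineage_event(submission_id, node_id, inputs, outputs, snapshot_ids):
    """Build a simplified run-event payload linking a DAG node's inputs to
    its outputs, recording Iceberg snapshot IDs where known."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": f"{submission_id}/{node_id}"},
        "inputs": [{"name": i} for i in inputs],
        "outputs": [{"name": o, "icebergSnapshot": snapshot_ids.get(o)}
                    for o in outputs],
    }
```

Keying the run id on `submission_id/node_id` keeps lineage edges 1:1 with the DAG, matching the trace hierarchy (parent span = submission, child spans = nodes).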


Submission API:

POST /api/orch/submit
Body: { dag_spec, runtime_params, refs }
→ 202 Accepted { submission_id }
GET /api/orch/status/{submission_id}
GET /api/orch/logs/{submission_id}?node=train_model
GET /api/orch/lineage/{submission_id}
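A thin client for this API could look like the sketch below; the transport is injected so the example stays network-free (a real client would wrap an HTTP library), and all names are hypothetical:

```python
import json

class OrchClient:
    """Minimal client sketch for the submission API above.

    `transport(method, path, body)` is a pluggable callable returning the
    decoded response; injecting it keeps this example free of network I/O.
    """
    def __init__(self, transport, base="/api/orch"):
        self.transport = transport
        self.base = base

    def submit(self, dag_spec, runtime_params, refs):
        body = {"dag_spec": dag_spec, "runtime_params": runtime_params, "refs": refs}
        resp = self.transport("POST", f"{self.base}/submit", json.dumps(body))
        return resp["submission_id"]  # from the 202 Accepted body

    def status(self, submission_id):
        return self.transport("GET", f"{self.base}/status/{submission_id}", None)

def fake_transport(method, path, body):
    """Stub transport standing in for a real HTTP call."""
    if method == "POST" and path.endswith("/submit"):
        return {"submission_id": "sub-0001"}
    return {"state": "running"}
```

Because submission returns 202 with a `submission_id`, clients poll `status` (or subscribe to events) rather than blocking on job completion.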

Artifact layout:

  • Models: s3://artifacts/models/{pipeline_id}/{version}/model.bin
  • Metrics: s3://artifacts/reports/{submission_id}/metrics.json
  • Lineage: OpenLineage event with dataset version (Iceberg snapshot).

Terraform module layout:

infra/
├─ modules/
│ ├─ argo-workflows/ # Helm install + RBAC + SSO (optional)
│ ├─ redpanda/ # Event bus (or MSK alternative)
│ ├─ keda/ # Event-based autoscaling
│ ├─ cluster-autoscaler/ # Node group autoscaling
│ ├─ ray-operator/ # KubeRay operator + sample clusters
│ ├─ ray-serve/ # Inference gateway deployments
│ ├─ materialize/ # Streaming SQL engine
│ ├─ registry/ # Postgres API for plugins/models/strategies
│ ├─ oci-registry/ # Harbor/OCI for images & model artifacts
│ ├─ vault/ # Secrets mgmt
│ ├─ external-secrets/ # Vault/KMS → K8s secret sync
│ ├─ observability/ # Prometheus/Grafana/Loki/Tempo
│ └─ policy-engine/ # OPA/Gatekeeper + Conftest jobs
└─ envs/{dev,prod}/main.tf

Example: Argo (Helm)

resource "helm_release" "argo" {
  name             = "argo-workflows"
  repository       = "https://argoproj.github.io/argo-helm"
  chart            = "argo-workflows"
  namespace        = "orchestr"
  create_namespace = true

  values = [yamlencode({
    controller = { replicas = 2 }
    server     = { insecure = true, extraArgs = ["--auth-mode=server"] }
    workflow   = { serviceAccount = "argo-workflow" }
  })]
}

Example: Ray Cluster (CRD)

apiVersion: ray.io/v1
kind: RayCluster
metadata: { name: ray-gpu, namespace: compute }
spec:
  headGroupSpec:
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.34.0
            resources: { requests: { cpu: "4", memory: "8Gi" } }
  workerGroupSpecs:
    - replicas: 3
      groupName: gpu
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.34.0
              resources:
                limits: { nvidia.com/gpu: 1, cpu: "8", memory: "32Gi" }
                requests: { nvidia.com/gpu: 1, cpu: "8", memory: "32Gi" }

Example: KEDA Scaler (queue-driven)

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata: { name: orch-wf-scaler, namespace: orchestr }
spec:
  scaleTargetRef: { name: argo-workflows-server }
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: redpanda.kafka.svc:9092
        topic: orch.jobs
        lagThreshold: "1000"


11) SLOs, Quotas & Policies (suggested defaults)

  • Backtest/Train: queue wait P95 ≤ 2 min; cost target: Spot >70%.
  • Inference: p99 latency ≤ 150 ms (in-cluster); autoscale within 30 sec.
  • Data locality: Iceberg scan jobs must run in same AZ or VPC endpoint.
  • Quota: per-team GPU hours/day; burst credit with approval.
  • Promotion: Require model card + metrics + reproducible artifact hash.

Recommended stack:

  • Workflow: Argo Workflows
  • Queue: Redpanda (Kafka-compatible)
  • Compute: Ray (training, inference), Materialize (streaming SQL)
  • Autoscaling: KEDA + Cluster Autoscaler (separate node groups for CPU/GPU/Spot)
  • Secrets: Vault + External Secrets + KMS
  • Lineage & Observability: OpenLineage + Prometheus + Loki + Tempo
  • APIs: gRPC/REST for submission/status; Arrow Flight for data/feature access

Why this is optimal

  • Minimal operational burden with strong extensibility; Python-first and K8s-native; aligns cleanly with the Data/Feature architecture and the Execution-layer interfaces.