Infrastructure
Oculus Prime Flow Graph

🌐 Oculus Prime Flow Graph (Streams-Native)

Updated: February 20, 2026
Context: Complete end-to-end pipeline with all Sacred Orders
Scope: Full epistemic flow from acquisition to archival

🎯 Purpose

This document describes the canonical Oculus Prime flow after migration to Redis Streams.

Key constraints:

  1. Oculus Prime is the single edge gateway for H2M/M2M ingestion.
  2. Oculus Prime performs acquisition, normalization, and immutable persistence.
  3. Oculus Prime emits oculus_prime.evidence.created via StreamBus (legacy alias supported).
  4. Semantic enrichment starts downstream (Codex Hunters and beyond).

πŸ”„ End-to-End Flow

Zoom: πŸ’‘ Tip: Use mouse wheel + Ctrl to zoom
Zoom: πŸ’‘ Ctrl + Wheel to zoom | Drag to pan

Responsibility Boundary

Oculus Prime MUST:

  1. Create immutable Evidence Packs.
  2. Emit stream events and audit logs.
  3. Stay media-agnostic and domain-agnostic in acquisition behavior.

Oculus Prime MUST NOT:

  1. Run NER, embeddings, ontology mapping.
  2. Decide semantic relevance.
  3. Call downstream cognitive services directly.
  4. Read downstream cognitive tables.

πŸ“Š Pipeline Legend

Color Coding by Sacred Order

  • 🟒 Green (Perception): Oculus Prime + Codex Hunters β€” Acquisition & Discovery
  • πŸ”΅ Blue (Reason): Pattern Weavers β€” Ontology & Classification
  • 🟣 Purple (Memory): Memory Orders β€” Coherence & Sync
  • 🟠 Orange (Truth): Orthodoxy Wardens + Vault Keepers β€” Governance & Archival
  • πŸ”΅ Cyan (Discourse): Babel Gardens β€” Language Processing (parallel)

Component Types

  • πŸ—„οΈ Dark Blue Cylinders: PostgreSQL (structured storage)
  • πŸ”΄ Red Cylinders: Qdrant (vector embeddings)
  • πŸ”΄ Red Boxes: Redis Streams (StreamBus event channels)

🎯 Detailed Pipeline Flow

1️⃣ PERCEPTION Layer: Oculus Prime (Intake)

Responsibilities:

  • Accept file uploads from external sources (documents, images, audio, video, geo, CAD)
  • Extract literal text (NO semantic interpretation)
  • Create immutable Evidence Packs
  • Emit standardized events

PostgreSQL Tables:

  • evidence_packs β€” Append-only evidence storage
  • intake_event_log β€” Audit trail of all emitted events
  • intake_event_failures β€” Failed emission diagnostics

StreamBus Events Emitted:

  • oculus_prime.evidence.created (v2 canonical)
  • intake.evidence.created (v1 legacy alias, dual-write mode)

Event Payload:

{
  "event_id": "EVT-{UUID}",
  "evidence_id": "EVD-{UUID}",
  "chunk_id": "CHK-{N}",
  "source_type": "document|image|audio|video|stream|sensor",
  "source_uri": "/uploads/file.ext",
  "source_hash": "sha256:...",
  "byte_size": 1024000,
  "language_detected": "en",
  "sampling_policy_ref": "SAMPPOL-DOC-DEFAULT-V1"
}

2️⃣ PERCEPTION β†’ MEMORY: Codex Hunters (Discovery & Enrichment)

Responsibilities:

  • Consume evidence created events
  • Perform discovery (entity recognition)
  • Restore data (normalization)
  • Bind entities (relationship linking)
  • Generate embeddings for semantic search

Consumer Group: codex_hunters

Consumed Events:

  • oculus_prime.evidence.created (primary)
  • intake.evidence.created (legacy fallback)

Processing Pipeline:

  1. TrackerConsumer β€” Discovery pipeline (entity detection)
  2. RestorerConsumer β€” Data normalization & quality assessment
  3. BinderConsumer β€” Entity relationship binding

PostgreSQL Tables:

  • entities β€” Discovered entities with metadata
  • entity_metadata β€” Additional entity attributes

Qdrant Collections:

  • codex_entities β€” Vector embeddings for entities (via Embedding API)

StreamBus Events Emitted:

  • codex.entity.discovered β€” New entity found
  • codex.entity.restored β€” Data normalized
  • codex.entity.bound β€” Relationships established

3️⃣ REASON Layer: Pattern Weavers (Ontology Mapping)

Responsibilities:

  • Map entities to domain ontologies
  • Extract concepts and relationships
  • Classify entities into taxonomies
  • Enable semantic pattern matching

Consumer Group: pattern_weavers

Consumed Events:

  • codex.entity.discovered
  • codex.entity.bound

Processing Pipeline:

  1. WeaverConsumer β€” Ontology mapping & concept extraction
  2. KeywordMatcherConsumer β€” Pattern classification & keyword matching

PostgreSQL Tables:

  • patterns β€” Detected patterns and relationships
  • ontology_mappings β€” Entity-to-ontology bindings

Qdrant Collections:

  • pattern_taxonomy β€” Vector embeddings for patterns (via Embedding API)

StreamBus Events Emitted:

  • pattern.weave.response β€” Pattern weaving completed
  • pattern.taxonomy.updated β€” Taxonomy changes

4️⃣ DISCOURSE Layer: Babel Gardens (Language Processing β€” Parallel)

Responsibilities:

  • Language detection and translation
  • Emotion and sentiment analysis
  • Topic classification
  • Linguistic synthesis

Consumer Group: babel_gardens

Consumed Events (optional, parallel to Pattern Weavers):

  • codex.entity.discovered

Processing Pipeline:

  1. SynthesisConsumer β€” Language processing & synthesis
  2. TopicClassifierConsumer β€” Topic extraction
  3. LanguageDetectorConsumer β€” Language identification

Qdrant Collections:

  • babel_embeddings β€” Linguistic embeddings for multilingual semantic search

StreamBus Events Emitted:

  • babel.embedding.response β€” Embedding generation completed
  • babel.emotion.response β€” Emotion analysis result
  • babel.topic.response β€” Topic classification result

5️⃣ MEMORY Layer: Memory Orders (Coherence Monitor)

Responsibilities:

  • Monitor dual-memory coherence (PostgreSQL ↔ Qdrant)
  • Detect drift and synchronization issues
  • Aggregate system health metrics
  • Plan and execute sync operations

Processing Pipeline:

  1. CoherenceAnalyzer β€” Compare PostgreSQL vs Qdrant counts
  2. HealthAggregator β€” Collect health metrics from all components
  3. SyncPlanner β€” Generate drift resolution plans

PostgreSQL Tables:

  • coherence_reports β€” Coherence check audit trail
  • health_metrics β€” System health snapshots

StreamBus Events Emitted:

  • memory.coherence.checked β€” Coherence check completed
  • memory.health.checked β€” System health report
  • memory.sync.completed β€” Sync operation finished

Event Payload Example:

{
  "status": "healthy|warning|critical",
  "pg_count": 10000,
  "qdrant_count": 9950,
  "drift_percentage": 0.5,
  "drift_absolute": 50,
  "table": "entities",
  "collection": "codex_entities"
}

6️⃣ TRUTH Layer: Orthodoxy Wardens (Governance)

Responsibilities:

  • Data quality auditing
  • Policy compliance validation
  • Schema validation
  • Integrity enforcement

Processing Pipeline:

  1. QualityAuditor β€” Assess data quality metrics
  2. ComplianceValidator β€” Enforce governance policies

PostgreSQL Tables:

  • audit_logs β€” Comprehensive audit trail
  • compliance_reports β€” Policy violation tracking

StreamBus Events Emitted:

  • orthodoxy.audit.completed β€” Audit finished (consumed by Vault Keepers)

7️⃣ TRUTH Layer: Vault Keepers (Archival & Persistence)

Responsibilities:

  • Archive evidence and enriched metadata
  • Create state snapshots
  • Verify data integrity (hash validation)
  • Manage backup and recovery

Consumer Group: vault_keepers

Consumed Events:

  • orthodoxy.audit.completed β€” Audit events for archival
  • memory.sync.completed β€” Sync events for snapshot triggers

Processing Pipeline:

  1. ArchiveConsumer β€” Evidence archival
  2. SnapshotConsumer β€” State snapshot creation
  3. IntegrityConsumer β€” Hash verification & integrity checks

PostgreSQL Tables:

  • vault_archives β€” Archived evidence and metadata
  • snapshots β€” Point-in-time state snapshots
  • integrity_checks β€” Hash validation records

StreamBus Events Emitted:

  • vault.archive.completed β€” Archival finished
  • vault.snapshot.created β€” Snapshot saved
  • vault.integrity.validated β€” Integrity check passed

πŸ”‘ Key Integration Points

Who Embeds to Qdrant?

  1. Codex Hunters β†’ codex_entities collection (via Embedding API)
  2. Pattern Weavers β†’ pattern_taxonomy collection (via Embedding API)
  3. Babel Gardens β†’ babel_embeddings collection (direct QdrantAgent)

Who Logs to PostgreSQL?

  1. Oculus Prime β†’ evidence_packs, intake_event_log, intake_event_failures
  2. Codex Hunters β†’ entities, entity_metadata, entity_relations (ontological edges from FK data)
  3. Pattern Weavers β†’ patterns, ontology_mappings, entity_relations (ontological edges from LLM)
  4. Memory Orders β†’ coherence_reports, health_metrics
  5. Orthodoxy Wardens β†’ audit_logs, compliance_reports
  6. Vault Keepers β†’ vault_archives, snapshots, integrity_checks

Event Flow Summary

oculus_prime.evidence.created
  ↓
codex.entity.discovered β†’ codex.entity.restored β†’ codex.entity.bound
  ↓                                                    ↓
pattern.weave.response ← pattern.taxonomy.updated     babel.embedding.response
  ↓                                                    ↓
memory.coherence.checked β†’ memory.health.checked β†’ memory.sync.completed
  ↓                                                    ↓
orthodoxy.audit.completed ────────────────────────→ vault.archive.completed
                                                       ↓
                                            vault.snapshot.created
                                                       ↓
                                            vault.integrity.validated

πŸš€ Migration & Compatibility

Event Naming Migration (v1 β†’ v2)

  • v1 Legacy: intake.evidence.created
  • v2 Canonical: oculus_prime.evidence.created
  • Migration Mode: Controlled by OCULUS_PRIME_EVENT_MIGRATION_MODE env var
    • v1_only β€” Emit only legacy channel
    • v2_only β€” Emit only canonical channel
    • dual_write β€” Emit both (default for gradual rollout)

Consumer Configuration

Codex Hunters listeners support:

  • OCULUS_PRIME_EVENT_MIGRATION_MODE β€” Which channel(s) Oculus Prime emits
  • CODEX_OCULUS_CONSUME_LEGACY_ALIAS β€” Whether to consume legacy intake.* channel

πŸ“š References

  • Oculus Prime Implementation: /infrastructure/edge/oculus_prime/
  • Event Emitter: /infrastructure/edge/oculus_prime/core/event_emitter.py
  • Codex Hunters Consumer: /services/api_codex_hunters/streams_listener.py
  • Pattern Weavers Consumer: /services/api_pattern_weavers/adapters/bus_adapter.py
  • Memory Orders Coherence: /services/api_memory_orders/adapters/bus_adapter.py
  • Vault Keepers Archival: /services/api_vault_keepers/adapters/bus_adapter.py
  • StreamBus Transport: /vitruvyan_core/core/synaptic_conclave/transport/streams.py