Oculus Prime Flow Graph (Streams-Native)
Updated: February 20, 2026
Context: Complete end-to-end pipeline with all Sacred Orders
Scope: Full epistemic flow from acquisition to archival
Purpose
This document describes the canonical Oculus Prime flow after migration to Redis Streams.
Key constraints:
- Oculus Prime is the single edge gateway for H2M/M2M ingestion.
- Oculus Prime performs acquisition, normalization, and immutable persistence.
- Oculus Prime emits oculus_prime.evidence.created via StreamBus (legacy alias supported).
- Semantic enrichment starts downstream (Codex Hunters and beyond).
End-to-End Flow
Responsibility Boundary
Oculus Prime MUST:
- Create immutable Evidence Packs.
- Emit stream events and audit logs.
- Stay media-agnostic and domain-agnostic in acquisition behavior.
Oculus Prime MUST NOT:
- Run NER, embeddings, ontology mapping.
- Decide semantic relevance.
- Call downstream cognitive services directly.
- Read downstream cognitive tables.
Pipeline Legend
Color Coding by Sacred Order
- Green (Perception): Oculus Prime + Codex Hunters - Acquisition & Discovery
- Blue (Reason): Pattern Weavers - Ontology & Classification
- Purple (Memory): Memory Orders - Coherence & Sync
- Orange (Truth): Orthodoxy Wardens + Vault Keepers - Governance & Archival
- Cyan (Discourse): Babel Gardens - Language Processing (parallel)
Component Types
- Dark Blue Cylinders: PostgreSQL (structured storage)
- Red Cylinders: Qdrant (vector embeddings)
- Red Boxes: Redis Streams (StreamBus event channels)
Detailed Pipeline Flow
1. PERCEPTION Layer: Oculus Prime (Intake)
Responsibilities:
- Accept file uploads from external sources (documents, images, audio, video, geo, CAD)
- Extract literal text (NO semantic interpretation)
- Create immutable Evidence Packs
- Emit standardized events
PostgreSQL Tables:
- evidence_packs: Append-only evidence storage
- intake_event_log: Audit trail of all emitted events
- intake_event_failures: Failed emission diagnostics
StreamBus Events Emitted:
- oculus_prime.evidence.created (v2 canonical)
- intake.evidence.created (v1 legacy alias, dual-write mode)
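As an illustration of what gets emitted on these channels, the following Python sketch assembles an event using the field names from the payload documented for this layer. The helper name build_evidence_event, the use of uuid4 for the ID suffixes, and the default argument values are assumptions made for the example; the actual emitter may build its payload differently.

```python
import hashlib
import uuid


def build_evidence_event(content: bytes, source_type: str, source_uri: str,
                         chunk_index: int = 0, language: str = "en") -> dict:
    """Assemble an evidence-created payload (hypothetical helper)."""
    return {
        # ID formats follow the documented "EVT-{UUID}" / "EVD-{UUID}" pattern;
        # the UUID version is an assumption.
        "event_id": f"EVT-{uuid.uuid4()}",
        "evidence_id": f"EVD-{uuid.uuid4()}",
        "chunk_id": f"CHK-{chunk_index}",
        "source_type": source_type,
        "source_uri": source_uri,
        # Hash of the raw bytes, prefixed with the algorithm name.
        "source_hash": "sha256:" + hashlib.sha256(content).hexdigest(),
        "byte_size": len(content),
        "language_detected": language,
        "sampling_policy_ref": "SAMPPOL-DOC-DEFAULT-V1",
    }
```

The payload carries only literal facts about the source (size, hash, location), which is in line with the rule that Oculus Prime performs no semantic interpretation.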
Event Payload:
{
"event_id": "EVT-{UUID}",
"evidence_id": "EVD-{UUID}",
"chunk_id": "CHK-{N}",
"source_type": "document|image|audio|video|stream|sensor",
"source_uri": "/uploads/file.ext",
"source_hash": "sha256:...",
"byte_size": 1024000,
"language_detected": "en",
"sampling_policy_ref": "SAMPPOL-DOC-DEFAULT-V1"
}
2. PERCEPTION → MEMORY: Codex Hunters (Discovery & Enrichment)
Responsibilities:
- Consume evidence created events
- Perform discovery (entity recognition)
- Restore data (normalization)
- Bind entities (relationship linking)
- Generate embeddings for semantic search
Consumer Group: codex_hunters
Consumed Events:
- oculus_prime.evidence.created (primary)
- intake.evidence.created (legacy fallback)
Processing Pipeline:
- TrackerConsumer: Discovery pipeline (entity detection)
- RestorerConsumer: Data normalization & quality assessment
- BinderConsumer: Entity relationship binding
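The consumer-group behaviour behind these consumers can be sketched in terms of the underlying Redis Streams commands. This document does not show the StreamBus API, so the sketch below assumes a client object exposing redis-py-style xreadgroup and xack methods, and assumes that event names are used directly as stream keys. The function name poll_once and the handler callback are hypothetical.

```python
from typing import Any, Callable, Dict

# Assumption: stream keys are the event names listed in this section.
STREAMS = ["oculus_prime.evidence.created", "intake.evidence.created"]
GROUP = "codex_hunters"


def poll_once(client: Any, consumer: str,
              handler: Callable[[str, Dict], None],
              count: int = 10, block_ms: int = 1000) -> int:
    """Read one batch for the group, handle each entry, ack on success."""
    # ">" requests entries not yet delivered to any consumer in the group.
    batches = client.xreadgroup(GROUP, consumer,
                                {s: ">" for s in STREAMS},
                                count=count, block=block_ms)
    handled = 0
    for stream, entries in batches or []:
        for entry_id, fields in entries:
            try:
                handler(stream, fields)
            except Exception:
                # Not acknowledged: the entry stays pending for a later retry.
                continue
            client.xack(stream, GROUP, entry_id)
            handled += 1
    return handled
```

Acknowledging only after the handler succeeds gives at-least-once delivery, so handlers written against this pattern should be idempotent.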
PostgreSQL Tables:
- entities: Discovered entities with metadata
- entity_metadata: Additional entity attributes
Qdrant Collections:
- codex_entities: Vector embeddings for entities (via Embedding API)
StreamBus Events Emitted:
- codex.entity.discovered: New entity found
- codex.entity.restored: Data normalized
- codex.entity.bound: Relationships established
3. REASON Layer: Pattern Weavers (Ontology Mapping)
Responsibilities:
- Map entities to domain ontologies
- Extract concepts and relationships
- Classify entities into taxonomies
- Enable semantic pattern matching
Consumer Group: pattern_weavers
Consumed Events:
- codex.entity.discovered
- codex.entity.bound
Processing Pipeline:
- WeaverConsumer: Ontology mapping & concept extraction
- KeywordMatcherConsumer: Pattern classification & keyword matching
PostgreSQL Tables:
- patterns: Detected patterns and relationships
- ontology_mappings: Entity-to-ontology bindings
Qdrant Collections:
- pattern_taxonomy: Vector embeddings for patterns (via Embedding API)
StreamBus Events Emitted:
- pattern.weave.response: Pattern weaving completed
- pattern.taxonomy.updated: Taxonomy changes
4. DISCOURSE Layer: Babel Gardens (Language Processing, Parallel)
Responsibilities:
- Language detection and translation
- Emotion and sentiment analysis
- Topic classification
- Linguistic synthesis
Consumer Group: babel_gardens
Consumed Events (optional, parallel to Pattern Weavers):
- codex.entity.discovered
Processing Pipeline:
- SynthesisConsumer: Language processing & synthesis
- TopicClassifierConsumer: Topic extraction
- LanguageDetectorConsumer: Language identification
Qdrant Collections:
- babel_embeddings: Linguistic embeddings for multilingual semantic search
StreamBus Events Emitted:
- babel.embedding.response: Embedding generation completed
- babel.emotion.response: Emotion analysis result
- babel.topic.response: Topic classification result
5. MEMORY Layer: Memory Orders (Coherence Monitor)
Responsibilities:
- Monitor dual-memory coherence (PostgreSQL ↔ Qdrant)
- Detect drift and synchronization issues
- Aggregate system health metrics
- Plan and execute sync operations
Processing Pipeline:
- CoherenceAnalyzer: Compare PostgreSQL vs Qdrant counts
- HealthAggregator: Collect health metrics from all components
- SyncPlanner: Generate drift resolution plans
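The count comparison performed by the CoherenceAnalyzer step can be sketched as follows. The field names match the coherence event payload documented for this layer. The warning and critical thresholds, the choice of PostgreSQL as the reference count, the handling of an empty table, and the function name coherence_report are all assumptions for illustration.

```python
def coherence_report(pg_count: int, qdrant_count: int,
                     table: str, collection: str,
                     warn_pct: float = 1.0, crit_pct: float = 5.0) -> dict:
    """Compare row and vector counts and classify the drift (sketch)."""
    drift_absolute = abs(pg_count - qdrant_count)
    if pg_count:
        # Drift relative to the PostgreSQL count (assumed reference side).
        drift_percentage = drift_absolute * 100.0 / pg_count
    else:
        # Assumed convention: empty table with vectors present is full drift.
        drift_percentage = 0.0 if qdrant_count == 0 else 100.0

    if drift_percentage >= crit_pct:
        status = "critical"
    elif drift_percentage >= warn_pct:
        status = "warning"
    else:
        status = "healthy"

    return {
        "status": status,
        "pg_count": pg_count,
        "qdrant_count": qdrant_count,
        "drift_percentage": drift_percentage,
        "drift_absolute": drift_absolute,
        "table": table,
        "collection": collection,
    }
```

With a PostgreSQL count of 10000 and a Qdrant count of 9950, this computes a drift_absolute of 50 and a drift_percentage of 0.5.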
PostgreSQL Tables:
- coherence_reports: Coherence check audit trail
- health_metrics: System health snapshots
StreamBus Events Emitted:
- memory.coherence.checked: Coherence check completed
- memory.health.checked: System health report
- memory.sync.completed: Sync operation finished
Event Payload Example:
{
"status": "healthy|warning|critical",
"pg_count": 10000,
"qdrant_count": 9950,
"drift_percentage": 0.5,
"drift_absolute": 50,
"table": "entities",
"collection": "codex_entities"
}
6. TRUTH Layer: Orthodoxy Wardens (Governance)
Responsibilities:
- Data quality auditing
- Policy compliance validation
- Schema validation
- Integrity enforcement
Processing Pipeline:
- QualityAuditor: Assess data quality metrics
- ComplianceValidator: Enforce governance policies
PostgreSQL Tables:
- audit_logs: Comprehensive audit trail
- compliance_reports: Policy violation tracking
StreamBus Events Emitted:
- orthodoxy.audit.completed: Audit finished (consumed by Vault Keepers)
7. TRUTH Layer: Vault Keepers (Archival & Persistence)
Responsibilities:
- Archive evidence and enriched metadata
- Create state snapshots
- Verify data integrity (hash validation)
- Manage backup and recovery
Consumer Group: vault_keepers
Consumed Events:
- orthodoxy.audit.completed: Audit events for archival
- memory.sync.completed: Sync events for snapshot triggers
Processing Pipeline:
- ArchiveConsumer: Evidence archival
- SnapshotConsumer: State snapshot creation
- IntegrityConsumer: Hash verification & integrity checks
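The hash verification step can be illustrated with a short sketch that re-hashes archived bytes and compares the result with a stored value in the "sha256:..." form used by the source_hash field of the evidence event. The function name verify_source_hash is hypothetical, and supporting only SHA-256 is an assumption based on that one documented prefix.

```python
import hashlib


def verify_source_hash(content: bytes, source_hash: str) -> bool:
    """Return True when content matches a 'sha256:<hex>' digest (sketch)."""
    algo, sep, expected = source_hash.partition(":")
    if not sep or algo != "sha256":
        # Only the documented prefix is handled in this sketch.
        raise ValueError(f"unsupported hash format: {source_hash!r}")
    actual = hashlib.sha256(content).hexdigest()
    # hexdigest() is lowercase, so normalise the stored value before comparing.
    return actual == expected.lower()
```

A mismatch returns False rather than raising, so a caller could record the failed check before deciding how to react.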
PostgreSQL Tables:
- vault_archives: Archived evidence and metadata
- snapshots: Point-in-time state snapshots
- integrity_checks: Hash validation records
StreamBus Events Emitted:
- vault.archive.completed: Archival finished
- vault.snapshot.created: Snapshot saved
- vault.integrity.validated: Integrity check passed
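The consumer groups and consumed events stated for the layers above can be collected into a single lookup, which makes it easy to check who reacts to a given event. The table contains only the subscriptions this document lists explicitly. Memory Orders and Orthodoxy Wardens are omitted because no consumer group is given for them here. The names SUBSCRIPTIONS and consumers_of are hypothetical.

```python
from typing import Dict, List

# Consumer group -> events it consumes, as listed in this document.
SUBSCRIPTIONS: Dict[str, List[str]] = {
    "codex_hunters": ["oculus_prime.evidence.created",
                      "intake.evidence.created"],
    "pattern_weavers": ["codex.entity.discovered", "codex.entity.bound"],
    "babel_gardens": ["codex.entity.discovered"],
    "vault_keepers": ["orthodoxy.audit.completed", "memory.sync.completed"],
}


def consumers_of(event: str) -> List[str]:
    """Return the consumer groups subscribed to an event, sorted by name."""
    return sorted(group for group, events in SUBSCRIPTIONS.items()
                  if event in events)
```

For example, consumers_of("codex.entity.discovered") returns both babel_gardens and pattern_weavers, reflecting that Babel Gardens runs in parallel with Pattern Weavers.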
Key Integration Points
Who Embeds to Qdrant?
- Codex Hunters → codex_entities collection (via Embedding API)
- Pattern Weavers → pattern_taxonomy collection (via Embedding API)
- Babel Gardens → babel_embeddings collection (direct QdrantAgent)
Who Logs to PostgreSQL?
- Oculus Prime → evidence_packs, intake_event_log, intake_event_failures
- Codex Hunters → entities, entity_metadata, entity_relations (ontological edges from FK data)
- Pattern Weavers → patterns, ontology_mappings, entity_relations (ontological edges from LLM)
- Memory Orders → coherence_reports, health_metrics
- Orthodoxy Wardens → audit_logs, compliance_reports
- Vault Keepers → vault_archives, snapshots, integrity_checks
Event Flow Summary
oculus_prime.evidence.created
        ↓
codex.entity.discovered → codex.entity.restored → codex.entity.bound
        ↓                                                ↓
pattern.weave.response → pattern.taxonomy.updated     babel.embedding.response
        ↓                                                ↓
memory.coherence.checked → memory.health.checked → memory.sync.completed
        ↓                                                ↓
orthodoxy.audit.completed ─────────────────────→ vault.archive.completed
                                                         ↓
                                                vault.snapshot.created
                                                         ↓
                                                vault.integrity.validated
Migration & Compatibility
Event Naming Migration (v1 → v2)
- v1 Legacy: intake.evidence.created
- v2 Canonical: oculus_prime.evidence.created
- Migration Mode: Controlled by the OCULUS_PRIME_EVENT_MIGRATION_MODE env var
  - v1_only: Emit only legacy channel
  - v2_only: Emit only canonical channel
  - dual_write: Emit both (default for gradual rollout)
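The three modes map naturally onto a lookup from mode to channel list. The sketch below shows one way an emitter could resolve its target channels from the environment. The function name emit_channels and the decision to reject unknown values are assumptions; falling back to dual_write when the variable is unset follows the default stated above.

```python
import os
from typing import List, Mapping

CANONICAL = "oculus_prime.evidence.created"
LEGACY = "intake.evidence.created"

_MODES = {
    "v1_only": [LEGACY],
    "v2_only": [CANONICAL],
    "dual_write": [CANONICAL, LEGACY],
}


def emit_channels(env: Mapping[str, str] = os.environ) -> List[str]:
    """Resolve the channels to emit on from the migration mode (sketch)."""
    mode = env.get("OCULUS_PRIME_EVENT_MIGRATION_MODE", "dual_write")
    if mode not in _MODES:
        # Assumed behaviour: fail loudly rather than guess a channel.
        raise ValueError(f"unknown migration mode: {mode!r}")
    return list(_MODES[mode])
```

Passing the environment as a parameter keeps the function easy to exercise in tests, for instance emit_channels({"OCULUS_PRIME_EVENT_MIGRATION_MODE": "v2_only"}).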
Consumer Configuration
Codex Hunters listeners support:
- OCULUS_PRIME_EVENT_MIGRATION_MODE: Which channel(s) Oculus Prime emits
- CODEX_OCULUS_CONSUME_LEGACY_ALIAS: Whether to consume legacy intake.* channel
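On the consumer side, the legacy flag decides whether the intake.* channel is read in addition to the canonical one. The sketch below makes several assumptions that this document does not confirm: the truthy values accepted for the flag, the flag being off when unset, and a dual-written event carrying the same event_id on both channels. If that last assumption holds, a consumer reading both channels during dual_write would see each event twice, so a small de-duplication guard is included. All names are hypothetical.

```python
from typing import List, Mapping, Set


def consume_channels(env: Mapping[str, str]) -> List[str]:
    """Resolve the channels a Codex Hunters listener reads (sketch)."""
    channels = ["oculus_prime.evidence.created"]
    flag = env.get("CODEX_OCULUS_CONSUME_LEGACY_ALIAS", "").strip().lower()
    # Assumed truthy spellings; the real parser may accept others.
    if flag in {"1", "true", "yes", "on"}:
        channels.append("intake.evidence.created")
    return channels


class EventDeduper:
    """Remember seen event IDs so a dual-written event is handled once."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def first_time(self, event_id: str) -> bool:
        if event_id in self._seen:
            return False
        self._seen.add(event_id)
        return True
```

The in-memory set grows without bound and is lost on restart, so a long-running service would need a bounded or persistent store instead.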
References
- Oculus Prime Implementation: /infrastructure/edge/oculus_prime/
- Event Emitter: /infrastructure/edge/oculus_prime/core/event_emitter.py
- Codex Hunters Consumer: /services/api_codex_hunters/streams_listener.py
- Pattern Weavers Consumer: /services/api_pattern_weavers/adapters/bus_adapter.py
- Memory Orders Coherence: /services/api_memory_orders/adapters/bus_adapter.py
- Vault Keepers Archival: /services/api_vault_keepers/adapters/bus_adapter.py
- StreamBus Transport: /vitruvyan_core/core/synaptic_conclave/transport/streams.py