Observational Memory (OM) is a two-stage compression system that gives agents unlimited conversation length. As messages accumulate beyond a token threshold, an Observer agent extracts structured observations. When observations themselves grow too large, a Reflector agent compresses them further. The main agent (the "Actor") only ever sees: compressed observations + recent unobserved messages.
Three agents are involved:
- Actor — the main agent the user talks to
- Observer — extracts observations from raw messages (default: gemini-2.5-flash, temp 0.3)
- Reflector — compresses observations when they get too large (default: gemini-2.5-flash, temp 0)
OM is implemented as a Processor<'observational-memory'> — not part of the Agent class itself. It hooks into the agent's generate/stream loop via two methods:
- processInputStep() — runs BEFORE each LLM call (every step of multi-step execution)
- processOutputResult() — runs AFTER the full agent turn completes
Registration: when Memory is configured with observationalMemory: true, the Memory class adds ObservationalMemory as both an input and output processor.
Key files:
- packages/memory/src/processors/observational-memory/observational-memory.ts — main processor (~4900 lines)
- packages/memory/src/processors/observational-memory/observer-agent.ts — observer prompts + parsing
- packages/memory/src/processors/observational-memory/reflector-agent.ts — reflector prompts + parsing
- packages/memory/src/processors/observational-memory/types.ts — all OM types and streamed data parts
- packages/memory/src/processors/observational-memory/token-counter.ts — token counting
- packages/core/src/memory/types.ts — config types, isObservationalMemoryEnabled()
- packages/core/src/storage/types.ts — ObservationalMemoryRecord, storage interface
- packages/core/src/agent/workflows/prepare-stream/prepare-memory-step.ts — initializes memory context in RequestContext
Called before every LLM call. The numbered steps map directly to the code comments in the file.
loadHistoricalMessagesIfNeeded() — fetches messages from storage using lastObservedAt as a cursor. Only loads messages created AFTER the last observation timestamp (cursor-based, not "load all"). Adds them to messageList with tag 'memory'.
loadOtherThreadsContext() (line 3068) — when scope: 'resource', loads unobserved messages from OTHER threads belonging to the same resourceId. Formatted as <other-conversation> blocks. This is how OM shares memory across threads for the same user.
tryActivateBufferedObservations() (line 3077) — if async buffering produced pre-computed observation chunks, and the message token count has reached the threshold, moves buffered chunks into activeObservations. This is instant (no LLM call needed), which is the whole point of buffering.
maybeReflect() — if observation tokens exceed the reflection threshold (default 40k), triggers the Reflector agent to compress observations. Creates a new ObservationalMemoryRecord with incremented generationCount.
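The cursor behavior of loadHistoricalMessagesIfNeeded() can be sketched as follows. This is a simplified illustration: the Msg shape and the loadAfterCursor name are hypothetical stand-ins, not the real Mastra types.

```typescript
// Hypothetical simplified message shape for illustration only.
interface Msg {
  id: string;
  createdAt: Date;
  content: string;
}

// Only messages created strictly after the lastObservedAt cursor are
// loaded; everything at or before the cursor has already been
// compressed into observations.
function loadAfterCursor(all: Msg[], lastObservedAt: Date | null): Msg[] {
  if (lastObservedAt === null) return all; // nothing observed yet
  return all.filter((m) => m.createdAt.getTime() > lastObservedAt.getTime());
}
```

The point of the cursor is that loading is incremental: storage is never asked for "all messages", only for the unobserved tail.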
When totalPendingTokens >= messageTokens threshold (default 30k):
1. handleThresholdReached() (line ~2583) is called
2. Loads unobserved messages via loadUnobservedMessages() (line 3491)
3. Calls the Observer agent with buildObserverPrompt(), passing those messages
4. Observer returns { observations, suggestedContinuation }
5. Saves to record.activeObservations via updateActiveObservations()
6. Advances the lastObservedAt cursor
7. Inserts observation markers (data-om-observation-start/end) into message parts
Per-step save happens here too — handlePerStepSave() (line 2857) persists messages incrementally even when threshold isn't reached, preventing data loss on interruption.
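The decision made at the threshold can be sketched like this. decideObservation is a hypothetical helper for illustration; the real logic lives inside handleThresholdReached() and tryActivateBufferedObservations().

```typescript
type Action = "none" | "activate-buffered" | "observe-sync";

// Hypothetical decision helper mirroring the flow described above.
function decideObservation(
  pendingTokens: number,
  hasBufferedChunks: boolean,
  messageTokens: number, // threshold, default 30_000
): Action {
  if (pendingTokens < messageTokens) return "none";
  // Buffered chunks activate instantly — no LLM call on the hot path.
  if (hasBufferedChunks) return "activate-buffered";
  // Otherwise a blocking Observer call runs before the Actor's LLM call.
  return "observe-sync";
}
```

This is why async buffering matters: with buffered chunks available, crossing the threshold costs no latency.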
injectObservationsIntoContext() (line 2885):
- Reads activeObservations from the record
- Reads currentTask and suggestedResponse from thread metadata (set by previous Observer calls)
- Calls formatObservationsForContext(), which wraps observations in <observations> XML and adds instructions about how to use them
- Adds temporal annotations via addRelativeTimeToObservations() — converts "Date: May 15, 2023" headers to "Date: May 15, 2023 (2 weeks ago)" and marks planned actions that are now past-due
- Clears any existing observation system message, then adds a fresh one: messageList.addSystem(observationSystemMessage, 'observational-memory') (line 2919)
- Adds a continuation reminder as a fake user message (id: om-continuation, line 2922) that tells the Actor "the conversation got too long, your memories are above, continue naturally"
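The temporal annotation step can be illustrated with a minimal sketch. The bucketing rules below are assumptions for illustration, not the exact ones in addRelativeTimeToObservations().

```typescript
// Rewrite a "Date: ..." header to include a relative offset.
// Non-header lines pass through untouched.
function annotateDateHeader(line: string, now: Date): string {
  const m = line.match(/^Date: (.+)$/);
  if (!m) return line;
  const date = new Date(m[1]);
  if (isNaN(date.getTime())) return line;
  const days = Math.floor((now.getTime() - date.getTime()) / 86_400_000);
  const rel =
    days <= 0 ? "today" :
    days === 1 ? "yesterday" :
    days < 14 ? `${days} days ago` :
    days < 60 ? `${Math.floor(days / 7)} weeks ago` :
    `${Math.floor(days / 30)} months ago`;
  return `${line} (${rel})`;
}
```

Annotating at injection time (rather than at observation time) keeps the stored observations stable while the relative labels stay current.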
filterAlreadyObservedMessages() (line 2950):
Two strategies:
- Marker-based (primary): scans messages for data-om-observation-end markers and removes everything before the last completed marker. The marker message itself is split — observed parts removed, unobserved parts kept.
- Timestamp-based (fallback): uses the lastObservedAt cursor plus the observedMessageIds set. Used when no markers exist (e.g. after buffered activation).
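A minimal sketch of the marker-based strategy, assuming simplified Part/Message shapes (the real message-part types are richer):

```typescript
interface Part { text: string; marker?: string }
interface Message { parts: Part[] }

// Drop everything up to and including the last observation-end marker;
// the boundary message is split so its unobserved tail survives.
function filterObserved(messages: Message[]): Message[] {
  let lastMsg = -1;
  let lastPart = -1;
  messages.forEach((msg, i) =>
    msg.parts.forEach((p, j) => {
      if (p.marker === "om-observation-end") { lastMsg = i; lastPart = j; }
    }),
  );
  if (lastMsg === -1) return messages; // no markers → timestamp fallback
  const remainder = messages[lastMsg].parts.slice(lastPart + 1);
  const kept = messages.slice(lastMsg + 1);
  return remainder.length > 0 ? [{ parts: remainder }, ...kept] : kept;
}
```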
Streams a DataOmStatusPart with token counts, thresholds, buffer state — for UI feedback.
Runs after the full agent turn. Simple job: save any messages that weren't saved during per-step saves (e.g. the final assistant response). Uses saveMessagesWithSealedIdTracking() (line 3453) which regenerates IDs for messages that already have observation markers (sealed) to prevent overwriting them in storage.
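The sealed-ID idea can be sketched as follows. prepareForSave and the boolean sealed flag are hypothetical stand-ins for the real marker check inside saveMessagesWithSealedIdTracking().

```typescript
interface Saveable { id: string; sealed: boolean }

// Messages that already carry observation markers get fresh IDs before
// saving, so the new write cannot overwrite the sealed (already
// observed) copy in storage.
function prepareForSave(msgs: Saveable[], newId: () => string): Saveable[] {
  return msgs.map((m) => (m.sealed ? { ...m, id: newId() } : m));
}
```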
After processInputStep(), the messageList sent to the LLM contains:
[system message tagged 'observational-memory']
→ "## Observations from Previous Sessions\n<observations>...\n</observations>"
→ + <current-task> block (if set by previous Observer)
→ + <suggested-response> block (if set)
→ + other-threads context blocks (if resource scope)
[fake user message id='om-continuation']
→ "conversation grew too long, continue from your memories naturally"
[recent unobserved messages]
→ actual conversation the LLM hasn't seen summarized yet
observer-agent.ts builds the prompt with:
- Extraction instructions — 3 variants toggled by env vars:
  - CURRENT_OBSERVER_EXTRACTION_INSTRUCTIONS (default, ~200 lines) — detailed rules for assertions vs questions, temporal anchoring, state changes, detail preservation
  - LEGACY_OBSERVER_EXTRACTION_INSTRUCTIONS — older version for A/B testing (OM_USE_LEGACY_PROMPT=1)
  - CONDENSED_OBSERVER_EXTRACTION_INSTRUCTIONS — principle-based, ~45 lines (OM_USE_CONDENSED_PROMPT=1)
- Output format — XML structure:

      <observations>
      Date: Dec 4, 2025
      * 🔴 (14:30) User stated prefers direct answers
      * 🟡 (14:31) User asked about feature X
      </observations>
      <current-task>What the user is working on</current-task>
      <suggested-response>Hint for the actor's next message</suggested-response>

  Emoji priorities: 🔴 = user assertions (authoritative), 🟡 = questions/requests, 🟢 = uncertain
- Messages to observe — formatted via formatMessagesForObserver(), passed as a user message
Parsing: parseObserverOutput() extracts the three XML blocks. optimizeObservationsForContext() trims if needed.
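A rough sketch of what that parsing amounts to; the regex approach and the exact return shape are assumptions, and the real parseObserverOutput() likely handles more edge cases.

```typescript
interface ObserverOutput {
  observations: string | null;
  currentTask: string | null;
  suggestedResponse: string | null;
}

// Extract the inner text of one XML-style block, or null if absent.
function extractBlock(raw: string, tag: string): string | null {
  const m = raw.match(new RegExp(`<${tag}>([\\s\\S]*?)</${tag}>`));
  return m ? m[1].trim() : null;
}

function parseObserverOutputSketch(raw: string): ObserverOutput {
  return {
    observations: extractBlock(raw, "observations"),
    currentTask: extractBlock(raw, "current-task"),
    suggestedResponse: extractBlock(raw, "suggested-response"),
  };
}
```

Missing blocks come back as null, so a turn without a current task simply leaves the thread metadata untouched.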
reflector-agent.ts — receives the Observer's extraction instructions (so it understands the format) plus compression guidance:
- Level 0: no compression hint (first attempt)
- Level 1: gentle compression ("slightly more")
- Level 2: aggressive compression (if level 1 didn't reduce enough)
validateCompression() checks the output is actually smaller. If not, retries with higher compression level.
Key instruction: reflections become the ENTIRE memory — any detail not preserved is permanently forgotten. Condense older observations more aggressively, keep recent ones detailed.
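The escalation loop around validateCompression() can be sketched as follows; reflectWithRetries and the compress callback are illustrative stand-ins for the Reflector call and the token counter.

```typescript
type Compress = (text: string, level: 0 | 1 | 2) => string;

// Retry with a higher compression level until the output is actually
// smaller than the input; if every level fails, keep the original.
function reflectWithRetries(
  observations: string,
  compress: Compress,
  countTokens: (s: string) => number = (s) => s.length,
): string {
  const before = countTokens(observations);
  for (const level of [0, 1, 2] as const) {
    const out = compress(observations, level);
    if (countTokens(out) < before) return out; // compression succeeded
  }
  return observations;
}
```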
Async buffering is enabled by default (bufferTokens: 0.2 = a buffered Observer run every 20% of the threshold).
Flow:
- At 20% of the threshold (6k tokens by default), the Observer runs async in the background → result stored in bufferedObservationChunks
- At 40%, 60%, 80% — more chunks are buffered
- When the 100% threshold is reached → tryActivateBufferedObservations() moves chunks to activeObservations instantly; no blocking LLM call needed
- bufferActivation: 0.8 means only 80% of buffered content activates, keeping 20% in reserve for continuity
Same pattern for reflection: reflection.bufferActivation: 0.5 triggers async reflection at 50% of reflection threshold.
Stream markers (DataOmBufferingStartPart, DataOmBufferingEndPart, DataOmActivationPart) provide real-time UI feedback.
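The buffering arithmetic, worked out under the defaults described above (messageTokens: 30_000, bufferTokens: 0.2, bufferActivation: 0.8); function names are illustrative.

```typescript
// Token counts at which background Observer runs fire: every
// bufferTokens fraction of the threshold, excluding 100% itself.
function bufferCheckpoints(messageTokens: number, bufferTokens: number): number[] {
  const steps = Math.floor(1 / bufferTokens + 1e-9); // e.g. 0.2 → 5
  const points: number[] = [];
  for (let i = 1; i < steps; i++) {
    points.push(Math.round(messageTokens * bufferTokens * i));
  }
  return points;
}

// At the 100% threshold, only the activation fraction of buffered
// chunks becomes active; the rest stays in reserve for continuity.
function splitOnActivation(
  chunks: string[],
  activation: number,
): { active: string[]; reserve: string[] } {
  const n = Math.floor(chunks.length * activation);
  return { active: chunks.slice(0, n), reserve: chunks.slice(n) };
}
```

With the defaults this gives buffered runs at 6k, 12k, 18k, and 24k tokens, and a 4-of-5 chunk activation at 30k.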
ObservationalMemoryRecord persists:
- activeObservations — current observation text
- bufferedObservationChunks — pre-computed chunks pending activation
- bufferedReflection — pre-computed reflection pending activation
- lastObservedAt — cursor timestamp
- observedMessageIds — set of sealed message IDs
- pendingMessageTokens, observationTokenCount — token tracking
- isObserving, isReflecting, isBufferingObservation, isBufferingReflection — state flags (with stale-flag detection via a process-level activeOps set)
- generationCount — incremented on each reflection
- scope — 'thread' or 'resource'
Storage methods: getObservationalMemory(), initializeObservationalMemory(), updateActiveObservations(), updateBufferedObservations(), createReflectionGeneration(), swapBufferedObservations(), setObservingFlag(), setReflectingFlag()
- thread (default): observations are per-thread. Each thread has its own OM record.
- resource: observations are per-user across all threads. One OM record per resourceId. Other threads' unobserved messages are loaded via loadOtherThreadsContext() and formatted as <thread id="..."> sections within observations.
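One way to picture the scope difference is the record lookup key. The key shape below is an assumption for illustration, not the storage layer's actual schema.

```typescript
interface OmContext { threadId: string; resourceId: string }

// thread scope → one OM record per thread;
// resource scope → one OM record per user, shared across their threads.
function omRecordKey(scope: "thread" | "resource", ctx: OmContext): string {
  return scope === "thread" ? `thread:${ctx.threadId}` : `resource:${ctx.resourceId}`;
}
```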
// Minimal
memory: { storage: pgStore, options: { observationalMemory: true } }
// Full
memory: {
storage: pgStore,
options: {
observationalMemory: {
scope: 'resource',
model: 'google/gemini-2.5-flash',
observation: {
messageTokens: 20_000, // trigger observation at 20k tokens
bufferTokens: 0.25, // async buffer every 25% of threshold
bufferActivation: 0.75, // activate 75% of buffer
blockAfter: 1.5, // force sync observation at 1.5x threshold
},
reflection: {
observationTokens: 90_000, // trigger reflection at 90k observation tokens
bufferActivation: 0.4, // async reflect at 40% of threshold
blockAfter: 1.5,
},
},
},
}