@dsfaccini
Created February 11, 2026 19:00
Cloned the mastra repo and tasked Opus with studying observational memory and explaining how it works.

Observational Memory — Full Code Flow

What it is

Observational Memory (OM) is a two-stage compression system that gives agents effectively unlimited conversation length. As messages accumulate past a token threshold, an Observer agent extracts structured observations. When the observations themselves grow too large, a Reflector agent compresses them further. The main agent (the "Actor") only ever sees compressed observations plus the recent unobserved messages.

Three agents are involved:

  • Actor — the main agent the user talks to
  • Observer — extracts observations from raw messages (default: gemini-2.5-flash, temp 0.3)
  • Reflector — compresses observations when they get too large (default: gemini-2.5-flash, temp 0)

Architecture: Processor-based

OM is implemented as a Processor<'observational-memory'> — not part of the Agent class itself. It hooks into the agent's generate/stream loop via two methods:

  • processInputStep() — runs BEFORE each LLM call (every step of multi-step execution)
  • processOutputResult() — runs AFTER the full agent turn completes

Registration: when Memory is configured with observationalMemory: true, the Memory class adds ObservationalMemory as both an input and output processor.

Key files:

  • packages/memory/src/processors/observational-memory/observational-memory.ts — main processor (~4900 lines)
  • packages/memory/src/processors/observational-memory/observer-agent.ts — observer prompts + parsing
  • packages/memory/src/processors/observational-memory/reflector-agent.ts — reflector prompts + parsing
  • packages/memory/src/processors/observational-memory/types.ts — all OM types and streamed data parts
  • packages/memory/src/processors/observational-memory/token-counter.ts — token counting
  • packages/core/src/memory/types.ts — config types, isObservationalMemoryEnabled()
  • packages/core/src/storage/types.ts — ObservationalMemoryRecord, storage interface
  • packages/core/src/agent/workflows/prepare-stream/prepare-memory-step.ts — initializes memory context in RequestContext

Code flow: processInputStep() (line 3039)

Called before every LLM call. The numbered steps map directly to the code comments in the file.

Step 1: Load historical messages (step 0 only)

loadHistoricalMessagesIfNeeded() — fetches messages from storage using lastObservedAt as a cursor. Only loads messages created AFTER the last observation timestamp (cursor-based, not "load all"). Adds them to messageList with tag 'memory'.
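The cursor mechanic can be sketched as a pure function (the message shape and field names here are illustrative, not Mastra's actual types):

```typescript
// Illustrative message shape -- not Mastra's real storage type.
interface StoredMessage {
  id: string;
  createdAt: number; // epoch ms
  text: string;
}

// Only messages created strictly AFTER the lastObservedAt cursor are
// returned; already-observed history stays compressed in observations.
function loadUnobservedSince(
  all: StoredMessage[],
  lastObservedAt: number,
): StoredMessage[] {
  return all
    .filter(m => m.createdAt > lastObservedAt)
    .sort((a, b) => a.createdAt - b.createdAt);
}
```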

Step 1b: Load other threads' context (resource scope only)

loadOtherThreadsContext() (line 3068) — when scope: 'resource', loads unobserved messages from OTHER threads belonging to the same resourceId. Formatted as <other-conversation> blocks. This is how OM shares memory across threads for the same user.

Step 1c: Activate buffered observations (step 0 only)

tryActivateBufferedObservations() (line 3077) — if async buffering produced pre-computed observation chunks, and the message token count has reached the threshold, moves buffered chunks into activeObservations. This is instant (no LLM call needed), which is the whole point of buffering.

Step 1d: Maybe reflect (step 0 only)

maybeReflect() — if observation tokens exceed the reflection threshold (default 40k), triggers the Reflector agent to compress observations. Creates a new ObservationalMemoryRecord with incremented generationCount.

Step 2: Check threshold + handle observation

When totalPendingTokens >= messageTokens threshold (default 30k):

  1. handleThresholdReached() (line ~2583) is called
  2. Loads unobserved messages via loadUnobservedMessages() (line 3491)
  3. Calls the Observer agent with buildObserverPrompt() passing those messages
  4. Observer returns { observations, suggestedContinuation }
  5. Saves to record.activeObservations via updateActiveObservations()
  6. Advances lastObservedAt cursor
  7. Inserts observation markers (data-om-observation-start/end) into message parts

Per-step save happens here too — handlePerStepSave() (line 2857) persists messages incrementally even when threshold isn't reached, preventing data loss on interruption.
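The trigger condition above can be sketched as follows (the 30k default comes from this doc; the 4-characters-per-token heuristic is a crude stand-in for the real token counter):

```typescript
// Default observation threshold per this doc.
const MESSAGE_TOKEN_THRESHOLD = 30_000;

// Crude stand-in for the real token counter (~4 chars per token).
function countTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// True once the pending (unobserved) messages cross the threshold,
// which is when handleThresholdReached() would fire.
function shouldObserve(
  pendingMessages: string[],
  threshold = MESSAGE_TOKEN_THRESHOLD,
): boolean {
  const totalPendingTokens = pendingMessages.reduce(
    (sum, m) => sum + countTokens(m),
    0,
  );
  return totalPendingTokens >= threshold;
}
```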

Step 3: Inject observations into context

injectObservationsIntoContext() (line 2885):

  1. Reads activeObservations from the record
  2. Reads currentTask and suggestedResponse from thread metadata (set by previous Observer calls)
  3. Calls formatObservationsForContext() which wraps observations in <observations> XML + adds instructions about how to use them
  4. Adds temporal annotations via addRelativeTimeToObservations() — converts "Date: May 15, 2023" headers to "Date: May 15, 2023 (2 weeks ago)" and marks planned actions that are now past-due
  5. Clears any existing observation system message, adds fresh one: messageList.addSystem(observationSystemMessage, 'observational-memory') (line 2919)
  6. Adds a continuation reminder as a fake user message (id: om-continuation, line 2922) that tells the actor "the conversation got too long, your memories are above, continue naturally"
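The temporal annotation in step 4 might look roughly like this (the date-header format is taken from this doc; past-due marking of planned actions is omitted, and the real implementation may differ):

```typescript
// Appends a relative-time hint to "Date: ..." observation headers,
// e.g. "Date: May 15, 2023" -> "Date: May 15, 2023 (2 weeks ago)".
function addRelativeTime(observations: string, now: Date): string {
  return observations.replace(/^Date: (.+)$/gm, (line, dateStr) => {
    const d = new Date(dateStr);
    if (isNaN(d.getTime())) return line; // unparseable: leave as-is
    const days = Math.floor((now.getTime() - d.getTime()) / 86_400_000);
    let rel: string;
    if (days < 1) rel = 'today';
    else if (days < 7) rel = `${days} day${days === 1 ? '' : 's'} ago`;
    else if (days < 30) {
      const w = Math.floor(days / 7);
      rel = `${w} week${w === 1 ? '' : 's'} ago`;
    } else {
      const mo = Math.floor(days / 30);
      rel = `${mo} month${mo === 1 ? '' : 's'} ago`;
    }
    return `${line} (${rel})`;
  });
}
```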

Step 4: Filter already-observed messages (step 0 only)

filterAlreadyObservedMessages() (line 2950):

Two strategies:

  1. Marker-based (primary): scans messages for data-om-observation-end markers, removes everything before the last completed marker. The marker message itself is split — observed parts removed, unobserved parts kept.
  2. Timestamp-based (fallback): uses lastObservedAt cursor + observedMessageIds set. Used when no markers exist (e.g. after buffered activation).
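A simplified sketch of the marker-based strategy (the real code splits the marker message's own parts; this version drops whole messages, and the message shape is illustrative):

```typescript
// Marker name comes from this doc.
const OM_END = 'data-om-observation-end';

interface MarkedMessage {
  id: string;
  marker?: string; // set when the message carries an observation marker
}

// Drop everything up to and including the LAST completed end-marker:
// those messages are already represented in activeObservations.
function filterAlreadyObserved(messages: MarkedMessage[]): MarkedMessage[] {
  const lastEnd = messages.map(m => m.marker).lastIndexOf(OM_END);
  return lastEnd === -1 ? messages : messages.slice(lastEnd + 1);
}
```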

Step 5: Emit status

Streams a DataOmStatusPart with token counts, thresholds, buffer state — for UI feedback.

Code flow: processOutputResult() (line 3399)

Runs after the full agent turn. Simple job: save any messages that weren't saved during per-step saves (e.g. the final assistant response). Uses saveMessagesWithSealedIdTracking() (line 3453) which regenerates IDs for messages that already have observation markers (sealed) to prevent overwriting them in storage.
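The sealed-ID idea can be sketched like this (the ID scheme and message shape are illustrative, not saveMessagesWithSealedIdTracking()'s actual signature):

```typescript
// Illustrative ID generator -- the real code's scheme may differ.
let counter = 0;
const newId = (): string => `msg-${++counter}`;

interface SaveableMessage {
  id: string;
  sealed: boolean; // carries an observation marker
}

// Sealed (already-observed) messages get a fresh ID before saving, so a
// re-save appends a new row instead of overwriting the sealed one.
function prepareForSave(
  messages: SaveableMessage[],
  observedMessageIds: Set<string>,
): SaveableMessage[] {
  return messages.map(m =>
    m.sealed || observedMessageIds.has(m.id) ? { ...m, id: newId() } : m,
  );
}
```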

What the LLM actually sees

After processInputStep(), the messageList sent to the LLM contains:

[system message tagged 'observational-memory']
  → "## Observations from Previous Sessions\n<observations>...\n</observations>"
  → + <current-task> block (if set by previous Observer)
  → + <suggested-response> block (if set)
  → + other-threads context blocks (if resource scope)

[fake user message id='om-continuation']
  → "conversation grew too long, continue from your memories naturally"

[recent unobserved messages]
  → actual conversation the LLM hasn't seen summarized yet

Observer prompt structure

observer-agent.ts builds the prompt with:

  1. Extraction instructions — 3 variants toggled by env vars:

    • CURRENT_OBSERVER_EXTRACTION_INSTRUCTIONS (default, ~200 lines) — detailed rules for assertions vs questions, temporal anchoring, state changes, detail preservation
    • LEGACY_OBSERVER_EXTRACTION_INSTRUCTIONS — older version for A/B testing (OM_USE_LEGACY_PROMPT=1)
    • CONDENSED_OBSERVER_EXTRACTION_INSTRUCTIONS — principle-based, ~45 lines (OM_USE_CONDENSED_PROMPT=1)
  2. Output format — XML structure:

    <observations>
    Date: Dec 4, 2025
    * 🔴 (14:30) User stated prefers direct answers
    * 🟡 (14:31) User asked about feature X
    </observations>
    <current-task>What the user is working on</current-task>
    <suggested-response>Hint for the actor's next message</suggested-response>
    

    Emoji priorities: 🔴 = user assertions (authoritative), 🟡 = questions/requests, 🟢 = uncertain

  3. Messages to observe — formatted via formatMessagesForObserver(), passed as user message

Parsing: parseObserverOutput() extracts the three XML blocks. optimizeObservationsForContext() trims if needed.
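A minimal sketch of that parsing step, assuming well-formed, non-nested tags (the real parseObserverOutput() in observer-agent.ts is likely more forgiving):

```typescript
// Extracts the inner text of the first <tag>...</tag> block, if any.
function extractBlock(text: string, tag: string): string | undefined {
  const m = text.match(new RegExp(`<${tag}>([\\s\\S]*?)</${tag}>`));
  return m ? m[1].trim() : undefined;
}

// Pulls the three blocks the Observer emits per this doc.
function parseObserverOutputSketch(raw: string) {
  return {
    observations: extractBlock(raw, 'observations'),
    currentTask: extractBlock(raw, 'current-task'),
    suggestedResponse: extractBlock(raw, 'suggested-response'),
  };
}
```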

Reflector prompt structure

reflector-agent.ts — receives the Observer's extraction instructions (so it understands the format) plus compression guidance:

  • Level 0: no compression hint (first attempt)
  • Level 1: gentle compression ("slightly more")
  • Level 2: aggressive compression (if level 1 didn't reduce enough)

validateCompression() checks the output is actually smaller. If not, retries with higher compression level.

Key instruction: reflections become the ENTIRE memory — any detail not preserved is permanently forgotten. Condense older observations more aggressively, keep recent ones detailed.
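The escalating-retry loop can be sketched synchronously (the real flow is async, calls the Reflector LLM, and presumably compares token counts rather than character lengths):

```typescript
// compress stands in for the Reflector call at a given compression level.
function reflectWithValidation(
  observations: string,
  compress: (text: string, level: 0 | 1 | 2) => string,
): string {
  for (const level of [0, 1, 2] as const) {
    const candidate = compress(observations, level);
    // validateCompression: accept only output that is genuinely smaller.
    if (candidate.length < observations.length) return candidate;
  }
  // No level produced a reduction; keep the original rather than grow.
  return observations;
}
```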

Async buffering system

Enabled by default (bufferTokens: 0.2 = every 20% of threshold).

Flow:

  1. At 20% of threshold (6k tokens by default), Observer runs async in background → result stored in bufferedObservationChunks
  2. At 40%, 60%, 80% — more chunks buffered
  3. When 100% threshold reached → tryActivateBufferedObservations() moves chunks to activeObservations instantly. No blocking LLM call needed.
  4. bufferActivation: 0.8 means only 80% of buffered content activates, keeping 20% in reserve for continuity.
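The scheduling math with this doc's defaults (30k threshold, bufferTokens: 0.2, bufferActivation: 0.8) works out to:

```typescript
// Defaults from this doc; function names are illustrative.
const THRESHOLD = 30_000;
const BUFFER_EVERY = 0.2;      // buffer a chunk every 20% of threshold
const ACTIVATE_FRACTION = 0.8; // activate 80% of buffered chunks

// Which buffer checkpoint (1..4) has been passed? 0 means none yet.
function bufferCheckpoint(pendingTokens: number): number {
  const step = THRESHOLD * BUFFER_EVERY; // 6k tokens per checkpoint
  return Math.min(4, Math.floor(pendingTokens / step));
}

// On activation, move only the first 80% of chunks into active
// observations; the remainder stays buffered for continuity.
function splitForActivation<T>(chunks: T[]): { activate: T[]; reserve: T[] } {
  const n = Math.floor(chunks.length * ACTIVATE_FRACTION);
  return { activate: chunks.slice(0, n), reserve: chunks.slice(n) };
}
```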

Same pattern for reflection: reflection.bufferActivation: 0.5 triggers async reflection at 50% of reflection threshold.

Stream markers (DataOmBufferingStartPart, DataOmBufferingEndPart, DataOmActivationPart) provide real-time UI feedback.

Storage layer

ObservationalMemoryRecord persists:

  • activeObservations — current observation text
  • bufferedObservationChunks — pre-computed chunks pending activation
  • bufferedReflection — pre-computed reflection pending activation
  • lastObservedAt — cursor timestamp
  • observedMessageIds — set of sealed message IDs
  • pendingMessageTokens, observationTokenCount — token tracking
  • isObserving, isReflecting, isBufferingObservation, isBufferingReflection — state flags (with stale flag detection via process-level activeOps set)
  • generationCount — incremented on each reflection
  • scope — 'thread' or 'resource'

Storage methods: getObservationalMemory(), initializeObservationalMemory(), updateActiveObservations(), updateBufferedObservations(), createReflectionGeneration(), swapBufferedObservations(), setObservingFlag(), setReflectingFlag()
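Assembled from the fields listed above, the record shape is roughly the following (the real type in packages/core/src/storage/types.ts may differ):

```typescript
// Sketch of the record, built only from the fields named in this doc.
interface ObservationalMemoryRecordSketch {
  activeObservations: string;
  bufferedObservationChunks: string[];
  bufferedReflection: string | null;
  lastObservedAt: Date | null;
  observedMessageIds: Set<string>;
  pendingMessageTokens: number;
  observationTokenCount: number;
  isObserving: boolean;
  isReflecting: boolean;
  isBufferingObservation: boolean;
  isBufferingReflection: boolean;
  generationCount: number;
  scope: 'thread' | 'resource';
}

// Illustrative factory for a brand-new (nothing observed yet) record.
function freshRecord(
  scope: 'thread' | 'resource' = 'thread',
): ObservationalMemoryRecordSketch {
  return {
    activeObservations: '',
    bufferedObservationChunks: [],
    bufferedReflection: null,
    lastObservedAt: null,
    observedMessageIds: new Set(),
    pendingMessageTokens: 0,
    observationTokenCount: 0,
    isObserving: false,
    isReflecting: false,
    isBufferingObservation: false,
    isBufferingReflection: false,
    generationCount: 0,
    scope,
  };
}
```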

Scope modes

  • thread (default): observations are per-thread. Each thread has its own OM record.
  • resource: observations are per-user across all threads. One OM record per resourceId. Other threads' unobserved messages are loaded via loadOtherThreadsContext() and formatted as <thread id="..."> sections within observations.

Config

// Minimal
memory: { storage: pgStore, options: { observationalMemory: true } }

// Full
memory: {
  storage: pgStore,
  options: {
    observationalMemory: {
      scope: 'resource',
      model: 'google/gemini-2.5-flash',
      observation: {
        messageTokens: 20_000,      // trigger observation at 20k tokens
        bufferTokens: 0.25,         // async buffer every 25% of threshold
        bufferActivation: 0.75,     // activate 75% of buffer
        blockAfter: 1.5,            // force sync observation at 1.5x threshold
      },
      reflection: {
        observationTokens: 90_000,  // trigger reflection at 90k observation tokens
        bufferActivation: 0.4,      // async reflect at 40% of threshold
        blockAfter: 1.5,
      },
    },
  },
}