Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save NotWadeGrimridge/98963c6003c4d41009cb3441977753ef to your computer and use it in GitHub Desktop.

Select an option

Save NotWadeGrimridge/98963c6003c4d41009cb3441977753ef to your computer and use it in GitHub Desktop.
toggle sse patch for codex
From d293b67993b1d020667e3a1d665409440f393f43 Mon Sep 17 00:00:00 2001
From: NotWadeGrimridge <wade@6942020.xyz>
Date: Sat, 23 Aug 2025 18:17:51 +0530
Subject: [PATCH] feat: allow toggling sse for responses
Signed-off-by: NotWadeGrimridge <wade@6942020.xyz>
---
codex-rs/core/src/client.rs | 153 ++++++++++++++++--
codex-rs/core/src/config.rs | 12 ++
codex-rs/exec/src/lib.rs | 1 +
.../mcp-server/src/codex_message_processor.rs | 1 +
codex-rs/mcp-server/src/codex_tool_config.rs | 1 +
codex-rs/tui/src/lib.rs | 1 +
6 files changed, 159 insertions(+), 10 deletions(-)
diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs
index b7bfeaa7..9c493022 100644
--- a/codex-rs/core/src/client.rs
+++ b/codex-rs/core/src/client.rs
@@ -199,7 +199,7 @@ impl ModelClient {
parallel_tool_calls: false,
reasoning,
store,
- stream: true,
+ stream: !self.config.disable_streaming,
include,
prompt_cache_key: Some(self.session_id.to_string()),
text,
@@ -224,11 +224,15 @@ impl ModelClient {
.provider
.create_request_builder(&self.client, &auth)
.await?;
-
+ let accept = if self.config.disable_streaming {
+ "application/json"
+ } else {
+ "text/event-stream"
+ };
req_builder = req_builder
.header("OpenAI-Beta", "responses=experimental")
.header("session_id", self.session_id.to_string())
- .header(reqwest::header::ACCEPT, "text/event-stream")
+ .header(reqwest::header::ACCEPT, accept)
.json(&payload);
if let Some(auth) = auth.as_ref()
@@ -258,13 +262,17 @@ impl ModelClient {
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
- // spawn task to process SSE
- let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
- tokio::spawn(process_sse(
- stream,
- tx_event,
- self.provider.stream_idle_timeout(),
- ));
+ // spawn task to process response
+ if self.config.disable_streaming {
+ tokio::spawn(handle_non_streamed_response(resp, tx_event));
+ } else {
+ let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
+ tokio::spawn(process_sse(
+ stream,
+ tx_event,
+ self.provider.stream_idle_timeout(),
+ ));
+ }
return Ok(ResponseStream { rx_event });
}
@@ -414,6 +422,13 @@ impl From<ResponseCompletedUsage> for TokenUsage {
}
}
+/// Body of a response received when streaming is disabled, deserialized in
+/// one piece from the JSON payload rather than assembled from SSE events.
+#[derive(Debug, Deserialize)]
+struct NonStreamedResponse {
+ /// Response identifier; forwarded as `response_id` in `ResponseEvent::Completed`.
+ id: String,
+ /// Token accounting, if the body carries it; converted into `TokenUsage` for the final event.
+ usage: Option<ResponseCompletedUsage>,
+ /// Output items, each forwarded as one `ResponseEvent::OutputItemDone`.
+ output: Vec<ResponseItem>,
+}
+
#[derive(Debug, Deserialize)]
struct ResponseCompletedInputTokensDetails {
cached_tokens: u64,
@@ -424,6 +439,50 @@ struct ResponseCompletedOutputTokensDetails {
reasoning_tokens: u64,
}
+/// Replays an already-parsed non-streamed response onto `tx_event` as the
+/// event sequence `Created`, one `OutputItemDone` per output item (in order),
+/// then `Completed` carrying the response id and optional token usage.
+///
+/// Returns early, without sending the remaining events, as soon as a send
+/// fails; a failure of the final `Completed` send is ignored.
+async fn send_non_streamed_events(
+ response: NonStreamedResponse,
+ tx_event: mpsc::Sender<Result<ResponseEvent>>,
+) {
+ // Send Created first; a failed send means nobody can receive further events, so stop.
+ if tx_event.send(Ok(ResponseEvent::Created {})).await.is_err() {
+ return;
+ }
+ // Forward each output item as an already-completed item, preserving order.
+ for item in response.output {
+ if tx_event
+ .send(Ok(ResponseEvent::OutputItemDone(item)))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ // Send Completed last; the result is discarded because nothing follows it.
+ let _ = tx_event
+ .send(Ok(ResponseEvent::Completed {
+ response_id: response.id,
+ token_usage: response.usage.map(Into::into),
+ }))
+ .await;
+}
+
+/// Reads the body of `resp` as a JSON `NonStreamedResponse` and forwards it
+/// through `send_non_streamed_events`.
+///
+/// If the body cannot be read or deserialized, a single `CodexErr::Stream`
+/// error is sent on `tx_event` instead; a failed send of that error is ignored.
+async fn handle_non_streamed_response(
+ resp: reqwest::Response,
+ tx_event: mpsc::Sender<Result<ResponseEvent>>,
+) {
+ match resp.json::<NonStreamedResponse>().await {
+ Ok(response) => send_non_streamed_events(response, tx_event).await,
+ Err(e) => {
+ // NOTE(review): the second field of `CodexErr::Stream` is left as `None`;
+ // its meaning is not visible here — confirm against the error definition.
+ let _ = tx_event
+ .send(Err(CodexErr::Stream(
+ format!("failed to parse non-streamed response: {e}"),
+ None,
+ )))
+ .await;
+ }
+ };
+}
+
async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
@@ -975,4 +1034,78 @@ mod tests {
);
}
}
+
+ /// Checks that `send_non_streamed_events` emits `Created`, one
+ /// `OutputItemDone` per output item, and a final `Completed` whose token
+ /// usage mirrors the usage block of the response.
+ #[tokio::test]
+ async fn non_streamed_response_sends_events_and_usage() {
+ use codex_protocol::models::ContentItem;
+ // Build a minimal non-streamed response with two assistant messages and usage.
+ let response = NonStreamedResponse {
+ id: "resp_non_stream".to_string(),
+ usage: Some(ResponseCompletedUsage {
+ input_tokens: 10,
+ input_tokens_details: Some(ResponseCompletedInputTokensDetails {
+ cached_tokens: 4,
+ }),
+ output_tokens: 6,
+ output_tokens_details: Some(ResponseCompletedOutputTokensDetails {
+ reasoning_tokens: 2,
+ }),
+ total_tokens: 16,
+ }),
+ output: vec![
+ ResponseItem::Message {
+ id: None,
+ role: "assistant".to_string(),
+ content: vec![ContentItem::OutputText {
+ text: "Hello".to_string(),
+ }],
+ },
+ ResponseItem::Message {
+ id: None,
+ role: "assistant".to_string(),
+ content: vec![ContentItem::OutputText {
+ text: "World".to_string(),
+ }],
+ },
+ ],
+ };
+
+ // Capacity 8 is enough to hold all 4 expected events, so the sends below never wait on the receiver.
+ let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
+ // `tx` is moved into the call and dropped when it returns, which lets the drain loop below terminate.
+ send_non_streamed_events(response, tx).await;
+
+ // Drain every queued event until the channel reports it is closed.
+ let mut events = Vec::new();
+ while let Some(ev) = rx.recv().await {
+ events.push(ev);
+ }
+
+ assert_eq!(
+ events.len(),
+ 4,
+ "expected Created, two OutputItemDone, Completed"
+ );
+ assert!(matches!(events[0], Ok(ResponseEvent::Created)));
+ assert!(matches!(
+ &events[1],
+ Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. })) if role == "assistant"
+ ));
+ assert!(matches!(
+ &events[2],
+ Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. })) if role == "assistant"
+ ));
+ // The final event must carry the response id and the converted usage numbers.
+ match &events[3] {
+ Ok(ResponseEvent::Completed {
+ response_id,
+ token_usage,
+ }) => {
+ assert_eq!(response_id, "resp_non_stream");
+ let usage = token_usage.as_ref().expect("usage should be present");
+ assert_eq!(usage.input_tokens, 10);
+ assert_eq!(usage.cached_input_tokens, Some(4));
+ assert_eq!(usage.output_tokens, 6);
+ assert_eq!(usage.reasoning_output_tokens, Some(2));
+ assert_eq!(usage.total_tokens, 16);
+ }
+ other => panic!("unexpected final event: {other:?}"),
+ }
+ }
}
diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs
index 7845f5d4..890ed1c2 100644
--- a/codex-rs/core/src/config.rs
+++ b/codex-rs/core/src/config.rs
@@ -75,6 +75,9 @@ pub struct Config {
/// Defaults to `false`.
pub show_raw_agent_reasoning: bool,
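+ /// When `true`, requests are sent with `stream: false` and `Accept: application/json`, and the reply is parsed as one JSON body instead of SSE. Defaults to `false`.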
+ pub disable_streaming: bool,
+
/// Disable server-side response storage (sends the full conversation
/// context with every request). Currently necessary for OpenAI customers
/// who have opted into Zero Data Retention (ZDR).
@@ -414,6 +417,9 @@ pub struct ConfigToml {
/// Sandbox configuration to apply if `sandbox` is `WorkspaceWrite`.
pub sandbox_workspace_write: Option<SandboxWorkspaceWrite>,
+ /// When `true`, streaming responses are disabled. A value set in `ConfigOverrides` takes precedence; treated as `false` when unset.
+ pub disable_streaming: Option<bool>,
+
/// Disable server-side response storage (sends the full conversation
/// context with every request). Currently necessary for OpenAI customers
/// who have opted into Zero Data Retention (ZDR).
@@ -602,6 +608,7 @@ pub struct ConfigOverrides {
pub include_plan_tool: Option<bool>,
pub include_apply_patch_tool: Option<bool>,
pub include_view_image_tool: Option<bool>,
+ pub disable_streaming: Option<bool>,
pub disable_response_storage: Option<bool>,
pub show_raw_agent_reasoning: Option<bool>,
pub tools_web_search_request: Option<bool>,
@@ -630,6 +637,7 @@ impl Config {
include_plan_tool,
include_apply_patch_tool,
include_view_image_tool,
+ disable_streaming,
disable_response_storage,
show_raw_agent_reasoning,
tools_web_search_request: override_tools_web_search_request,
@@ -760,6 +768,7 @@ impl Config {
.unwrap_or_else(AskForApproval::default),
sandbox_policy,
shell_environment_policy,
+ disable_streaming: disable_streaming.or(cfg.disable_streaming).unwrap_or(false),
disable_response_storage: config_profile
.disable_response_storage
.or(cfg.disable_response_storage)
@@ -1149,6 +1158,7 @@ disable_response_storage = true
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
+ disable_streaming: false,
disable_response_storage: false,
user_instructions: None,
notify: None,
@@ -1207,6 +1217,7 @@ disable_response_storage = true
approval_policy: AskForApproval::UnlessTrusted,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
+ disable_streaming: false,
disable_response_storage: false,
user_instructions: None,
notify: None,
@@ -1280,6 +1291,7 @@ disable_response_storage = true
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
+ disable_streaming: false,
disable_response_storage: true,
user_instructions: None,
notify: None,
diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs
index 785272a6..90d6d5ea 100644
--- a/codex-rs/exec/src/lib.rs
+++ b/codex-rs/exec/src/lib.rs
@@ -149,6 +149,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
include_plan_tool: None,
include_apply_patch_tool: None,
include_view_image_tool: None,
+ disable_streaming: None,
disable_response_storage: oss.then_some(true),
show_raw_agent_reasoning: oss.then_some(true),
tools_web_search_request: None,
diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs
index aae463ad..c539acbc 100644
--- a/codex-rs/mcp-server/src/codex_message_processor.rs
+++ b/codex-rs/mcp-server/src/codex_message_processor.rs
@@ -799,6 +799,7 @@ fn derive_config_from_params(
include_plan_tool,
include_apply_patch_tool,
include_view_image_tool: None,
+ disable_streaming: None,
disable_response_storage: None,
show_raw_agent_reasoning: None,
tools_web_search_request: None,
diff --git a/codex-rs/mcp-server/src/codex_tool_config.rs b/codex-rs/mcp-server/src/codex_tool_config.rs
index c29cb52c..5001cf88 100644
--- a/codex-rs/mcp-server/src/codex_tool_config.rs
+++ b/codex-rs/mcp-server/src/codex_tool_config.rs
@@ -162,6 +162,7 @@ impl CodexToolCallParam {
include_plan_tool,
include_apply_patch_tool: None,
include_view_image_tool: None,
+ disable_streaming: None,
disable_response_storage: None,
show_raw_agent_reasoning: None,
tools_web_search_request: None,
diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs
index 544aec27..da5777d8 100644
--- a/codex-rs/tui/src/lib.rs
+++ b/codex-rs/tui/src/lib.rs
@@ -127,6 +127,7 @@ pub async fn run_main(
include_plan_tool: Some(true),
include_apply_patch_tool: None,
include_view_image_tool: None,
+ disable_streaming: None,
disable_response_storage: cli.oss.then_some(true),
show_raw_agent_reasoning: cli.oss.then_some(true),
tools_web_search_request: cli.web_search.then_some(true),
--
2.51.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment