Created
September 1, 2025 12:12
-
-
Save NotWadeGrimridge/98963c6003c4d41009cb3441977753ef to your computer and use it in GitHub Desktop.
Toggle SSE patch for Codex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| From d293b67993b1d020667e3a1d665409440f393f43 Mon Sep 17 00:00:00 2001 | |
| From: NotWadeGrimridge <wade@6942020.xyz> | |
| Date: Sat, 23 Aug 2025 18:17:51 +0530 | |
| Subject: [PATCH] feat: allow toggling sse for responses | |
| Signed-off-by: NotWadeGrimridge <wade@6942020.xyz> | |
| --- | |
| codex-rs/core/src/client.rs | 153 ++++++++++++++++-- | |
| codex-rs/core/src/config.rs | 12 ++ | |
| codex-rs/exec/src/lib.rs | 1 + | |
| .../mcp-server/src/codex_message_processor.rs | 1 + | |
| codex-rs/mcp-server/src/codex_tool_config.rs | 1 + | |
| codex-rs/tui/src/lib.rs | 1 + | |
| 6 files changed, 159 insertions(+), 10 deletions(-) | |
| diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs | |
| index b7bfeaa7..9c493022 100644 | |
| --- a/codex-rs/core/src/client.rs | |
| +++ b/codex-rs/core/src/client.rs | |
| @@ -199,7 +199,7 @@ impl ModelClient { | |
| parallel_tool_calls: false, | |
| reasoning, | |
| store, | |
| - stream: true, | |
| + stream: !self.config.disable_streaming, | |
| include, | |
| prompt_cache_key: Some(self.session_id.to_string()), | |
| text, | |
| @@ -224,11 +224,15 @@ impl ModelClient { | |
| .provider | |
| .create_request_builder(&self.client, &auth) | |
| .await?; | |
| - | |
| + let accept = if self.config.disable_streaming { | |
| + "application/json" | |
| + } else { | |
| + "text/event-stream" | |
| + }; | |
| req_builder = req_builder | |
| .header("OpenAI-Beta", "responses=experimental") | |
| .header("session_id", self.session_id.to_string()) | |
| - .header(reqwest::header::ACCEPT, "text/event-stream") | |
| + .header(reqwest::header::ACCEPT, accept) | |
| .json(&payload); | |
| if let Some(auth) = auth.as_ref() | |
| @@ -258,13 +262,17 @@ impl ModelClient { | |
| Ok(resp) if resp.status().is_success() => { | |
| let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600); | |
| - // spawn task to process SSE | |
| - let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); | |
| - tokio::spawn(process_sse( | |
| - stream, | |
| - tx_event, | |
| - self.provider.stream_idle_timeout(), | |
| - )); | |
| + // spawn task to process response | |
| + if self.config.disable_streaming { | |
| + tokio::spawn(handle_non_streamed_response(resp, tx_event)); | |
| + } else { | |
| + let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); | |
| + tokio::spawn(process_sse( | |
| + stream, | |
| + tx_event, | |
| + self.provider.stream_idle_timeout(), | |
| + )); | |
| + } | |
| return Ok(ResponseStream { rx_event }); | |
| } | |
| @@ -414,6 +422,15 @@ impl From<ResponseCompletedUsage> for TokenUsage { | |
| } | |
| } | |
| +/// Subset of the Responses API reply body decoded when `stream: false` is | |
| +/// requested; fields not listed here are ignored during deserialization. | |
| +#[derive(Debug, Deserialize)] | |
| +struct NonStreamedResponse { | |
| +    id: String, | |
| +    usage: Option<ResponseCompletedUsage>, | |
| +    output: Vec<ResponseItem>, | |
| +} | |
| + | |
| #[derive(Debug, Deserialize)] | |
| struct ResponseCompletedInputTokensDetails { | |
| cached_tokens: u64, | |
| @@ -424,6 +441,93 @@ struct ResponseCompletedOutputTokensDetails { | |
| reasoning_tokens: u64, | |
| } | |
| +/// Emit a decoded non-streamed response on `tx_event` as `Created`, one | |
| +/// `OutputItemDone` per output item, then `Completed` carrying the response | |
| +/// id and token usage. | |
| +/// | |
| +/// Stops early, without reporting an error, once the receiver is dropped. | |
| +async fn send_non_streamed_events( | |
| +    response: NonStreamedResponse, | |
| +    tx_event: mpsc::Sender<Result<ResponseEvent>>, | |
| +) { | |
| +    // Send Created | |
| +    if tx_event.send(Ok(ResponseEvent::Created {})).await.is_err() { | |
| +        return; | |
| +    } | |
| +    // Forward each output item | |
| +    for item in response.output { | |
| +        if tx_event | |
| +            .send(Ok(ResponseEvent::OutputItemDone(item))) | |
| +            .await | |
| +            .is_err() | |
| +        { | |
| +            return; | |
| +        } | |
| +    } | |
| +    // Send Completed | |
| +    let _ = tx_event | |
| +        .send(Ok(ResponseEvent::Completed { | |
| +            response_id: response.id, | |
| +            token_usage: response.usage.map(Into::into), | |
| +        })) | |
| +        .await; | |
| +} | |
| + | |
| +/// Read the body of a successful `stream: false` response and forward it on | |
| +/// `tx_event`. | |
| +/// | |
| +/// A body that cannot be decoded, or whose `status` is not `"completed"`, is | |
| +/// delivered as a single `Err` item instead of the normal event sequence. | |
| +async fn handle_non_streamed_response( | |
| +    resp: reqwest::Response, | |
| +    tx_event: mpsc::Sender<Result<ResponseEvent>>, | |
| +) { | |
| +    let parsed = match resp.json::<serde_json::Value>().await { | |
| +        Ok(body) => parse_non_streamed_response(body), | |
| +        Err(e) => Err(CodexErr::Stream( | |
| +            format!("failed to parse non-streamed response: {e}"), | |
| +            None, | |
| +        )), | |
| +    }; | |
| +    match parsed { | |
| +        Ok(response) => send_non_streamed_events(response, tx_event).await, | |
| +        Err(err) => { | |
| +            // Best effort: the receiver may already have been dropped. | |
| +            let _ = tx_event.send(Err(err)).await; | |
| +        } | |
| +    } | |
| +} | |
| + | |
| +/// Decode a non-streamed Responses API body, rejecting unfinished responses. | |
| +/// | |
| +/// When `status` is present and is not `"completed"` (e.g. `"failed"` or | |
| +/// `"incomplete"`), an error is returned carrying `error.message` or | |
| +/// `incomplete_details.reason` if available, so a failed or truncated turn is | |
| +/// not reported as a successful completion. A missing `status` is accepted. | |
| +fn parse_non_streamed_response(body: serde_json::Value) -> Result<NonStreamedResponse> { | |
| +    let status = body.get("status").and_then(serde_json::Value::as_str); | |
| +    if let Some(status) = status.filter(|status| *status != "completed") { | |
| +        let detail = body | |
| +            .pointer("/error/message") | |
| +            .and_then(serde_json::Value::as_str) | |
| +            .or_else(|| { | |
| +                body.pointer("/incomplete_details/reason") | |
| +                    .and_then(serde_json::Value::as_str) | |
| +            }) | |
| +            .unwrap_or("no details provided"); | |
| +        return Err(CodexErr::Stream( | |
| +            format!("non-streamed response ended with status `{status}`: {detail}"), | |
| +            None, | |
| +        )); | |
| +    } | |
| +    serde_json::from_value(body).map_err(|e| { | |
| +        CodexErr::Stream( | |
| +            format!("failed to parse non-streamed response: {e}"), | |
| +            None, | |
| +        ) | |
| +    }) | |
| +} | |
| + | |
| async fn process_sse<S>( | |
| stream: S, | |
| tx_event: mpsc::Sender<Result<ResponseEvent>>, | |
| @@ -975,4 +1079,121 @@ mod tests { | |
| ); | |
| } | |
| } | |
| + | |
| +    #[tokio::test] | |
| +    async fn non_streamed_response_sends_events_and_usage() { | |
| +        use codex_protocol::models::ContentItem; | |
| +        // Build a minimal non-streamed response with two assistant messages and usage. | |
| +        let response = NonStreamedResponse { | |
| +            id: "resp_non_stream".to_string(), | |
| +            usage: Some(ResponseCompletedUsage { | |
| +                input_tokens: 10, | |
| +                input_tokens_details: Some(ResponseCompletedInputTokensDetails { | |
| +                    cached_tokens: 4, | |
| +                }), | |
| +                output_tokens: 6, | |
| +                output_tokens_details: Some(ResponseCompletedOutputTokensDetails { | |
| +                    reasoning_tokens: 2, | |
| +                }), | |
| +                total_tokens: 16, | |
| +            }), | |
| +            output: vec![ | |
| +                ResponseItem::Message { | |
| +                    id: None, | |
| +                    role: "assistant".to_string(), | |
| +                    content: vec![ContentItem::OutputText { | |
| +                        text: "Hello".to_string(), | |
| +                    }], | |
| +                }, | |
| +                ResponseItem::Message { | |
| +                    id: None, | |
| +                    role: "assistant".to_string(), | |
| +                    content: vec![ContentItem::OutputText { | |
| +                        text: "World".to_string(), | |
| +                    }], | |
| +                }, | |
| +            ], | |
| +        }; | |
| + | |
| +        let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8); | |
| +        send_non_streamed_events(response, tx).await; | |
| + | |
| +        let mut events = Vec::new(); | |
| +        while let Some(ev) = rx.recv().await { | |
| +            events.push(ev); | |
| +        } | |
| + | |
| +        assert_eq!( | |
| +            events.len(), | |
| +            4, | |
| +            "expected Created, two OutputItemDone, Completed" | |
| +        ); | |
| +        assert!(matches!(events[0], Ok(ResponseEvent::Created))); | |
| +        assert!(matches!( | |
| +            &events[1], | |
| +            Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. })) if role == "assistant" | |
| +        )); | |
| +        assert!(matches!( | |
| +            &events[2], | |
| +            Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. })) if role == "assistant" | |
| +        )); | |
| +        match &events[3] { | |
| +            Ok(ResponseEvent::Completed { | |
| +                response_id, | |
| +                token_usage, | |
| +            }) => { | |
| +                assert_eq!(response_id, "resp_non_stream"); | |
| +                let usage = token_usage.as_ref().expect("usage should be present"); | |
| +                assert_eq!(usage.input_tokens, 10); | |
| +                assert_eq!(usage.cached_input_tokens, Some(4)); | |
| +                assert_eq!(usage.output_tokens, 6); | |
| +                assert_eq!(usage.reasoning_output_tokens, Some(2)); | |
| +                assert_eq!(usage.total_tokens, 16); | |
| +            } | |
| +            other => panic!("unexpected final event: {other:?}"), | |
| +        } | |
| +    } | |
| + | |
| +    #[test] | |
| +    fn non_streamed_response_with_unfinished_status_is_an_error() { | |
| +        let failed = serde_json::json!({ | |
| +            "id": "resp_failed", | |
| +            "status": "failed", | |
| +            "error": { "code": "server_error", "message": "boom" }, | |
| +            "output": [] | |
| +        }); | |
| +        match parse_non_streamed_response(failed) { | |
| +            Err(CodexErr::Stream(message, None)) => { | |
| +                assert!(message.contains("`failed`"), "message: {message}"); | |
| +                assert!(message.contains("boom"), "message: {message}"); | |
| +            } | |
| +            other => panic!("expected stream error, got {other:?}"), | |
| +        } | |
| + | |
| +        let incomplete = serde_json::json!({ | |
| +            "id": "resp_incomplete", | |
| +            "status": "incomplete", | |
| +            "incomplete_details": { "reason": "max_output_tokens" }, | |
| +            "output": [] | |
| +        }); | |
| +        match parse_non_streamed_response(incomplete) { | |
| +            Err(CodexErr::Stream(message, None)) => { | |
| +                assert!(message.contains("max_output_tokens"), "message: {message}"); | |
| +            } | |
| +            other => panic!("expected stream error, got {other:?}"), | |
| +        } | |
| +    } | |
| + | |
| +    #[test] | |
| +    fn non_streamed_response_with_completed_or_missing_status_decodes() { | |
| +        for body in [ | |
| +            serde_json::json!({ "id": "resp_ok", "status": "completed", "output": [] }), | |
| +            serde_json::json!({ "id": "resp_ok", "output": [] }), | |
| +        ] { | |
| +            let response = parse_non_streamed_response(body).expect("should decode"); | |
| +            assert_eq!(response.id, "resp_ok"); | |
| +            assert!(response.output.is_empty()); | |
| +            assert!(response.usage.is_none()); | |
| +        } | |
| +    } | |
| } | |
| diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs | |
| index 7845f5d4..890ed1c2 100644 | |
| --- a/codex-rs/core/src/config.rs | |
| +++ b/codex-rs/core/src/config.rs | |
| @@ -75,6 +75,9 @@ pub struct Config { | |
| /// Defaults to `false`. | |
| pub show_raw_agent_reasoning: bool, | |
| + /// When `true`, Responses API requests use `stream: false` and read one JSON body instead of SSE. | |
| + pub disable_streaming: bool, | |
| + | |
| /// Disable server-side response storage (sends the full conversation | |
| /// context with every request). Currently necessary for OpenAI customers | |
| /// who have opted into Zero Data Retention (ZDR). | |
| @@ -414,6 +417,9 @@ pub struct ConfigToml { | |
| /// Sandbox configuration to apply if `sandbox` is `WorkspaceWrite`. | |
| pub sandbox_workspace_write: Option<SandboxWorkspaceWrite>, | |
| + /// When `true`, Responses API requests use `stream: false` and read one JSON body instead of SSE (default `false`). | |
| + pub disable_streaming: Option<bool>, | |
| + | |
| /// Disable server-side response storage (sends the full conversation | |
| /// context with every request). Currently necessary for OpenAI customers | |
| /// who have opted into Zero Data Retention (ZDR). | |
| @@ -602,6 +608,7 @@ pub struct ConfigOverrides { | |
| pub include_plan_tool: Option<bool>, | |
| pub include_apply_patch_tool: Option<bool>, | |
| pub include_view_image_tool: Option<bool>, | |
| + pub disable_streaming: Option<bool>, | |
| pub disable_response_storage: Option<bool>, | |
| pub show_raw_agent_reasoning: Option<bool>, | |
| pub tools_web_search_request: Option<bool>, | |
| @@ -630,6 +637,7 @@ impl Config { | |
| include_plan_tool, | |
| include_apply_patch_tool, | |
| include_view_image_tool, | |
| + disable_streaming, | |
| disable_response_storage, | |
| show_raw_agent_reasoning, | |
| tools_web_search_request: override_tools_web_search_request, | |
| @@ -760,6 +768,7 @@ impl Config { | |
| .unwrap_or_else(AskForApproval::default), | |
| sandbox_policy, | |
| shell_environment_policy, | |
| + disable_streaming: disable_streaming.or(cfg.disable_streaming).unwrap_or(false), | |
| disable_response_storage: config_profile | |
| .disable_response_storage | |
| .or(cfg.disable_response_storage) | |
| @@ -1149,6 +1158,7 @@ disable_response_storage = true | |
| approval_policy: AskForApproval::Never, | |
| sandbox_policy: SandboxPolicy::new_read_only_policy(), | |
| shell_environment_policy: ShellEnvironmentPolicy::default(), | |
| + disable_streaming: false, | |
| disable_response_storage: false, | |
| user_instructions: None, | |
| notify: None, | |
| @@ -1207,6 +1217,7 @@ disable_response_storage = true | |
| approval_policy: AskForApproval::UnlessTrusted, | |
| sandbox_policy: SandboxPolicy::new_read_only_policy(), | |
| shell_environment_policy: ShellEnvironmentPolicy::default(), | |
| + disable_streaming: false, | |
| disable_response_storage: false, | |
| user_instructions: None, | |
| notify: None, | |
| @@ -1280,6 +1291,7 @@ disable_response_storage = true | |
| approval_policy: AskForApproval::OnFailure, | |
| sandbox_policy: SandboxPolicy::new_read_only_policy(), | |
| shell_environment_policy: ShellEnvironmentPolicy::default(), | |
| + disable_streaming: false, | |
| disable_response_storage: true, | |
| user_instructions: None, | |
| notify: None, | |
| diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs | |
| index 785272a6..90d6d5ea 100644 | |
| --- a/codex-rs/exec/src/lib.rs | |
| +++ b/codex-rs/exec/src/lib.rs | |
| @@ -149,6 +149,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any | |
| include_plan_tool: None, | |
| include_apply_patch_tool: None, | |
| include_view_image_tool: None, | |
| + disable_streaming: None, | |
| disable_response_storage: oss.then_some(true), | |
| show_raw_agent_reasoning: oss.then_some(true), | |
| tools_web_search_request: None, | |
| diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs | |
| index aae463ad..c539acbc 100644 | |
| --- a/codex-rs/mcp-server/src/codex_message_processor.rs | |
| +++ b/codex-rs/mcp-server/src/codex_message_processor.rs | |
| @@ -799,6 +799,7 @@ fn derive_config_from_params( | |
| include_plan_tool, | |
| include_apply_patch_tool, | |
| include_view_image_tool: None, | |
| + disable_streaming: None, | |
| disable_response_storage: None, | |
| show_raw_agent_reasoning: None, | |
| tools_web_search_request: None, | |
| diff --git a/codex-rs/mcp-server/src/codex_tool_config.rs b/codex-rs/mcp-server/src/codex_tool_config.rs | |
| index c29cb52c..5001cf88 100644 | |
| --- a/codex-rs/mcp-server/src/codex_tool_config.rs | |
| +++ b/codex-rs/mcp-server/src/codex_tool_config.rs | |
| @@ -162,6 +162,7 @@ impl CodexToolCallParam { | |
| include_plan_tool, | |
| include_apply_patch_tool: None, | |
| include_view_image_tool: None, | |
| + disable_streaming: None, | |
| disable_response_storage: None, | |
| show_raw_agent_reasoning: None, | |
| tools_web_search_request: None, | |
| diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs | |
| index 544aec27..da5777d8 100644 | |
| --- a/codex-rs/tui/src/lib.rs | |
| +++ b/codex-rs/tui/src/lib.rs | |
| @@ -127,6 +127,7 @@ pub async fn run_main( | |
| include_plan_tool: Some(true), | |
| include_apply_patch_tool: None, | |
| include_view_image_tool: None, | |
| + disable_streaming: None, | |
| disable_response_storage: cli.oss.then_some(true), | |
| show_raw_agent_reasoning: cli.oss.then_some(true), | |
| tools_web_search_request: cli.web_search.then_some(true), | |
| -- | |
| 2.51.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment