diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index a974e92541bf..ec27399b1364 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -1,3 +1,4 @@ +use codex_auto_review::ReviewCoordination; use codex_auto_review::ReviewLockGuard; use codex_core_skills::HostLoadedSkills; use codex_protocol::openai_models::ToolMode; @@ -23,6 +24,11 @@ impl PreparedReviewThread { self.task = ReviewTask::with_persistence(persistence); self } + + fn without_persistence(mut self) -> Self { + self.task = self.task.without_persistence(); + self + } } pub(super) enum ReviewPersistenceSpec { @@ -39,7 +45,7 @@ pub(super) async fn spawn_review_thread( resolved: crate::review_prompts::ResolvedReviewRequest, persistence: Option, ) { - let prepared = prepare_review_thread( + let mut prepared = prepare_review_thread( Arc::clone(&sess), config, parent_turn_context, @@ -52,7 +58,17 @@ pub(super) async fn spawn_review_thread( let manual_review_request = prepared.manual_review_request.clone(); if let Some(review_request) = manual_review_request { sess.abort_all_tasks(TurnAbortReason::Replaced).await; + sess.cancel_background_auto_review_for_foreground_work() + .await; sess.clear_connector_selection().await; + if let Some(persistence) = prepared.task.persistence_context() + && persistence.is_manual() + { + prepared = match record_started_manual_auto_review(&sess, persistence).await { + Some(persistence) => prepared.with_persistence(persistence), + None => prepared.without_persistence(), + }; + } sess.send_event( prepared.turn_context.as_ref(), EventMsg::EnteredReviewMode(review_request), @@ -70,6 +86,33 @@ pub(super) async fn spawn_review_thread( } } +async fn record_started_manual_auto_review( + sess: &Arc, + persistence: ReviewPersistenceContext, +) -> Option { + let codex_home = sess.codex_home().await; + let coordination = ReviewCoordination::for_scope(&codex_home, persistence.store_scope()); + let persistence = match coordination.bump_snapshot_epoch() { + Ok(snapshot_epoch) => persistence.with_snapshot_epoch(snapshot_epoch), + Err(err) => { + tracing::warn!( + run_id = %persistence.run_id(), + error = %err, + "failed to bump manual auto review snapshot epoch" + ); + return None; + } + }; + if !persistence.save_pending(&codex_home) { + tracing::warn!( + run_id = %persistence.run_id(), + "failed to persist pending manual auto review run" + ); + return None; + } + Some(persistence) +} + pub(super) async fn prepare_review_thread( sess: Arc, config: Arc, diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index b5d0e166893b..7411d7fea66c 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -50,6 +50,10 @@ impl ReviewTask { } } + pub(crate) fn without_persistence(self) -> Self { + Self { persistence: None } + } + pub(crate) fn persistence_context(&self) -> Option { self.persistence.clone() } diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index b04a53111c3e..aae55357a861 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -1,6 +1,7 @@ use codex_auto_review::AutoReviewRunSource; use codex_auto_review::AutoReviewRunStatus; use codex_auto_review::AutoReviewStore; +use codex_auto_review::ReviewCoordination; use codex_auto_review::SCHEMA_VERSION as AUTO_REVIEW_SCHEMA_VERSION; use codex_core::CodexThread; use codex_core::REVIEW_PROMPT; @@ -348,6 +349,19 @@ async fn review_op_with_persistence_writes_auto_review_run() { assert_eq!(run.status, AutoReviewRunStatus::Completed); assert_eq!(run.source, AutoReviewRunSource::Manual); assert_eq!(run.review_target, review_target); + assert_eq!(run.target.snapshot_epoch, Some(1)); + assert_eq!( + ReviewCoordination::for_scope( + codex_home.path(), + run.target + .worktree_path + .as_deref() + .expect("manual review worktree path"), + ) + .current_snapshot_epoch() + .expect("manual review snapshot epoch"), + 1 + ); assert_eq!(run.model.as_deref(), Some("gpt-5.4")); assert!(run.completed_at_unix_secs.is_some()); assert_eq!(run.error_summary, None); @@ -468,6 +482,7 @@ async fn review_op_with_persistence_writes_cancelled_run_when_interrupted() { assert_eq!(run.status, AutoReviewRunStatus::Cancelled); assert_eq!(run.source, AutoReviewRunSource::Manual); assert_eq!(run.review_target, review_target); + assert_eq!(run.target.snapshot_epoch, Some(1)); assert!(run.completed_at_unix_secs.is_some()); assert_eq!( run.error_summary.as_deref(), @@ -619,6 +634,114 @@ async fn background_review_persistence_writes_cancelled_run_without_review_mode_ server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn manual_review_with_persistence_supersedes_running_background_review() { + skip_if_no_network!(); + + let (gate_background_completed_tx, gate_background_completed_rx) = oneshot::channel(); + let background_review = vec![ + StreamingSseChunk { + gate: None, + body: streaming_sse_event(responses::ev_response_created("resp-1")), + }, + StreamingSseChunk { + gate: Some(gate_background_completed_rx), + body: streaming_sse_event(responses::ev_completed("resp-1")), + }, + ]; + let review_json = serde_json::json!({ + "findings": [], + "overall_correctness": "patch is correct", + "overall_explanation": "Manual review completed.", + "overall_confidence_score": 0.8 + }) + .to_string(); + let manual_review = vec![ + StreamingSseChunk { + gate: None, + body: streaming_sse_event(responses::ev_response_created("resp-2")), + }, + StreamingSseChunk { + gate: None, + body: streaming_sse_event(responses::ev_assistant_message("msg-2", &review_json)), + }, + StreamingSseChunk { + gate: None, + body: streaming_sse_event(responses::ev_completed("resp-2")), + }, + ]; + let (server, _completions) = + start_streaming_sse_server(vec![background_review, manual_review]).await; + let codex_home = Arc::new(TempDir::new().unwrap()); + let codex = test_codex() + .with_home(codex_home.clone()) + .build_with_streaming_server(&server) + .await + .unwrap() + .codex; + + let background_target = ReviewTarget::Custom { + instructions: "background review".to_string(), + }; + codex + .submit(Op::Review { + review_request: ReviewRequest { + target: background_target.clone(), + user_facing_hint: None, + }, + persistence: Some(ReviewPersistence::BackgroundAutoReview), + }) + .await + .unwrap(); + let background_running_status = + wait_for_background_auto_review_status(&codex, BackgroundAutoReviewStatus::Running, None) + .await; + let running_background = load_single_auto_review_run(codex_home.path()) + .expect("load running background auto review run"); + assert_eq!(running_background.run_id, background_running_status.run_id); + assert_eq!(running_background.status, AutoReviewRunStatus::Running); + assert_eq!(running_background.source, AutoReviewRunSource::Background); + assert_eq!(running_background.target.snapshot_epoch, None); + + let manual_target = ReviewTarget::Custom { + instructions: "manual review".to_string(), + }; + codex + .submit(Op::Review { + review_request: ReviewRequest { + target: manual_target.clone(), + user_facing_hint: None, + }, + persistence: Some(ReviewPersistence::ManualAutoReview), + }) + .await + .unwrap(); + let manual_running = wait_for_auto_review_run_status_by_source( + &codex, + codex_home.path(), + AutoReviewRunSource::Manual, + AutoReviewRunStatus::Running, + ) + .await; + assert_eq!(manual_running.review_target, manual_target); + assert_eq!(manual_running.target.snapshot_epoch, Some(1)); + let runs = load_auto_review_runs(codex_home.path()).expect("load auto review runs"); + let background_run = runs + .iter() + .find(|run| run.run_id == background_running_status.run_id) + .expect("background run persisted"); + assert!( + background_run.status.is_terminal(), + "background run should be terminal after manual review starts: {background_run:?}" + ); + assert_eq!(background_run.source, AutoReviewRunSource::Background); + assert_eq!(background_run.review_target, background_target); + let _ = gate_background_completed_tx.send(()); + + let _codex_home_guard = codex_home; + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn automatic_background_review_runs_after_file_changing_turn() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); @@ -2549,6 +2672,39 @@ async fn wait_for_auto_review_run_status( } } +async fn wait_for_auto_review_run_status_by_source( + codex: &CodexThread, + codex_home: &std::path::Path, + expected_source: AutoReviewRunSource, + expected_status: AutoReviewRunStatus, +) -> codex_auto_review::AutoReviewRun { + let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + loop { + if let Ok(runs) = load_auto_review_runs(codex_home) + && let Some(run) = runs + .into_iter() + .find(|run| run.source == expected_source && run.status == expected_status) + { + return run; + } + assert!( + tokio::time::Instant::now() < deadline, + "timeout waiting for {expected_source:?} auto-review run status {expected_status:?}; runs={:?}", + load_auto_review_runs(codex_home) + ); + match tokio::time::timeout(Duration::from_millis(50), codex.next_event()).await { + Ok(Ok(event)) => match event.msg { + EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) => {} + _ => {} + }, + Ok(Err(err)) => { + panic!("stream ended while waiting for auto-review run status: {err}") + } + Err(_) => {} + } + } +} + async fn wait_for_background_auto_review_status( codex: &CodexThread, expected_status: BackgroundAutoReviewStatus,