Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion codex-rs/core/src/session/review.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use codex_auto_review::ReviewCoordination;
use codex_auto_review::ReviewLockGuard;
use codex_core_skills::HostLoadedSkills;
use codex_protocol::openai_models::ToolMode;
Expand All @@ -23,6 +24,11 @@ impl PreparedReviewThread {
self.task = ReviewTask::with_persistence(persistence);
self
}

fn without_persistence(mut self) -> Self {
self.task = self.task.without_persistence();
self
}
}

pub(super) enum ReviewPersistenceSpec {
Expand All @@ -39,7 +45,7 @@ pub(super) async fn spawn_review_thread(
resolved: crate::review_prompts::ResolvedReviewRequest,
persistence: Option<ReviewPersistence>,
) {
let prepared = prepare_review_thread(
let mut prepared = prepare_review_thread(
Arc::clone(&sess),
config,
parent_turn_context,
Expand All @@ -52,7 +58,17 @@ pub(super) async fn spawn_review_thread(
let manual_review_request = prepared.manual_review_request.clone();
if let Some(review_request) = manual_review_request {
sess.abort_all_tasks(TurnAbortReason::Replaced).await;
sess.cancel_background_auto_review_for_foreground_work()
.await;
sess.clear_connector_selection().await;
if let Some(persistence) = prepared.task.persistence_context()
&& persistence.is_manual()
{
prepared = match record_started_manual_auto_review(&sess, persistence).await {
Some(persistence) => prepared.with_persistence(persistence),
None => prepared.without_persistence(),
};
}
sess.send_event(
prepared.turn_context.as_ref(),
EventMsg::EnteredReviewMode(review_request),
Expand All @@ -70,6 +86,33 @@ pub(super) async fn spawn_review_thread(
}
}

async fn record_started_manual_auto_review(
sess: &Arc<Session>,
persistence: ReviewPersistenceContext,
) -> Option<ReviewPersistenceContext> {
let codex_home = sess.codex_home().await;
let coordination = ReviewCoordination::for_scope(&codex_home, persistence.store_scope());
let persistence = match coordination.bump_snapshot_epoch() {
Ok(snapshot_epoch) => persistence.with_snapshot_epoch(snapshot_epoch),
Err(err) => {
tracing::warn!(
run_id = %persistence.run_id(),
error = %err,
"failed to bump manual auto review snapshot epoch"
);
return None;
}
};
if !persistence.save_pending(&codex_home) {
tracing::warn!(
run_id = %persistence.run_id(),
"failed to persist pending manual auto review run"
);
return None;
}
Some(persistence)
}

pub(super) async fn prepare_review_thread(
sess: Arc<Session>,
config: Arc<Config>,
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/tasks/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl ReviewTask {
}
}

pub(crate) fn without_persistence(self) -> Self {
Self { persistence: None }
}

pub(crate) fn persistence_context(&self) -> Option<ReviewPersistenceContext> {
self.persistence.clone()
}
Expand Down
156 changes: 156 additions & 0 deletions codex-rs/core/tests/suite/review.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use codex_auto_review::AutoReviewRunSource;
use codex_auto_review::AutoReviewRunStatus;
use codex_auto_review::AutoReviewStore;
use codex_auto_review::ReviewCoordination;
use codex_auto_review::SCHEMA_VERSION as AUTO_REVIEW_SCHEMA_VERSION;
use codex_core::CodexThread;
use codex_core::REVIEW_PROMPT;
Expand Down Expand Up @@ -348,6 +349,19 @@ async fn review_op_with_persistence_writes_auto_review_run() {
assert_eq!(run.status, AutoReviewRunStatus::Completed);
assert_eq!(run.source, AutoReviewRunSource::Manual);
assert_eq!(run.review_target, review_target);
assert_eq!(run.target.snapshot_epoch, Some(1));
assert_eq!(
ReviewCoordination::for_scope(
codex_home.path(),
run.target
.worktree_path
.as_deref()
.expect("manual review worktree path"),
)
.current_snapshot_epoch()
.expect("manual review snapshot epoch"),
1
);
assert_eq!(run.model.as_deref(), Some("gpt-5.4"));
assert!(run.completed_at_unix_secs.is_some());
assert_eq!(run.error_summary, None);
Expand Down Expand Up @@ -468,6 +482,7 @@ async fn review_op_with_persistence_writes_cancelled_run_when_interrupted() {
assert_eq!(run.status, AutoReviewRunStatus::Cancelled);
assert_eq!(run.source, AutoReviewRunSource::Manual);
assert_eq!(run.review_target, review_target);
assert_eq!(run.target.snapshot_epoch, Some(1));
assert!(run.completed_at_unix_secs.is_some());
assert_eq!(
run.error_summary.as_deref(),
Expand Down Expand Up @@ -619,6 +634,114 @@ async fn background_review_persistence_writes_cancelled_run_without_review_mode_
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_review_with_persistence_supersedes_running_background_review() {
skip_if_no_network!();

let (gate_background_completed_tx, gate_background_completed_rx) = oneshot::channel();
let background_review = vec![
StreamingSseChunk {
gate: None,
body: streaming_sse_event(responses::ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: Some(gate_background_completed_rx),
body: streaming_sse_event(responses::ev_completed("resp-1")),
},
];
let review_json = serde_json::json!({
"findings": [],
"overall_correctness": "patch is correct",
"overall_explanation": "Manual review completed.",
"overall_confidence_score": 0.8
})
.to_string();
let manual_review = vec![
StreamingSseChunk {
gate: None,
body: streaming_sse_event(responses::ev_response_created("resp-2")),
},
StreamingSseChunk {
gate: None,
body: streaming_sse_event(responses::ev_assistant_message("msg-2", &review_json)),
},
StreamingSseChunk {
gate: None,
body: streaming_sse_event(responses::ev_completed("resp-2")),
},
];
let (server, _completions) =
start_streaming_sse_server(vec![background_review, manual_review]).await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = test_codex()
.with_home(codex_home.clone())
.build_with_streaming_server(&server)
.await
.unwrap()
.codex;

let background_target = ReviewTarget::Custom {
instructions: "background review".to_string(),
};
codex
.submit(Op::Review {
review_request: ReviewRequest {
target: background_target.clone(),
user_facing_hint: None,
},
persistence: Some(ReviewPersistence::BackgroundAutoReview),
})
.await
.unwrap();
let background_running_status =
wait_for_background_auto_review_status(&codex, BackgroundAutoReviewStatus::Running, None)
.await;
let running_background = load_single_auto_review_run(codex_home.path())
.expect("load running background auto review run");
assert_eq!(running_background.run_id, background_running_status.run_id);
assert_eq!(running_background.status, AutoReviewRunStatus::Running);
assert_eq!(running_background.source, AutoReviewRunSource::Background);
assert_eq!(running_background.target.snapshot_epoch, None);

let manual_target = ReviewTarget::Custom {
instructions: "manual review".to_string(),
};
codex
.submit(Op::Review {
review_request: ReviewRequest {
target: manual_target.clone(),
user_facing_hint: None,
},
persistence: Some(ReviewPersistence::ManualAutoReview),
})
.await
.unwrap();
let manual_running = wait_for_auto_review_run_status_by_source(
&codex,
codex_home.path(),
AutoReviewRunSource::Manual,
AutoReviewRunStatus::Running,
)
.await;
assert_eq!(manual_running.review_target, manual_target);
assert_eq!(manual_running.target.snapshot_epoch, Some(1));
let runs = load_auto_review_runs(codex_home.path()).expect("load auto review runs");
let background_run = runs
.iter()
.find(|run| run.run_id == background_running_status.run_id)
.expect("background run persisted");
assert!(
background_run.status.is_terminal(),
"background run should be terminal after manual review starts: {background_run:?}"
);
assert_eq!(background_run.source, AutoReviewRunSource::Background);
assert_eq!(background_run.review_target, background_target);
let _ = gate_background_completed_tx.send(());

let _codex_home_guard = codex_home;
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn automatic_background_review_runs_after_file_changing_turn() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
Expand Down Expand Up @@ -2549,6 +2672,39 @@ async fn wait_for_auto_review_run_status(
}
}

async fn wait_for_auto_review_run_status_by_source(
codex: &CodexThread,
codex_home: &std::path::Path,
expected_source: AutoReviewRunSource,
expected_status: AutoReviewRunStatus,
) -> codex_auto_review::AutoReviewRun {
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if let Ok(runs) = load_auto_review_runs(codex_home)
&& let Some(run) = runs
.into_iter()
.find(|run| run.source == expected_source && run.status == expected_status)
{
return run;
}
assert!(
tokio::time::Instant::now() < deadline,
"timeout waiting for {expected_source:?} auto-review run status {expected_status:?}; runs={:?}",
load_auto_review_runs(codex_home)
);
match tokio::time::timeout(Duration::from_millis(50), codex.next_event()).await {
Ok(Ok(event)) => match event.msg {
EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) => {}
_ => {}
},
Ok(Err(err)) => {
panic!("stream ended while waiting for auto-review run status: {err}")
}
Err(_) => {}
}
}
}

async fn wait_for_background_auto_review_status(
codex: &CodexThread,
expected_status: BackgroundAutoReviewStatus,
Expand Down
Loading