diff --git a/crates/agent/src/thread_store.rs b/crates/agent/src/thread_store.rs index f1367457d32..e3c8b186454 100644 --- a/crates/agent/src/thread_store.rs +++ b/crates/agent/src/thread_store.rs @@ -1,6 +1,7 @@ use crate::{DbThread, DbThreadMetadata, ThreadsDatabase}; use agent_client_protocol::schema as acp; use anyhow::{Result, anyhow}; +use futures::{FutureExt, future::Shared}; use gpui::{App, Context, Entity, Global, Task, prelude::*}; use util::path_list::PathList; @@ -10,6 +11,7 @@ impl Global for GlobalThreadStore {} pub struct ThreadStore { threads: Vec, + reload_task: Shared>, } impl ThreadStore { @@ -27,11 +29,18 @@ impl ThreadStore { } pub fn new(cx: &mut Context) -> Self { - let this = Self { + let reload_task = Self::spawn_reload(cx); + Self { threads: Vec::new(), - }; - this.reload(cx); - this + reload_task, + } + } + + /// Resolves when the most recently initiated reload has completed. + /// Callers that need to read `entries()` and can't tolerate the initial + /// empty state must await this before reading. + pub fn reload_task(&self) -> Shared> { + self.reload_task.clone() } pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> { @@ -87,11 +96,19 @@ impl ThreadStore { }) } - pub fn reload(&self, cx: &mut Context) { + pub fn reload(&mut self, cx: &mut Context) { + self.reload_task = Self::spawn_reload(cx); + } + + fn spawn_reload(cx: &mut Context) -> Shared> { let database_connection = ThreadsDatabase::connect(cx); cx.spawn(async move |this, cx| { - let database = database_connection.await.map_err(|err| anyhow!(err))?; - let all_threads = database.list_threads().await?; + let Ok(database) = database_connection.await.map_err(|err| anyhow!(err)) else { + return; + }; + let Ok(all_threads) = database.list_threads().await else { + return; + }; this.update(cx, |this, cx| { this.threads.clear(); for thread in all_threads { @@ -102,8 +119,9 @@ impl ThreadStore { } cx.notify(); }) + .ok(); }) - .detach_and_log_err(cx); + .shared() } pub fn is_empty(&self) -> bool { diff --git a/crates/agent_ui/src/thread_metadata_store.rs b/crates/agent_ui/src/thread_metadata_store.rs index 0b7e023e2c0..f7ba017c164 100644 --- a/crates/agent_ui/src/thread_metadata_store.rs +++ b/crates/agent_ui/src/thread_metadata_store.rs @@ -94,18 +94,25 @@ pub fn init(cx: &mut App) { fn migrate_thread_metadata(cx: &mut App) -> Task> { let store = ThreadMetadataStore::global(cx); let db = store.read(cx).db.clone(); + let thread_store = ThreadStore::global(cx); + let thread_store_ready = thread_store.read(cx).reload_task(); cx.spawn(async move |cx| { + // Wait for `ThreadStore`'s initial reload to complete. Without this, + // reading `entries()` races with the store's async population from + // disk and usually observes an empty iterator, silently skipping the + // migration on every launch. The regression test + // `test_migration_awaits_thread_store_reload` pins this behavior. + thread_store_ready.await; + let existing_list = db.list()?; - let is_first_migration = existing_list.is_empty(); let existing_session_ids: HashSet> = existing_list .into_iter() .filter_map(|m| m.session_id.map(|s| s.0)) .collect(); - let mut to_migrate = store.read_with(cx, |_store, cx| { - ThreadStore::global(cx) - .read(cx) + let mut to_migrate = thread_store.read_with(cx, |store, _cx| { + store .entries() .filter_map(|entry| { if existing_session_ids.contains(&entry.id.0) { @@ -138,24 +145,26 @@ fn migrate_thread_metadata(cx: &mut App) -> Task> { return anyhow::Ok(()); } - // On the first migration (no entries in DB yet), keep the 5 most - // recent threads per project unarchived. - if is_first_migration { - let mut per_project: HashMap> = HashMap::default(); - for entry in &mut to_migrate { - if entry.worktree_paths.is_empty() { - continue; - } - per_project - .entry(entry.worktree_paths.folder_path_list().clone()) - .or_default() - .push(entry); + // For each batch of newly-migrated threads, keep the 5 most recent + // per project unarchived. Previously this was gated on + // `is_first_migration` (an empty `sidebar_threads`), which meant any + // subsequent batch of newly-discovered legacy threads got migrated as + // fully archived. Running the rescue per-batch keeps the behavior + // idempotent across partial migrations and re-runs. + let mut per_project: HashMap> = HashMap::default(); + for entry in &mut to_migrate { + if entry.worktree_paths.is_empty() { + continue; } - for entries in per_project.values_mut() { - entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at)); - for entry in entries.iter_mut().take(5) { - entry.archived = false; - } + per_project + .entry(entry.worktree_paths.folder_path_list().clone()) + .or_default() + .push(entry); + } + for entries in per_project.values_mut() { + entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at)); + for entry in entries.iter_mut().take(5) { + entry.archived = false; } } @@ -2096,16 +2105,20 @@ mod tests { assert!(migrated_session_ids.iter().any(|s| s == "b-session-0")); assert!(migrated_session_ids.iter().any(|s| s == "projectless")); - let migrated_entries: Vec<_> = list + // The per-batch top-5 rescue applies: each migrated thread that has + // a project becomes the most-recent-in-its-project within this batch + // and is unarchived. Only the projectless thread stays archived, + // because the rescue only applies to threads with a folder path. + let migrated_by_session: HashMap = list .iter() - .filter(|metadata| { - !metadata - .session_id - .as_ref() - .is_some_and(|s| s.0.as_ref() == "a-session-0") + .filter_map(|metadata| { + let session_id = metadata.session_id.as_ref()?.0.to_string(); + (session_id != "a-session-0").then_some((session_id, metadata)) }) .collect(); - assert!(migrated_entries.iter().all(|metadata| metadata.archived)); + assert!(!migrated_by_session["a-session-1"].archived); + assert!(!migrated_by_session["b-session-0"].archived); + assert!(migrated_by_session["projectless"].archived); } #[gpui::test] @@ -2334,6 +2347,71 @@ mod tests { assert!(project_b_entries.iter().all(|m| !m.archived)); } + // Regression test for the race between `ThreadStore::reload` and + // `migrate_thread_metadata`. `ThreadStore::new` constructs with an empty + // in-memory cache and kicks off `reload()` as a fire-and-forget task. If + // `migrate_thread_metadata` reads `ThreadStore::entries()` before that + // reload completes, it observes an empty iterator and no-ops, even though + // the on-disk legacy DB has threads to migrate. In production this + // manifests as "my old threads disappeared after upgrading": the threads + // are still in the legacy `threads.db`, but never make it into + // `sidebar_threads`, so the new sidebar UI can't see them. + #[gpui::test] + async fn test_migration_awaits_thread_store_reload(cx: &mut TestAppContext) { + init_test(cx); + + // Seed the legacy threads DB via the ThreadStore (the only public + // save path in this crate), then park to make sure the rows are on + // disk and `ThreadStore`'s in-memory cache is populated. + let project_paths = PathList::new(&[Path::new("/project-a")]); + let now = Utc::now(); + for i in 0..3 { + let save_task = cx.update(|cx| { + let thread_store = ThreadStore::global(cx); + let session_id = format!("legacy-session-{i}"); + let title = format!("Legacy Thread {i}"); + let updated_at = now + chrono::Duration::seconds(i as i64); + let paths = project_paths.clone(); + thread_store.update(cx, |store, cx| { + store.save_thread( + acp::SessionId::new(session_id), + make_db_thread(&title, updated_at), + paths, + cx, + ) + }) + }); + save_task.await.unwrap(); + cx.run_until_parked(); + } + + // Re-initialize `ThreadStore` so its in-memory cache is freshly empty + // and a new async `reload` task is kicked off. This reproduces the + // cold-boot state where the migration runs before the store has + // populated itself from disk. The on-disk legacy DB still has the + // three threads we saved above. + cx.update(|cx| ThreadStore::init_global(cx)); + + // Crucially: do NOT run_until_parked here. If we parked, the reload + // would complete, ThreadStore::entries() would return the 3 rows, and + // the race would be hidden. We want the migration to run with + // `ThreadStore::entries()` still returning an empty iterator. + run_store_migrations(cx); + + let list = cx.update(|cx| { + let store = ThreadMetadataStore::global(cx); + store.read(cx).entries().cloned().collect::>() + }); + + assert_eq!( + list.len(), + 3, + "Expected migration to pick up all 3 legacy threads even when \ + ThreadStore::reload has not yet completed, but got {} entries", + list.len() + ); + } + #[gpui::test] async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) { init_test(cx);