mirror of
https://github.com/zed-industries/zed.git
synced 2026-05-30 20:24:08 +00:00
Fix agent threads missing from sidebar after upgrade (#54723)
Fixes #54618. ## Summary After upgrading from `v0.232.x` to `v0.233.5+`, many users reported that most of their agent threads "disappeared" from the sidebar. The threads are still on disk in the legacy `threads.db` — but they never make it into `sidebar_threads`, which is the only table the new sidebar UI reads from, so they're unreachable from the UI. The root cause is a race between `ThreadStore::reload` and `migrate_thread_metadata`: - `ThreadStore::new` constructs with an empty in-memory `Vec` and kicks off `reload()` as a fire-and-forget task. - `migrate_thread_metadata` runs during `agent_ui::init` and reads `ThreadStore::global(cx).read(cx).entries()` synchronously, without awaiting that reload. - On cold boot the migration observes an empty iterator, early-returns on `to_migrate.is_empty()`, and never populates `sidebar_threads`. The migration runs every launch, so it keeps losing the race forever. Empirically, the only rows that *did* end up in `sidebar_threads` for affected users came from `handle_conversation_event` writing rows when the user actively interacted with a thread. That's why users would typically see a handful of recently-touched threads rather than their full history. ## Fix 1. `ThreadStore` now tracks its current reload as a `Shared<Task<()>>` and exposes it via `reload_task()`. `migrate_thread_metadata` awaits this before reading `entries()`. 2. Move the top-5-per-project unarchive pass out from under the `is_first_migration` guard. Previously the rescue only ran when `sidebar_threads` was empty, which meant any user who had even one pre-existing row (e.g. from interacting with a thread on a prior launch) got every subsequent batch of migrated threads archived with no rescue. Running the rescue per-batch is stateless and idempotent — a user who bounces between older releases and newer ones still gets their top 5 per project surfaced each time a new batch is migrated. No new KVP flag or one-shot backfill is needed: because the migration already dedupes by session_id and runs on every launch, the next cold boot on a fixed build picks up any legacy threads that earlier launches missed. ## Structure The PR is split into two commits to make the diff easy to verify: 1. **Regression test** — fails on `main`, reproduces the cold-boot race by seeding the legacy DB, reinitializing `ThreadStore` to get a fresh empty cache, and running the migration without parking first. 2. **The fix** — gates entries reads on `reload_task().await`, and adjusts the per-batch rescue policy. Updates one existing test's assertions to match the new per-batch rescue policy. ## Validation - The regression test fails on `main`, passes after the fix. - All `agent_ui`, `agent::thread_store`, and `sidebar` lib tests pass. - `./script/clippy -p agent -p agent_ui` is clean. - End-to-end repro with a real on-disk data dir: launched `v0.230.2 → v0.231.2 → v0.232.3 → v0.233.8` as a non-staff user, creating 15 threads and interacting with one, reproduced the reported state (1 thread in sidebar, 14 missing). Launched a debug build of this branch against the same data dir: `Migrating 14 thread store entries` in the log, 15 rows in `sidebar_threads` with 6 unarchived (the interacted-with thread plus the 5 most recent), 9 archived and reachable from the archive view. Release Notes: - Fixed agent threads appearing to be missing after upgrading to the parallel-agents sidebar. The thread-metadata migration was racing against the on-disk thread store's async reload and skipping itself on every launch. Upgrading to this build runs the migration successfully; previously-missing threads are surfaced either in the sidebar (5 most recent per project) or in the archive view.
This commit is contained in:
parent
9fe32ac141
commit
9adb4ea63e
2 changed files with 132 additions and 36 deletions
|
|
@ -1,6 +1,7 @@
|
|||
use crate::{DbThread, DbThreadMetadata, ThreadsDatabase};
|
||||
use agent_client_protocol::schema as acp;
|
||||
use anyhow::{Result, anyhow};
|
||||
use futures::{FutureExt, future::Shared};
|
||||
use gpui::{App, Context, Entity, Global, Task, prelude::*};
|
||||
use util::path_list::PathList;
|
||||
|
||||
|
|
@ -10,6 +11,7 @@ impl Global for GlobalThreadStore {}
|
|||
|
||||
pub struct ThreadStore {
|
||||
threads: Vec<DbThreadMetadata>,
|
||||
reload_task: Shared<Task<()>>,
|
||||
}
|
||||
|
||||
impl ThreadStore {
|
||||
|
|
@ -27,11 +29,18 @@ impl ThreadStore {
|
|||
}
|
||||
|
||||
pub fn new(cx: &mut Context<Self>) -> Self {
|
||||
let this = Self {
|
||||
let reload_task = Self::spawn_reload(cx);
|
||||
Self {
|
||||
threads: Vec::new(),
|
||||
};
|
||||
this.reload(cx);
|
||||
this
|
||||
reload_task,
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves when the most recently initiated reload has completed.
|
||||
/// Callers that need to read `entries()` and can't tolerate the initial
|
||||
/// empty state must await this before reading.
|
||||
pub fn reload_task(&self) -> Shared<Task<()>> {
|
||||
self.reload_task.clone()
|
||||
}
|
||||
|
||||
pub fn thread_from_session_id(&self, session_id: &acp::SessionId) -> Option<&DbThreadMetadata> {
|
||||
|
|
@ -87,11 +96,19 @@ impl ThreadStore {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn reload(&self, cx: &mut Context<Self>) {
|
||||
pub fn reload(&mut self, cx: &mut Context<Self>) {
|
||||
self.reload_task = Self::spawn_reload(cx);
|
||||
}
|
||||
|
||||
fn spawn_reload(cx: &mut Context<Self>) -> Shared<Task<()>> {
|
||||
let database_connection = ThreadsDatabase::connect(cx);
|
||||
cx.spawn(async move |this, cx| {
|
||||
let database = database_connection.await.map_err(|err| anyhow!(err))?;
|
||||
let all_threads = database.list_threads().await?;
|
||||
let Ok(database) = database_connection.await.map_err(|err| anyhow!(err)) else {
|
||||
return;
|
||||
};
|
||||
let Ok(all_threads) = database.list_threads().await else {
|
||||
return;
|
||||
};
|
||||
this.update(cx, |this, cx| {
|
||||
this.threads.clear();
|
||||
for thread in all_threads {
|
||||
|
|
@ -102,8 +119,9 @@ impl ThreadStore {
|
|||
}
|
||||
cx.notify();
|
||||
})
|
||||
.ok();
|
||||
})
|
||||
.detach_and_log_err(cx);
|
||||
.shared()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
|
|
|
|||
|
|
@ -94,18 +94,25 @@ pub fn init(cx: &mut App) {
|
|||
fn migrate_thread_metadata(cx: &mut App) -> Task<anyhow::Result<()>> {
|
||||
let store = ThreadMetadataStore::global(cx);
|
||||
let db = store.read(cx).db.clone();
|
||||
let thread_store = ThreadStore::global(cx);
|
||||
let thread_store_ready = thread_store.read(cx).reload_task();
|
||||
|
||||
cx.spawn(async move |cx| {
|
||||
// Wait for `ThreadStore`'s initial reload to complete. Without this,
|
||||
// reading `entries()` races with the store's async population from
|
||||
// disk and usually observes an empty iterator, silently skipping the
|
||||
// migration on every launch. The regression test
|
||||
// `test_migration_awaits_thread_store_reload` pins this behavior.
|
||||
thread_store_ready.await;
|
||||
|
||||
let existing_list = db.list()?;
|
||||
let is_first_migration = existing_list.is_empty();
|
||||
let existing_session_ids: HashSet<Arc<str>> = existing_list
|
||||
.into_iter()
|
||||
.filter_map(|m| m.session_id.map(|s| s.0))
|
||||
.collect();
|
||||
|
||||
let mut to_migrate = store.read_with(cx, |_store, cx| {
|
||||
ThreadStore::global(cx)
|
||||
.read(cx)
|
||||
let mut to_migrate = thread_store.read_with(cx, |store, _cx| {
|
||||
store
|
||||
.entries()
|
||||
.filter_map(|entry| {
|
||||
if existing_session_ids.contains(&entry.id.0) {
|
||||
|
|
@ -138,24 +145,26 @@ fn migrate_thread_metadata(cx: &mut App) -> Task<anyhow::Result<()>> {
|
|||
return anyhow::Ok(());
|
||||
}
|
||||
|
||||
// On the first migration (no entries in DB yet), keep the 5 most
|
||||
// recent threads per project unarchived.
|
||||
if is_first_migration {
|
||||
let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
|
||||
for entry in &mut to_migrate {
|
||||
if entry.worktree_paths.is_empty() {
|
||||
continue;
|
||||
}
|
||||
per_project
|
||||
.entry(entry.worktree_paths.folder_path_list().clone())
|
||||
.or_default()
|
||||
.push(entry);
|
||||
// For each batch of newly-migrated threads, keep the 5 most recent
|
||||
// per project unarchived. Previously this was gated on
|
||||
// `is_first_migration` (an empty `sidebar_threads`), which meant any
|
||||
// subsequent batch of newly-discovered legacy threads got migrated as
|
||||
// fully archived. Running the rescue per-batch keeps the behavior
|
||||
// idempotent across partial migrations and re-runs.
|
||||
let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
|
||||
for entry in &mut to_migrate {
|
||||
if entry.worktree_paths.is_empty() {
|
||||
continue;
|
||||
}
|
||||
for entries in per_project.values_mut() {
|
||||
entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
|
||||
for entry in entries.iter_mut().take(5) {
|
||||
entry.archived = false;
|
||||
}
|
||||
per_project
|
||||
.entry(entry.worktree_paths.folder_path_list().clone())
|
||||
.or_default()
|
||||
.push(entry);
|
||||
}
|
||||
for entries in per_project.values_mut() {
|
||||
entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
|
||||
for entry in entries.iter_mut().take(5) {
|
||||
entry.archived = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2096,16 +2105,20 @@ mod tests {
|
|||
assert!(migrated_session_ids.iter().any(|s| s == "b-session-0"));
|
||||
assert!(migrated_session_ids.iter().any(|s| s == "projectless"));
|
||||
|
||||
let migrated_entries: Vec<_> = list
|
||||
// The per-batch top-5 rescue applies: each migrated thread that has
|
||||
// a project becomes the most-recent-in-its-project within this batch
|
||||
// and is unarchived. Only the projectless thread stays archived,
|
||||
// because the rescue only applies to threads with a folder path.
|
||||
let migrated_by_session: HashMap<String, &ThreadMetadata> = list
|
||||
.iter()
|
||||
.filter(|metadata| {
|
||||
!metadata
|
||||
.session_id
|
||||
.as_ref()
|
||||
.is_some_and(|s| s.0.as_ref() == "a-session-0")
|
||||
.filter_map(|metadata| {
|
||||
let session_id = metadata.session_id.as_ref()?.0.to_string();
|
||||
(session_id != "a-session-0").then_some((session_id, metadata))
|
||||
})
|
||||
.collect();
|
||||
assert!(migrated_entries.iter().all(|metadata| metadata.archived));
|
||||
assert!(!migrated_by_session["a-session-1"].archived);
|
||||
assert!(!migrated_by_session["b-session-0"].archived);
|
||||
assert!(migrated_by_session["projectless"].archived);
|
||||
}
|
||||
|
||||
#[gpui::test]
|
||||
|
|
@ -2334,6 +2347,71 @@ mod tests {
|
|||
assert!(project_b_entries.iter().all(|m| !m.archived));
|
||||
}
|
||||
|
||||
// Regression test for the race between `ThreadStore::reload` and
|
||||
// `migrate_thread_metadata`. `ThreadStore::new` constructs with an empty
|
||||
// in-memory cache and kicks off `reload()` as a fire-and-forget task. If
|
||||
// `migrate_thread_metadata` reads `ThreadStore::entries()` before that
|
||||
// reload completes, it observes an empty iterator and no-ops, even though
|
||||
// the on-disk legacy DB has threads to migrate. In production this
|
||||
// manifests as "my old threads disappeared after upgrading": the threads
|
||||
// are still in the legacy `threads.db`, but never make it into
|
||||
// `sidebar_threads`, so the new sidebar UI can't see them.
|
||||
#[gpui::test]
|
||||
async fn test_migration_awaits_thread_store_reload(cx: &mut TestAppContext) {
|
||||
init_test(cx);
|
||||
|
||||
// Seed the legacy threads DB via the ThreadStore (the only public
|
||||
// save path in this crate), then park to make sure the rows are on
|
||||
// disk and `ThreadStore`'s in-memory cache is populated.
|
||||
let project_paths = PathList::new(&[Path::new("/project-a")]);
|
||||
let now = Utc::now();
|
||||
for i in 0..3 {
|
||||
let save_task = cx.update(|cx| {
|
||||
let thread_store = ThreadStore::global(cx);
|
||||
let session_id = format!("legacy-session-{i}");
|
||||
let title = format!("Legacy Thread {i}");
|
||||
let updated_at = now + chrono::Duration::seconds(i as i64);
|
||||
let paths = project_paths.clone();
|
||||
thread_store.update(cx, |store, cx| {
|
||||
store.save_thread(
|
||||
acp::SessionId::new(session_id),
|
||||
make_db_thread(&title, updated_at),
|
||||
paths,
|
||||
cx,
|
||||
)
|
||||
})
|
||||
});
|
||||
save_task.await.unwrap();
|
||||
cx.run_until_parked();
|
||||
}
|
||||
|
||||
// Re-initialize `ThreadStore` so its in-memory cache is freshly empty
|
||||
// and a new async `reload` task is kicked off. This reproduces the
|
||||
// cold-boot state where the migration runs before the store has
|
||||
// populated itself from disk. The on-disk legacy DB still has the
|
||||
// three threads we saved above.
|
||||
cx.update(|cx| ThreadStore::init_global(cx));
|
||||
|
||||
// Crucially: do NOT run_until_parked here. If we parked, the reload
|
||||
// would complete, ThreadStore::entries() would return the 3 rows, and
|
||||
// the race would be hidden. We want the migration to run with
|
||||
// `ThreadStore::entries()` still returning an empty iterator.
|
||||
run_store_migrations(cx);
|
||||
|
||||
let list = cx.update(|cx| {
|
||||
let store = ThreadMetadataStore::global(cx);
|
||||
store.read(cx).entries().cloned().collect::<Vec<_>>()
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
list.len(),
|
||||
3,
|
||||
"Expected migration to pick up all 3 legacy threads even when \
|
||||
ThreadStore::reload has not yet completed, but got {} entries",
|
||||
list.len()
|
||||
);
|
||||
}
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
|
||||
init_test(cx);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue