Fix job errors not being reflected in D1; add delays between document inserts into Vectorize to help with OpenAI rate limits

This commit is contained in:
Kush Thaker 2024-08-06 20:48:13 +05:30
parent 28e4c3b6d1
commit 1336da8aae
5 changed files with 54 additions and 43 deletions

View file

@ -140,6 +140,7 @@ export async function batchCreateChunksAndEmbeddings({
//! If a user saves it through the extension, we don't want other users to be able to see it.
// Requests from the extension should ALWAYS have a unique ID with the user ID in it.
// I cannot stress this enough, important for security.
const ourID = `${body.url}#supermemory-web`;
const random = seededRandom(ourID);
const uuid =
@ -262,21 +263,30 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
const preparedDocuments = chunks.chunks.map((chunk, i) => {
console.log("Page hit moving on to the for loop");
for (let i = 0; i < chunks.chunks.length; i++) {
const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
return {
const document = {
pageContent: chunk,
metadata: {
content: chunk,
...commonMetaData,
...spaceMetadata,
},
};
});
const docs = await store.addDocuments([document], { ids: [id] });
console.log("Docs added:", docs);
// Wait one second after every 20 documents to stay within the OpenAI rate limit
console.log(
"This is the 20th thing in the list?",
(i + 1) % 20 === 0,
);
if ((i + 1) % 20 === 0) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
const docs = await store.addDocuments(preparedDocuments, { ids: ids });
console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@ -299,21 +309,29 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
const preparedDocuments = chunks.chunks.map((chunk, i) => {
for (let i = 0; i < chunks.chunks.length; i++) {
const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
return {
const document = {
pageContent: chunk,
metadata: {
content: chunk,
...commonMetaData,
...spaceMetadata,
},
};
});
const docs = await store.addDocuments([document], { ids: [id] });
console.log("Docs added:", docs);
// Wait one second after every 20 documents to stay within the OpenAI rate limit
console.log(
"This is the 20th thing in the list?",
(i + 1) % 20 === 0,
);
if ((i + 1) % 20 === 0) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
const docs = await store.addDocuments(preparedDocuments, { ids: ids });
console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@ -335,20 +353,27 @@ export async function batchCreateChunksAndEmbeddings({
}, {});
const ids = [];
const preparedDocuments = chunks.chunks.map((chunk, i) => {
for (let i = 0; i < chunks.chunks.length; i++) {
const chunk = chunks.chunks[i];
const id = `${uuid}-${i}`;
ids.push(id);
return {
const document = {
pageContent: chunk,
metadata: {
...commonMetaData,
...spaceMetadata,
},
};
});
const docs = await store.addDocuments([document], { ids: [id] });
console.log("Docs added:", docs);
// Wait one second after every 20 documents to stay within the OpenAI rate limit
console.log("This is the 20th thing in the list?", (i + 1) % 20 === 0);
if ((i + 1) % 20 === 0) {
console.log("-----------waiting atm");
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
const docs = await store.addDocuments(preparedDocuments, { ids: ids });
console.log("Docs added:", docs);
const { CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID } = env;
await bulkInsertKv(
{ CF_KV_AUTH_TOKEN, CF_ACCOUNT_ID, KV_NAMESPACE_ID },
@ -356,5 +381,6 @@ export async function batchCreateChunksAndEmbeddings({
);
}
}
return;
}

View file

@ -29,12 +29,12 @@ export async function processPage(input: {
),
);
}
console.log("[This is the page content]", pageContent);
const metadataResult = await getMetaData(input.url);
if (isErr(metadataResult)) {
throw metadataResult.error;
}
const metadata = metadataResult.value;
console.log("[this is the metadata]", metadata);
return Ok({ pageContent, metadata });
} catch (e) {
console.error("[Page Processing Error]", e);

View file

@ -45,7 +45,7 @@ const calculateExponentialBackoff = (
return baseDelaySeconds ** attempts;
};
const BASE_DELAY_SECONDS = 1.5;
const BASE_DELAY_SECONDS = 5;
export async function queue(
batch: MessageBatch<{
content: string;
@ -102,6 +102,7 @@ export async function queue(
.set({
attempts: existingJob.value[0].attempts + 1,
updatedAt: new Date(),
status: "Processing",
})
.where(eq(jobs.id, jobId)),
d1ErrorFactory,
@ -248,7 +249,7 @@ export async function queue(
if (isErr(vectorResult)) {
await db
.update(jobs)
.set({ error: vectorResult.error })
.set({ error: vectorResult.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@ -288,7 +289,7 @@ export async function queue(
if (isErr(insertResponse)) {
await db
.update(jobs)
.set({ error: insertResponse.error })
.set({ error: insertResponse.error.message, status: "error" })
.where(eq(jobs.id, jobId));
message.retry({
delaySeconds: calculateExponentialBackoff(
@ -340,7 +341,7 @@ export async function queue(
}
} catch (e) {
console.error("Error in simulated transaction", e.message);
console.log("Rooling back changes");
message.retry({
delaySeconds: calculateExponentialBackoff(
message.attempts,
@ -366,23 +367,6 @@ export async function queue(
/*
To do:
1. Abstract and shift the entire creatememory function to the queue consumer --> Hopefully done
2. Make the front end use that instead of whatever khichdi (mess) is going on right now
3. Remove getMetaData from the lib file as it's not being used anywhere else
4. Figure out the limit stuff ( a server action for that seems fine, because there is no use in limiting after they are already in the queue, right? )
5. Figure out the initQuery stuff ( ;( ) --> This is a bad way of doing stuff :0
6. How do I handle the "content already exists" use case? --> Also how do I figure out limits?
8. Wrap the D1 thing in a transaction and then write to Vectorize if D1 is successful; if it's not, then just error out ( if D1 fails: DLQ; recoverable failure --> retry )
First write to D1 in a transaction ( storedContent + spaces ) --> write to Vectorize --> Vectorize fails --> reset D1. Alternatively, we can do the Vectorize stuff first and, if that succeeds, then do the D1 stuff in a batch, right?
DEBUG:
What's happening:
1. The stuff in D1 is updating but nothing is happening in Vectorize for some reason
Figure out rate limits!!
*/

View file

@ -44,7 +44,7 @@ mode = "smart"
[[queues.consumers]]
queue = "embedchunks-queue"
max_batch_size = 100
max_retries = 10
max_retries = 3
dead_letter_queue = "embedchunks-dlq"

View file

@ -26,8 +26,9 @@ bucket_name = "dev-r2-anycontext"
[[d1_databases]]
binding = "DATABASE"
database_name = "dev-d1-anycontext"
database_id = "fc562605-157a-4f60-b439-2a24ffed5b4c"
database_name = "supermemlocal"
database_id = "0f93c990-72fb-489c-8563-57a7bb18dc43"
[[env.production.d1_databases]]
binding = "DATABASE"