feat: pick up and commit interrupted jobs from/to DB

Gergo Moricz, 2024-07-09 14:56:47 +02:00
parent 77aa46588f
commit 1a07e9d23b
3 changed files with 77 additions and 1 deletion

@@ -9,6 +9,8 @@ import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
+import { supabase_service } from "./services/supabase";
+import { logJob } from "./services/logging/log_job";
 const { createBullBoard } = require("@bull-board/api");
 const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -20,6 +22,39 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
 
+  (async () => {
+    if (process.env.USE_DB_AUTHENTICATION) {
+      const wsq = getWebScraperQueue();
+      const { error, data } = await supabase_service
+        .from("firecrawl_jobs")
+        .select()
+        .eq("retry", true);
+
+      if (error) throw new Error(error.message);
+
+      await wsq.addBulk(data.map(x => ({
+        data: {
+          url: x.url,
+          mode: x.mode,
+          crawlerOptions: x.crawler_options,
+          team_id: x.team_id,
+          pageOptions: x.page_options,
+          origin: x.origin,
+        },
+        opts: {
+          jobId: x.job_id,
+        }
+      })));
+
+      if (data.length > 0) {
+        await supabase_service
+          .from("firecrawl_jobs")
+          .delete()
+          .in("id", data.map(x => x.id));
+      }
+    }
+  })();
+
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
     cluster.fork();
@@ -33,7 +68,7 @@ if (cluster.isMaster) {
     }
   });
 
-  const onExit = () => {
+  const onExit = async () => {
     console.log("Shutting down gracefully...");
 
     if (cluster.workers) {
@@ -42,6 +77,44 @@ if (cluster.isMaster) {
       }
     }
 
+    if (process.env.USE_DB_AUTHENTICATION) {
+      const wsq = getWebScraperQueue();
+      const activeJobCount = await wsq.getActiveCount();
+      console.log("Updating", activeJobCount, "in-progress jobs");
+
+      const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
+        return wsq.getActive(i, i + 10);
+      }))).flat(1);
+
+      for (const job of activeJobs) {
+        console.log(job.id);
+        try {
+          await logJob({
+            job_id: job.id as string,
+            success: false,
+            message: "Interrupted, retrying",
+            num_docs: 0,
+            docs: [],
+            time_taken: 0,
+            team_id: job.data.team_id,
+            mode: "crawl",
+            url: job.data.url,
+            crawlerOptions: job.data.crawlerOptions,
+            pageOptions: job.data.pageOptions,
+            origin: job.data.origin,
+            retry: true,
+          });
+
+          await wsq.client.del(await job.lockKey());
+          await job.takeLock();
+          await job.moveToFailed({ message: "interrupted" });
+          await job.remove();
+        } catch (error) {
+          console.error("Failed to update job status:", error);
+        }
+      }
+    }
+
     process.exit();
   };
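
The startup hunk above (`@@ -20,6 +22,39 @@`) is the pickup half of the feature: on boot, the master process re-enqueues any rows a previous shutdown flagged with retry = true, then deletes them. As a standalone function it reads roughly as follows, a minimal sketch assuming the same supabase_service client; the import path for getWebScraperQueue() is a guess, since the diff does not show it:

// Sketch of the boot-time pickup, mirroring the diff above.
// Assumption: getWebScraperQueue() lives in ./services/queue-service.
import { supabase_service } from "./services/supabase";
import { getWebScraperQueue } from "./services/queue-service";

export async function pickUpInterruptedJobs(): Promise<void> {
  const wsq = getWebScraperQueue();

  // Rows written by the shutdown path carry retry = true.
  const { data, error } = await supabase_service
    .from("firecrawl_jobs")
    .select()
    .eq("retry", true);
  if (error) throw new Error(error.message);

  // Re-enqueue under the original job IDs. Bull skips an addBulk entry
  // whose jobId already exists, so replaying after a crash loop is safe.
  await wsq.addBulk(
    (data ?? []).map((x) => ({
      data: {
        url: x.url,
        mode: x.mode,
        crawlerOptions: x.crawler_options, // DB snake_case -> queue camelCase
        team_id: x.team_id,
        pageOptions: x.page_options,
        origin: x.origin,
      },
      opts: { jobId: x.job_id },
    }))
  );

  // Delete the marker rows only after the jobs are back in the queue.
  if (data && data.length > 0) {
    await supabase_service
      .from("firecrawl_jobs")
      .delete()
      .in("id", data.map((x) => x.id));
  }
}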

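The shutdown hunk (`@@ -42,6 +77,44 @@`) walks Bull's active list in windows of ten, but getActive(i, i + 10) advances the window start by one rather than ten, so consecutive windows overlap and the same job can be collected more than once. A sketch of non-overlapping paging, assuming getActive(start, end) takes inclusive indices (Redis LRANGE semantics):

// Sketch: collect all active Bull jobs in non-overlapping pages.
import { Job, Queue } from "bull";

export async function getAllActiveJobs(wsq: Queue, pageSize = 10): Promise<Job[]> {
  const count = await wsq.getActiveCount();
  const pages = await Promise.all(
    new Array(Math.ceil(count / pageSize))
      .fill(0)
      .map((_, i) => wsq.getActive(i * pageSize, (i + 1) * pageSize - 1))
  );
  return pages.flat(1);
}

Once collected, each job is recorded via logJob({ ..., retry: true }), its Redis lock key is deleted so the fresh takeLock() can succeed, and the job is moved to failed and removed from the queue; from that point the Supabase row is the only record of the job until the next boot re-enqueues it.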

@@ -38,6 +38,7 @@ export async function logJob(job: FirecrawlJob) {
       origin: job.origin,
       extractor_options: job.extractor_options,
       num_tokens: job.num_tokens,
+      retry: !!job.retry,
     },
   ]);
@@ -61,6 +62,7 @@ export async function logJob(job: FirecrawlJob) {
       origin: job.origin,
       extractor_options: job.extractor_options,
       num_tokens: job.num_tokens,
+      retry: job.retry,
     },
   };
   posthog.capture(phLog);

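One asymmetry worth noting in logJob: the database insert coerces with !!job.retry, so jobs that never set the flag are stored with an explicit false, while the PostHog event forwards job.retry as-is and the property is typically dropped for ordinary jobs when the payload is serialized to JSON. In miniature:

// !! normalizes boolean | undefined for the database column:
const dbRow = { retry: !!undefined };  // { retry: false }
const phEvent = { retry: undefined };  // dropped by JSON serialization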

@@ -62,6 +62,7 @@ export interface FirecrawlJob {
   origin: string;
   extractor_options?: ExtractorOptions,
   num_tokens?: number,
+  retry?: boolean,
 }
 
 export interface FirecrawlScrapeResponse {
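
Because retry is optional on FirecrawlJob, every pre-existing logJob call site keeps compiling unchanged; only the shutdown path opts in. A quick type-level check (hypothetical snippet, the interface's module path is assumed):

import { FirecrawlJob } from "./types"; // path assumed

type Retry = FirecrawlJob["retry"];                           // boolean | undefined
const flagged: Pick<FirecrawlJob, "retry"> = { retry: true }; // the shutdown path
const normal: Pick<FirecrawlJob, "retry"> = {};               // older callers: still valid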