mirror of
https://github.com/mendableai/firecrawl.git
synced 2024-11-16 11:42:24 +08:00
feat: pick up and commit interrupted jobs from/to DB
This commit is contained in:
parent
77aa46588f
commit
6a524e1bae
|
@ -9,6 +9,7 @@ import { initSDK } from "@hyperdx/node-opentelemetry";
|
|||
import cluster from "cluster";
|
||||
import os from "os";
|
||||
import { Job } from "bull";
|
||||
import { supabase_service } from "./services/supabase";
|
||||
|
||||
const { createBullBoard } = require("@bull-board/api");
|
||||
const { BullAdapter } = require("@bull-board/api/bullAdapter");
|
||||
|
@ -20,6 +21,32 @@ console.log(`Number of CPUs: ${numCPUs} available`);
|
|||
if (cluster.isMaster) {
|
||||
console.log(`Master ${process.pid} is running`);
|
||||
|
||||
(async () => {
|
||||
if (process.env.USE_DB_AUTHENTICATION) {
|
||||
const wsq = getWebScraperQueue();
|
||||
const { error, data } = await supabase_service
|
||||
.from("firecrawl_jobs")
|
||||
.select()
|
||||
.eq("retry", true);
|
||||
|
||||
if (error) throw new Error(error.message);
|
||||
|
||||
await wsq.addBulk(data.map(x => ({
|
||||
data: {
|
||||
url: x.url,
|
||||
mode: x.mode,
|
||||
crawlerOptions: x.crawler_options,
|
||||
team_id: x.team_id,
|
||||
pageOptions: x.page_options,
|
||||
origin: x.origin,
|
||||
},
|
||||
opts: {
|
||||
jobId: x.job_id,
|
||||
}
|
||||
})))
|
||||
}
|
||||
})();
|
||||
|
||||
// Fork workers.
|
||||
for (let i = 0; i < numCPUs; i++) {
|
||||
cluster.fork();
|
||||
|
@ -33,7 +60,7 @@ if (cluster.isMaster) {
|
|||
}
|
||||
});
|
||||
|
||||
const onExit = () => {
|
||||
const onExit = async () => {
|
||||
console.log("Shutting down gracefully...");
|
||||
|
||||
if (cluster.workers) {
|
||||
|
@ -42,6 +69,30 @@ if (cluster.isMaster) {
|
|||
}
|
||||
}
|
||||
|
||||
if (process.env.USE_DB_AUTHENTICATION) {
|
||||
const wsq = getWebScraperQueue();
|
||||
const activeJobCount = await wsq.getActiveCount();
|
||||
console.log("Updating", activeJobCount, "in-progress jobs");
|
||||
|
||||
const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
|
||||
return wsq.getActive(i, i+10)
|
||||
}))).flat(1);
|
||||
|
||||
for (const job of activeJobs) {
|
||||
try {
|
||||
const { error } = await supabase_service
|
||||
.from("firecrawl_jobs")
|
||||
.update({ docs: job.data.docs, partial_docs: job.data.partialDocs, retry: true })
|
||||
.eq("job_id", job.id);
|
||||
|
||||
if (error) throw new Error(error.message);
|
||||
} catch (error) {
|
||||
console.error("Failed to update job status:", error);
|
||||
}
|
||||
await wsq.removeJobs(job.id.toString());
|
||||
}
|
||||
}
|
||||
|
||||
process.exit();
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user