From 6a524e1bae1e0934cf89400a62c09a555b6fdb6b Mon Sep 17 00:00:00 2001
From: Gergo Moricz
Date: Tue, 9 Jul 2024 14:56:47 +0200
Subject: [PATCH] feat: pick up and commit interrupted jobs from/to DB

---
 apps/api/src/index.ts | 64 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 63 insertions(+), 1 deletion(-)

diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 747f8a7b..f18975eb 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -9,6 +9,7 @@ import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
+import { supabase_service } from "./services/supabase";
 
 const { createBullBoard } = require("@bull-board/api");
 const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -20,6 +21,34 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
 
+  // Re-enqueue jobs flagged with `retry` in the "firecrawl_jobs" table; onExit
+  // sets that flag for jobs that are still active at shutdown.
+  (async () => {
+    if (process.env.USE_DB_AUTHENTICATION) {
+      const wsq = getWebScraperQueue();
+      const { error, data } = await supabase_service
+        .from("firecrawl_jobs")
+        .select()
+        .eq("retry", true);
+
+      if (error) throw new Error(error.message);
+
+      await wsq.addBulk(data.map(x => ({
+        data: {
+          url: x.url,
+          mode: x.mode,
+          crawlerOptions: x.crawler_options,
+          team_id: x.team_id,
+          pageOptions: x.page_options,
+          origin: x.origin,
+        },
+        opts: {
+          jobId: x.job_id,
+        }
+      })))
+    }
+  })();
+
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
     cluster.fork();
@@ -33,7 +62,7 @@ if (cluster.isMaster) {
     }
   });
 
-  const onExit = () => {
+  const onExit = async () => {
     console.log("Shutting down gracefully...");
 
     if (cluster.workers) {
@@ -42,6 +71,39 @@ if (cluster.isMaster) {
       }
     }
 
+    // Before exiting, store docs/partialDocs of every still-active job and
+    // flag it with `retry` so it is re-enqueued on the next startup.
+    if (process.env.USE_DB_AUTHENTICATION) {
+      const wsq = getWebScraperQueue();
+      const activeJobCount = await wsq.getActiveCount();
+      console.log("Updating", activeJobCount, "in-progress jobs");
+
+      // getActive(start, end) takes an inclusive index range, so page p must
+      // cover [p * pageSize, (p + 1) * pageSize - 1] to avoid overlapping pages.
+      const pageSize = 10;
+      const pageCount = Math.ceil(activeJobCount / pageSize);
+      const activeJobs = (await Promise.all(
+        Array.from({ length: pageCount }, (_, page) =>
+          wsq.getActive(page * pageSize, (page + 1) * pageSize - 1)
+        )
+      )).flat(1);
+
+      for (const job of activeJobs) {
+        try {
+          const { error } = await supabase_service
+            .from("firecrawl_jobs")
+            .update({ docs: job.data.docs, partial_docs: job.data.partialDocs, retry: true })
+            .eq("job_id", job.id);
+
+          if (error) throw new Error(error.message);
+        } catch (error) {
+          console.error("Failed to update job status:", error);
+        }
+        // Runs even when the update above failed (the error is only logged).
+        await wsq.removeJobs(job.id.toString());
+      }
+    }
+
     process.exit();
   };
 