diff --git a/.github/workflows/clean-before-24h-complete-jobs.yml b/.github/workflows/clean-before-24h-complete-jobs.yml index 36c3c718..d897a19a 100644 --- a/.github/workflows/clean-before-24h-complete-jobs.yml +++ b/.github/workflows/clean-before-24h-complete-jobs.yml @@ -12,7 +12,7 @@ jobs: steps: - name: Send GET request to clean jobs run: | - response=$(curl --write-out '%{http_code}' --silent --output /dev/null https://api.firecrawl.dev/admin/${{ secrets.BULL_AUTH_KEY }}/clean-before-24h-complete-jobs) + response=$(curl --write-out '%{http_code}' --silent --output /dev/null --max-time 180 https://api.firecrawl.dev/admin/${{ secrets.BULL_AUTH_KEY }}/clean-before-24h-complete-jobs) if [ "$response" -ne 200 ]; then echo "Failed to clean jobs. Response: $response" exit 1 diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 494b4d5e..ff1ee6d3 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -8,6 +8,7 @@ import { v0Router } from "./routes/v0"; import { initSDK } from "@hyperdx/node-opentelemetry"; import cluster from "cluster"; import os from "os"; +import { Job } from "bull"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -189,24 +190,40 @@ if (cluster.isMaster) { async (req, res) => { try { const webScraperQueue = getWebScraperQueue(); - const completedJobs = await webScraperQueue.getJobs(["completed"]); + const batchSize = 10; + const numberOfBatches = 9; // Adjust based on your needs + const completedJobsPromises: Promise[] = []; + for (let i = 0; i < numberOfBatches; i++) { + completedJobsPromises.push(webScraperQueue.getJobs( + ["completed"], + i * batchSize, + i * batchSize + batchSize, + true + )); + } + const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat(); const before24hJobs = completedJobs.filter( (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 - ); - const jobIds = before24hJobs.map((job) => job.id) as string[]; + ) || []; + let count = 0; - for (const jobId of jobIds) { + + if (!before24hJobs) { + return res.status(200).send(`No jobs to remove.`); + } + + for (const job of before24hJobs) { try { - await webScraperQueue.removeJobs(jobId); + await job.remove() count++; } catch (jobError) { - console.error(`Failed to remove job with ID ${jobId}:`, jobError); + console.error(`Failed to remove job with ID ${job.id}:`, jobError); } } - res.status(200).send(`Removed ${count} completed jobs.`); + return res.status(200).send(`Removed ${count} completed jobs.`); } catch (error) { console.error("Failed to clean last 24h complete jobs:", error); - res.status(500).send("Failed to clean jobs"); + return res.status(500).send("Failed to clean jobs"); } } );