fix(queue-worker): crawl finishing race condition

Gergő Móricz 2024-08-16 18:48:52 +02:00
parent 6bd52e63bf
commit d0a8382a5b
2 changed files with 12 additions and 2 deletions


@@ -45,6 +45,16 @@ export async function isCrawlFinished(id: string) {
   return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs"));
 }
 
+export async function finishCrawl(id: string) {
+  if (await isCrawlFinished(id)) {
+    const set = await redisConnection.setnx("crawl:" + id + ":finish", "yes");
+    if (set === 1) {
+      await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
+    }
+    return set === 1;
+  }
+}
+
 export async function getCrawlJobs(id: string): Promise<string[]> {
   return await redisConnection.smembers("crawl:" + id + ":jobs");
 }
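The race this hunk fixes is a check-then-act window: when the last two jobs of a crawl complete at nearly the same time, both workers can see `isCrawlFinished()` return true and both run the completion logic. `finishCrawl()` closes that window by atomically claiming a `crawl:<id>:finish` marker with SETNX, so exactly one caller wins. Below is a minimal illustrative sketch of the same claim pattern, assuming an ioredis-style client (the `redisConnection` above exposes the same `setnx`/`expire` calls); the function and key names here are for illustration and are not part of the commit.

```ts
import Redis from "ioredis";

const redis = new Redis();

// Atomically claim the "finished" marker for a crawl. SETNX sets the key only
// if it does not already exist and returns 1 to exactly one concurrent caller,
// so only that worker proceeds with the completion logic.
async function claimCrawlFinish(crawlId: string): Promise<boolean> {
  const set = await redis.setnx("crawl:" + crawlId + ":finish", "yes");
  if (set === 1) {
    // The winner gives the marker a 24h TTL so it is eventually cleaned up.
    await redis.expire("crawl:" + crawlId + ":finish", 24 * 60 * 60);
  }
  return set === 1;
}

async function demo() {
  // Two "workers" race to finish the same crawl; exactly one claim succeeds.
  const [a, b] = await Promise.all([
    claimCrawlFinish("example-crawl"),
    claimCrawlFinish("example-crawl"),
  ]);
  console.log({ a, b }); // e.g. { a: true, b: false }
  await redis.quit();
}

demo().catch(console.error);
```

A single `SET key value NX EX 86400` would set the value and TTL in one atomic command; the commit uses the equivalent two-step SETNX + EXPIRE form, where only the winning caller runs the EXPIRE.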


@@ -15,7 +15,7 @@ import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
+import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
@@ -199,7 +199,7 @@ async function processJob(job: Job, token: string) {
           }
         }
-        if (await isCrawlFinished(job.data.crawl_id)) {
+        if (await finishCrawl(job.data.crawl_id)) {
           const jobIDs = await getCrawlJobs(job.data.crawl_id);
           const jobs = (await Promise.all(jobIDs.map(async x => {