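fix websocket: crawl status now polls job states on a timer instead of listening for queue "completed" events, and reports caught errors to Sentry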

This commit is contained in:
Gergő Móricz 2024-08-23 19:55:41 +02:00
parent 05c250d3b8
commit 064ebfc54d

View File

@@ -8,6 +8,7 @@ import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service"; import { getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status"; import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";
type ErrorMessage = { type ErrorMessage = {
type: "error", type: "error",
@@ -56,31 +57,38 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
} }
let doneJobIDs = []; let doneJobIDs = [];
let finished = false;
const completedListener = async e => { const loop = async () => {
const job = await getScrapeQueue().getJob(e.jobId) if (finished) return;
if (job.data.crawl_id === req.params.jobId) {
if (doneJobIDs.includes(job.id)) return; const jobIDs = await getCrawlJobs(req.params.jobId);
const j = await getJob(job.id);
if (j.returnvalue) { if (jobIDs.length === doneJobIDs.length) {
return close(ws, 1000, { type: "done" });
}
const notDoneJobIDs = jobIDs.filter(x => !doneJobIDs.includes(x));
const jobStatuses = await Promise.all(notDoneJobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)]));
const newlyDoneJobIDs = jobStatuses.filter(x => x[1] === "completed" || x[1] === "failed").map(x => x[0]);
for (const jobID of newlyDoneJobIDs) {
const job = await getJob(jobID);
if (job.returnvalue) {
send(ws, { send(ws, {
type: "document", type: "document",
data: legacyDocumentConverter(j.returnvalue), data: legacyDocumentConverter(job.returnvalue),
}); })
if (await isCrawlFinishedLocked(req.params.jobId)) {
await new Promise((resolve) => setTimeout(() => resolve(true), 5000)) // wait for last events to pour in
scrapeQueueEvents.removeListener("completed", completedListener);
close(ws, 1000, { type: "done" })
}
} else { } else {
// FAILED return close(ws, 3000, { type: "error", error: job.failedReason });
} }
} }
setTimeout(loop, 1000);
}; };
// TODO: handle failed jobs setTimeout(loop, 1000);
scrapeQueueEvents.addListener("completed", completedListener);
doneJobIDs = await getDoneJobsOrdered(req.params.jobId); doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
@@ -102,7 +110,7 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
}); });
if (status !== "scraping") { if (status !== "scraping") {
scrapeQueueEvents.removeListener("completed", completedListener); finished = true;
return close(ws, 1000, { type: "done" }); return close(ws, 1000, { type: "done" });
} }
} }
@@ -127,6 +135,8 @@ export async function crawlStatusWSController(ws: WebSocket, req: RequestWithAut
await crawlStatusWS(ws, req); await crawlStatusWS(ws, req);
} catch (err) { } catch (err) {
Sentry.captureException(err);
const id = uuidv4(); const id = uuidv4();
let verbose = JSON.stringify(err); let verbose = JSON.stringify(err);
if (verbose === "{}") { if (verbose === "{}") {