Nick: scrape queue

This commit is contained in:
Nicolas 2024-07-30 14:44:13 -04:00
parent 7e002a8b06
commit f43d5e7895
7 changed files with 124 additions and 71 deletions

View File

@@ -9,8 +9,8 @@ import { Document } from "../lib/entities";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
import { numTokensFromString } from '../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
import { addWebScraperJob } from '../services/queue-jobs';
import { getWebScraperQueue } from '../services/queue-service';
import { addScrapeJob, addWebScraperJob } from '../services/queue-jobs';
import { getScrapeQueue, getWebScraperQueue, scrapeQueueEvents } from '../services/queue-service';
import { supabase_service } from '../services/supabase';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
@@ -50,7 +50,7 @@ export async function scrapeHelper(
// extractorOptions: extractorOptions,
// });
const job = await addWebScraperJob({
const job = await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions,
@@ -60,39 +60,41 @@ export async function scrapeHelper(
origin: req.body.origin ?? defaultOrigin,
});
const wsq = getWebScraperQueue();
let promiseResolve;
// const docsPromise = new Promise((resolve) => {
// promiseResolve = resolve;
// });
const docsPromise = new Promise((resolve) => {
promiseResolve = resolve;
});
// const listener = (j: string, res: any) => {
// console.log("JOB COMPLETED", j, "vs", job.id, res);
// if (j === job.id) {
// promiseResolve([j, res]);
// sq.removeListener("global:completed", listener);
// }
// }
const jobResult = await job.waitUntilFinished(scrapeQueueEvents, 60 * 1000); // 60-second timeout
const listener = (j: string, res: any) => {
console.log("JOB COMPLETED", j, "vs", job.id, res);
if (j === job.id) {
promiseResolve([j, res]);
wsq.removeListener("global:completed", listener);
}
}
// wsq.on("global:completed", listener);
const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
);
// const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
// setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
// );
let j;
try {
j = await Promise.race([docsPromise, timeoutPromise]);
} catch (error) {
wsq.removeListener("global:completed", listener);
return error;
}
// let j;
// try {
// j = await Promise.race([jobResult, timeoutPromise]);
// } catch (error) {
// // sq.removeListener("global:completed", listener);
// return error;
// }
// console.log("JOB RESULT", j[1]);
let j1 = typeof j[1] === "string" ? JSON.parse(j[1]) : j[1];
// let j1 = typeof j[1] === "string" ? JSON.parse(j[1]) : j[1];
const doc = j1 !== null ? j1.result.links[0].content : (await supabase_service
console.log("JOB RESULT", jobResult);
const doc = jobResult !== null ? jobResult[0] : (await supabase_service
.from("firecrawl_jobs")
.select("docs")
.eq("job_id", job.id as string)).data[0]?.docs[0];

View File

@@ -2,7 +2,7 @@ import express from "express";
import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
import { getScrapeQueue, getWebScraperQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@@ -58,7 +58,7 @@ if (cluster.isMaster) {
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getWebScraperQueue())],
queues: [new BullAdapter(getWebScraperQueue()), new BullAdapter(getScrapeQueue())],
serverAdapter: serverAdapter,
});
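For reference, a hedged sketch of wiring two queues into the dashboard, using the @bull-board packages' documented ExpressAdapter/createBullBoard API; this diff uses BullAdapter from an older package layout, while the sketch assumes the BullMQAdapter exposed for BullMQ queues:

import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URL, { maxRetriesPerRequest: null });

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(new Queue("{crawlQueue}", { connection })),
    new BullMQAdapter(new Queue("{scrapeQueue}", { connection })),
  ],
  serverAdapter,
});

const app = express();
app.use("/admin/queues", serverAdapter.getRouter());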

View File

@@ -36,9 +36,9 @@ export async function startWebScraperPipeline({
job.updateProgress({ ...progress, partialDocs: partialDocs });
}
},
onSuccess: (result) => {
onSuccess: (result, mode) => {
Logger.debug(`🐂 Job completed ${job.id}`);
saveJob(job, result, token);
saveJob(job, result, token, mode);
},
onError: (error) => {
Logger.error(`🐂 Job failed ${job.id}`);
@@ -113,7 +113,7 @@ export async function runWebScraper({
}
// This is where the returnvalue from the job is set
onSuccess(filteredDocs);
onSuccess(filteredDocs, mode);
// this return doesn't matter too much for the job completion result
return { success: true, message: "", docs: filteredDocs };
@@ -123,7 +123,7 @@ export async function runWebScraper({
}
}
const saveJob = async (job: Job, result: any, token: string) => {
const saveJob = async (job: Job, result: any, token: string, mode: string) => {
try {
if (process.env.USE_DB_AUTHENTICATION === "true") {
const { data, error } = await supabase_service
@@ -133,7 +133,11 @@ const saveJob = async (job: Job, result: any, token: string) => {
if (error) throw new Error(error.message);
try {
await job.moveToCompleted(null, token, false);
if (mode === "crawl") {
await job.moveToCompleted(null, token, false);
} else {
await job.moveToCompleted(result, token, false);
}
} catch (error) {
// I think the job won't exist here anymore
}
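The new mode branch matters because of how BullMQ hands results back: whatever is passed to moveToCompleted becomes the job's returnvalue, which is what waitUntilFinished resolves with on the producer side. An illustrative restatement of that contract (the function name is made up):

import { Job } from "bullmq";

async function settleJob(job: Job, result: any, token: string, mode: string) {
  if (mode === "crawl") {
    // Crawl output is already persisted (here, to the firecrawl_jobs table),
    // so avoid pushing a potentially large payload through Redis.
    await job.moveToCompleted(null, token, false);
  } else {
    // Scrape callers block on waitUntilFinished, so the docs must travel
    // back as the job's return value.
    await job.moveToCompleted(result, token, false);
  }
}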

View File

@@ -1,5 +1,6 @@
import { Job, Queue } from "bullmq";
import {
getScrapeQueue,
getWebScraperQueue,
} from "./queue-service";
import { v4 as uuidv4 } from "uuid";
@@ -16,3 +17,14 @@ export async function addWebScraperJob(
});
}
export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {},
jobId: string = uuidv4(),
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
jobId,
});
}
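A hypothetical call to the new helper; the payload fields mirror what the controller hunk passes, and the team_id and pageOptions values are made up:

import { addScrapeJob } from "./queue-jobs";

const job = await addScrapeJob({
  url: "https://example.com",
  mode: "single_urls",
  crawlerOptions: {},
  pageOptions: { onlyMainContent: true }, // hypothetical
  team_id: "team_123", // hypothetical
  origin: "api",
} as any); // WebScraperOptions abbreviated for this sketch

console.log(job.id); // the generated uuid

Note that the uuid does double duty: it is both the job name (the first argument to Queue#add) and opts.jobId, so the BullMQ job id is deterministic and can key the later firecrawl_jobs lookup.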

View File

@@ -1,19 +1,16 @@
import { Queue } from "bullmq";
import { Logger } from "../lib/logger";
import IORedis from "ioredis";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
let webScraperQueue: Queue;
let scrapeQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null,
});
export const webScraperQueueName = "{webscraperQueue}";
export const webScraperQueueName = "{crawlQueue}";
export const scrapeQueueName = "{scrapeQueue}";
export function getWebScraperQueue() {
if (!webScraperQueue) {
webScraperQueue = new Queue(
@@ -38,4 +35,32 @@ export function getWebScraperQueue() {
return webScraperQueue;
}
export function getScrapeQueue() {
if (!scrapeQueue) {
scrapeQueue = new Queue(
scrapeQueueName,
{
connection: redisConnection,
}
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
);
Logger.info("Scrape queue created");
}
return scrapeQueue;
}
import { QueueEvents } from 'bullmq';
export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
export const webScraperQueueEvents = new QueueEvents(webScraperQueueName, { connection: redisConnection });
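One caveat, hedged since it rests on BullMQ's connection semantics: QueueEvents needs a blocking Redis connection, so BullMQ duplicates the shared ioredis instance for each of these module-level exports, adding a connection per process. Consuming them follows BullMQ's documented event payloads:

import { scrapeQueueEvents } from "./queue-service";

scrapeQueueEvents.on("completed", ({ jobId, returnvalue }) => {
  console.log(`scrape job ${jobId} completed`, returnvalue);
});

scrapeQueueEvents.on("failed", ({ jobId, failedReason }) => {
  console.error(`scrape job ${jobId} failed: ${failedReason}`);
});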

View File

@@ -1,19 +1,25 @@
import { CustomError } from "../lib/custom-error";
import { getWebScraperQueue, redisConnection, webScraperQueueName } from "./queue-service";
import {
getWebScraperQueue,
getScrapeQueue,
redisConnection,
webScraperQueueName,
scrapeQueueName,
} from "./queue-service";
import "dotenv/config";
import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bullmq";
import { initSDK } from "@hyperdx/node-opentelemetry";
import { Job, tryCatch } from "bullmq";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
if (process.env.ENV === 'production') {
if (process.env.ENV === "production") {
initSDK({
consoleCapture: true,
additionalInstrumentations: [],
@@ -35,21 +41,23 @@ const connectionMonitorInterval =
Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
const wsq = getWebScraperQueue();
const sq = getScrapeQueue();
const processJobInternal = async (token: string, job: Job) => {
const extendLockInterval = setInterval(async () => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
const result = await processJob(job, token);
// await resultQueue.add('resultJob', result,{jobId: job.id});
console.log("🐂 Job completed", result);
console.log({ token });
console.log(await job.getState());
// await job.moveToCompleted(result, token, false); //3rd arg fetchNext
const jobState = await job.getState();
if (jobState !== "completed" && jobState !== "failed") {
try {
await job.moveToCompleted(result.docs, token, false); // 3rd arg is fetchNext
} catch (e) {
// console.log("Job already completed, error:", e);
}
}
} catch (error) {
console.log("Job failed, error:", error);
@@ -66,12 +74,8 @@ process.on("SIGINT", () => {
isShuttingDown = true;
});
const workerFun = async () => {
// const bullQueueName = queueNames[engine];
// const resultQueue = messageQueues[engine];
const worker = new Worker(webScraperQueueName, null, {
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
const worker = new Worker(queueName, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
// lockRenewTime: 15 * 1000, // 15 seconds
@@ -81,8 +85,6 @@ const workerFun = async () => {
worker.startStalledCheckTimer();
let contextManager;
const monitor = await systemMonitor;
while (true) {
@@ -91,10 +93,7 @@ const workerFun = async () => {
break;
}
const token = uuidv4();
// console.time("acceptConnectionDelay");
const canAcceptConnection = await monitor.acceptConnection();
// console.timeEnd("acceptConnectionDelay");
// console.log("canAcceptConnection", canAcceptConnection);
if (!canAcceptConnection) {
console.log("Cant accept connection");
await sleep(cantAcceptConnectionInterval); // more sleep
@@ -102,9 +101,8 @@ const workerFun = async () => {
}
const job = await worker.getNextJob(token);
// console.log("job", job);
if (job) {
processJobInternal(token, job);
await processJobInternal(token, job);
await sleep(gotJobInterval);
} else {
await sleep(connectionMonitorInterval);
@@ -112,14 +110,15 @@ const workerFun = async () => {
}
};
workerFun();
workerFun(webScraperQueueName, processJobInternal);
workerFun(scrapeQueueName, processJobInternal);
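Both calls reuse the same manual-fetch loop. A condensed sketch of the pattern, assuming BullMQ's documented flow for workers constructed with a null processor: getNextJob(token) checks a job out under a lock token, and the handler must settle the job with that same token. The sleep helper and 500 ms backoff are illustrative:

import { Worker, Job } from "bullmq";
import IORedis from "ioredis";
import { v4 as uuidv4 } from "uuid";

const connection = new IORedis(process.env.REDIS_URL, { maxRetriesPerRequest: null });
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function runWorker(
  queueName: string,
  handle: (token: string, job: Job) => Promise<void>,
) {
  // A null processor means the worker never auto-processes jobs.
  const worker = new Worker(queueName, null, { connection });
  worker.startStalledCheckTimer();

  for (;;) {
    const token = uuidv4();
    const job = await worker.getNextJob(token);
    if (job) {
      await handle(token, job); // must end in moveToCompleted/moveToFailed
    } else {
      await sleep(500); // hypothetical idle backoff
    }
  }
}

The extendLock interval in processJobInternal is the other half of this contract: because the worker is not processing the job itself, the lock has to be renewed manually during long scrapes or the stalled-job checker will requeue the job.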
async function processJob(job: Job, token: string) {
Logger.debug(`🐂 Worker taking job ${job.id}`);
try {
console.log("🐂 Updating progress");
console.log({job})
console.log({ job });
job.updateProgress({
current: 1,
total: 100,
@@ -127,7 +126,10 @@ async function processJob(job: Job, token: string) {
current_url: "",
});
const start = Date.now();
const { success, message, docs } = await startWebScraperPipeline({ job, token });
const { success, message, docs } = await startWebScraperPipeline({
job,
token,
});
const end = Date.now();
const timeTakenInSeconds = (end - start) / 1000;
@@ -135,11 +137,15 @@ async function processJob(job: Job, token: string) {
success: success,
result: {
links: docs.map((doc) => {
return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
return {
content: doc,
source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
};
}),
},
project_id: job.data.project_id,
error: message /* etc... */,
docs: docs,
};
if (job.data.mode === "crawl") {
@@ -189,6 +195,7 @@ async function processJob(job: Job, token: string) {
const data = {
success: false,
docs: [],
project_id: job.data.project_id,
error:
"Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
@@ -199,7 +206,10 @@ async function processJob(job: Job, token: string) {
await logJob({
job_id: job.id as string,
success: false,
message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
message:
typeof error === "string"
? error
: error.message ?? "Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,

View File

@@ -36,7 +36,7 @@ export interface RunWebScraperParams {
crawlerOptions: any;
pageOptions?: any;
inProgress: (progress: any) => void;
onSuccess: (result: any) => void;
onSuccess: (result: any, mode: string) => void;
onError: (error: Error) => void;
team_id: string;
bull_job_id: string;