feat: move scraper to queue
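
This moves single-URL scraping onto the web scraper queue: instead of scraping inline, the API handler enqueues a job, listens for the queue's global:completed event, and races that against the request timeout. Worker-side webhooks are now fired only for crawl jobs, and job logs record the job's actual mode instead of a hard-coded "crawl".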

Gergő Móricz 2024-07-25 00:14:25 +02:00
parent 15890772be
commit 6798695ee4
4 changed files with 60 additions and 27 deletions

View File

@@ -9,6 +9,8 @@ import { Document } from "../lib/entities";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
 import { numTokensFromString } from '../lib/LLM-extraction/helpers';
 import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
+import { addWebScraperJob } from '../services/queue-jobs';
+import { getWebScraperQueue } from '../services/queue-service';
 
 export async function scrapeHelper(
   req: Request,
@@ -33,49 +35,74 @@ export async function scrapeHelper(
     return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
   }
 
-  const a = new WebScraperDataProvider();
-  await a.setOptions({
+  // const a = new WebScraperDataProvider();
+  // await a.setOptions({
+  //   mode: "single_urls",
+  //   urls: [url],
+  //   crawlerOptions: {
+  //     ...crawlerOptions,
+  //   },
+  //   pageOptions: pageOptions,
+  //   extractorOptions: extractorOptions,
+  // });
+
+  const job = await addWebScraperJob({
+    url,
     mode: "single_urls",
-    urls: [url],
-    crawlerOptions: {
-      ...crawlerOptions,
-    },
-    pageOptions: pageOptions,
-    extractorOptions: extractorOptions,
+    crawlerOptions,
+    team_id,
+    pageOptions,
+    extractorOptions,
+    origin: req.body.origin ?? defaultOrigin,
   });
 
+  const wsq = getWebScraperQueue();
+
+  let promiseResolve;
+  const docsPromise = new Promise((resolve) => {
+    promiseResolve = resolve;
+  });
+
+  const listener = (j: string) => {
+    console.log("JOB COMPLETED", j, "vs", job.id);
+    if (j === job.id) {
+      promiseResolve(j);
+      wsq.removeListener("global:completed", listener);
+    }
+  }
+
+  wsq.on("global:completed", listener);
+
   const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
     setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
   );
 
-  const docsPromise = a.getDocuments(false);
-
-  let docs;
+  let j;
   try {
-    docs = await Promise.race([docsPromise, timeoutPromise]);
+    j = await Promise.race([docsPromise, timeoutPromise]);
   } catch (error) {
+    wsq.removeListener("global:completed", listener);
     return error;
   }
 
+  const jobNew = (await wsq.getJob(j));
+  const doc = jobNew.progress().currentDocument;
+  delete doc.index;
+
   // make sure doc.content is not empty
-  let filteredDocs = docs.filter(
-    (doc: { content?: string }) => doc.content && doc.content.trim().length > 0
-  );
-
-  if (filteredDocs.length === 0) {
-    return { success: true, error: "No page found", returnCode: 200, data: docs[0] };
+  if (!doc) {
+    return { success: true, error: "No page found", returnCode: 200, data: doc };
   }
 
   // Remove rawHtml if pageOptions.rawHtml is false and extractorOptions.mode is llm-extraction-from-raw-html
   if (!pageOptions.includeRawHtml && extractorOptions.mode == "llm-extraction-from-raw-html") {
-    filteredDocs.forEach(doc => {
-      delete doc.rawHtml;
-    });
+    delete doc.rawHtml;
   }
 
   return {
     success: true,
-    data: filteredDocs[0],
+    data: doc,
     returnCode: 200,
   };
 }
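
The enqueue-then-wait pattern above can be factored into a small helper. A minimal sketch, not code from this commit: it assumes Bull's global:completed queue event (which fires with the id of every job completed on the queue), and the helper name waitForJobCompletion is hypothetical. Unlike the handler above, it also detaches the listener when the timeout fires.

import Bull from "bull";

// Hypothetical helper: resolve with the job id once the given job completes,
// or reject after timeoutMs. "global:completed" fires for every job on the
// queue, so the listener filters by id before resolving.
function waitForJobCompletion(
  queue: Bull.Queue,
  jobId: string,
  timeoutMs: number
): Promise<string> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      queue.removeListener("global:completed", listener);
      reject(new Error("Request timed out"));
    }, timeoutMs);

    const listener = (completedId: string) => {
      if (completedId === jobId) {
        clearTimeout(timer);
        queue.removeListener("global:completed", listener);
        resolve(completedId);
      }
    };

    queue.on("global:completed", listener);
  });
}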

View File

@@ -7,11 +7,12 @@ import { WebScraperOptions } from "../types";
 export async function addWebScraperJob(
   webScraperOptions: WebScraperOptions,
-  options: any = {}
+  options: any = {},
+  jobId: string = uuidv4(),
 ): Promise<Job> {
   return await getWebScraperQueue().add(webScraperOptions, {
     ...options,
-    jobId: uuidv4(),
+    jobId,
   });
 }
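
The new jobId parameter lets a caller mint the job id before the job is enqueued, for example to subscribe to completion events first and avoid racing the insert; the controller above still calls addWebScraperJob without an explicit id. A usage sketch under that assumption (enqueueWithKnownId is a hypothetical name):

import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import { addWebScraperJob } from "./queue-jobs";
import { getWebScraperQueue } from "./queue-service";

async function enqueueWithKnownId(webScraperOptions: WebScraperOptions): Promise<string> {
  const jobId = uuidv4();
  const wsq = getWebScraperQueue();

  // Subscribe before enqueueing so the completion event cannot be missed.
  const done = new Promise<string>((resolve) => {
    const listener = (completedId: string) => {
      if (completedId === jobId) {
        wsq.removeListener("global:completed", listener);
        resolve(completedId);
      }
    };
    wsq.on("global:completed", listener);
  });

  await addWebScraperJob(webScraperOptions, {}, jobId);
  return done;
}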

View File

@@ -42,7 +42,9 @@ async function processJob(job: Job, done) {
       error: message /* etc... */,
     };
-    await callWebhook(job.data.team_id, job.id as string, data);
+    if (job.data.mode === "crawl") {
+      await callWebhook(job.data.team_id, job.id as string, data);
+    }
     await logJob({
       job_id: job.id as string,
@@ -52,7 +54,7 @@ async function processJob(job: Job, done) {
       docs: docs,
       time_taken: timeTakenInSeconds,
       team_id: job.data.team_id,
-      mode: "crawl",
+      mode: job.data.mode,
       url: job.data.url,
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
@@ -90,7 +92,9 @@ async function processJob(job: Job, done) {
       error:
         "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
     };
-    await callWebhook(job.data.team_id, job.id as string, data);
+    if (job.data.mode === "crawl") {
+      await callWebhook(job.data.team_id, job.id as string, data);
+    }
     await logJob({
       job_id: job.id as string,
       success: false,
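
Both failure paths now share the same rule: webhooks are a crawl-only concern, since single-URL jobs are delivered to the waiting API handler through the queue's completion event, and the log entry records the job's real mode. A minimal sketch of that bookkeeping as a standalone function (reportFailure and its signature are illustrative, not from the commit):

type JobMode = "crawl" | "single_urls";

// Hypothetical sketch: webhook only for crawl jobs; the log records the
// actual mode (previously hard-coded to "crawl").
async function reportFailure(
  mode: JobMode,
  sendWebhook: () => Promise<void>,
  log: (entry: { mode: JobMode; success: boolean }) => Promise<void>
): Promise<void> {
  if (mode === "crawl") {
    await sendWebhook();
  }
  await log({ mode, success: false });
}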

View File

@@ -25,6 +25,7 @@ export interface WebScraperOptions {
   mode: Mode;
   crawlerOptions: any;
   pageOptions: any;
+  extractorOptions?: any;
   team_id: string;
   origin?: string;
 }
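
With extractorOptions added, the interface now covers everything the controller enqueues. An illustrative single-URL payload matching the addWebScraperJob call above (field values are examples; url is assumed to be declared earlier in the interface, outside this hunk):

import { WebScraperOptions } from "./types"; // path assumed

const payload: WebScraperOptions = {
  url: "https://example.com",
  mode: "single_urls",
  crawlerOptions: {},
  pageOptions: { includeRawHtml: false },
  extractorOptions: { mode: "markdown" },
  team_id: "team-123",
  origin: "api",
};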