diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index fb57e41d..a52cb307 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -9,6 +9,8 @@ import { Document } from "../lib/entities";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
 import { numTokensFromString } from '../lib/LLM-extraction/helpers';
 import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
+import { addWebScraperJob } from '../services/queue-jobs';
+import { getWebScraperQueue } from '../services/queue-service';
 
 export async function scrapeHelper(
   req: Request,
@@ -33,49 +35,74 @@ export async function scrapeHelper(
     return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
   }
 
-  const a = new WebScraperDataProvider();
-  await a.setOptions({
+  // const a = new WebScraperDataProvider();
+  // await a.setOptions({
+  //   mode: "single_urls",
+  //   urls: [url],
+  //   crawlerOptions: {
+  //     ...crawlerOptions,
+  //   },
+  //   pageOptions: pageOptions,
+  //   extractorOptions: extractorOptions,
+  // });
+
+  const job = await addWebScraperJob({
+    url,
     mode: "single_urls",
-    urls: [url],
-    crawlerOptions: {
-      ...crawlerOptions,
-    },
-    pageOptions: pageOptions,
-    extractorOptions: extractorOptions,
+    crawlerOptions,
+    team_id,
+    pageOptions,
+    extractorOptions,
+    origin: req.body.origin ?? defaultOrigin,
   });
 
+  const wsq = getWebScraperQueue();
+
+  let promiseResolve;
+
+  const docsPromise = new Promise((resolve) => {
+    promiseResolve = resolve;
+  });
+
+  const listener = (j: string) => {
+    console.log("JOB COMPLETED", j, "vs", job.id);
+    if (j === job.id) {
+      promiseResolve(j);
+      wsq.removeListener("global:completed", listener);
+    }
+  }
+
+  wsq.on("global:completed", listener);
+
   const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
     setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
   );
 
-  const docsPromise = a.getDocuments(false);
-
-  let docs;
+  let j;
   try {
-    docs = await Promise.race([docsPromise, timeoutPromise]);
+    j = await Promise.race([docsPromise, timeoutPromise]);
   } catch (error) {
+    wsq.removeListener("global:completed", listener);
     return error;
   }
 
+  const jobNew = (await wsq.getJob(j));
+  const doc = jobNew.progress().currentDocument;
+  delete doc.index;
+
   // make sure doc.content is not empty
-  let filteredDocs = docs.filter(
-    (doc: { content?: string }) => doc.content && doc.content.trim().length > 0
-  );
-  if (filteredDocs.length === 0) {
-    return { success: true, error: "No page found", returnCode: 200, data: docs[0] };
+  if (!doc) {
+    return { success: true, error: "No page found", returnCode: 200, data: doc };
   }
 
-  // Remove rawHtml if pageOptions.rawHtml is false and extractorOptions.mode is llm-extraction-from-raw-html
   if (!pageOptions.includeRawHtml && extractorOptions.mode == "llm-extraction-from-raw-html") {
-    filteredDocs.forEach(doc => {
-      delete doc.rawHtml;
-    });
+    delete doc.rawHtml;
  }
 
   return {
     success: true,
-    data: filteredDocs[0],
+    data: doc,
     returnCode: 200,
   };
 }
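Review note on the `scrape.ts` change above: instead of scraping in-process, `scrapeHelper` now enqueues a job and blocks until Bull's `global:completed` event reports the matching job id, racing that wait against the timeout promise. A minimal sketch of the same wait-with-timeout pattern as a standalone helper, assuming a Bull queue (the helper name and the combined timer/listener cleanup are illustrative, not part of this patch):

```typescript
import Queue from "bull";

// Sketch of the wait-for-completion logic used in scrapeHelper, assuming a
// Bull queue. "global:completed" fires with the id of any job finished by
// any worker, so we filter for the one we enqueued.
function waitForJobCompletion(
  queue: Queue.Queue,
  jobId: string,
  timeoutMs: number
): Promise<string> {
  return new Promise((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout>;

    const listener = (completedJobId: string) => {
      if (completedJobId !== jobId) return;
      clearTimeout(timer);
      queue.removeListener("global:completed", listener);
      resolve(completedJobId);
    };

    queue.on("global:completed", listener);

    timer = setTimeout(() => {
      // Detach the listener on the timeout path too, so it cannot leak.
      queue.removeListener("global:completed", listener);
      reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 });
    }, timeoutMs);
  });
}
```

Folding the timeout into the helper means the listener is always detached when the timer fires first; the patch achieves the same effect by removing the listener in its `catch` block.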
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 9403fc1f..d982f32f 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -7,11 +7,12 @@ import { WebScraperOptions } from "../types";
 
 export async function addWebScraperJob(
   webScraperOptions: WebScraperOptions,
-  options: any = {}
+  options: any = {},
+  jobId: string = uuidv4(),
 ): Promise<Job> {
   return await getWebScraperQueue().add(webScraperOptions, {
     ...options,
-    jobId: uuidv4(),
+    jobId,
   });
 }
 
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index be2a4c70..2105863a 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -42,7 +42,9 @@ async function processJob(job: Job, done) {
       error: message /* etc... */,
     };
 
-    await callWebhook(job.data.team_id, job.id as string, data);
+    if (job.data.mode === "crawl") {
+      await callWebhook(job.data.team_id, job.id as string, data);
+    }
 
     await logJob({
       job_id: job.id as string,
@@ -52,7 +54,7 @@ async function processJob(job: Job, done) {
       docs: docs,
       time_taken: timeTakenInSeconds,
       team_id: job.data.team_id,
-      mode: "crawl",
+      mode: job.data.mode,
       url: job.data.url,
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
@@ -90,7 +92,9 @@ async function processJob(job: Job, done) {
       error: "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
     };
 
-    await callWebhook(job.data.team_id, job.id as string, data);
+    if (job.data.mode === "crawl") {
+      await callWebhook(job.data.team_id, job.id as string, data);
+    }
     await logJob({
       job_id: job.id as string,
       success: false,
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index cef49f2f..03296062 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -25,6 +25,7 @@ export interface WebScraperOptions {
   mode: Mode;
   crawlerOptions: any;
   pageOptions: any;
+  extractorOptions?: any;
   team_id: string;
   origin?: string;
 }
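Review note on the `queue-jobs.ts` change: the new optional `jobId` parameter lets a caller fix the job id before enqueueing, so a completion listener can be attached with no race against a fast worker, while existing callers keep the old `uuidv4()` default. A hypothetical caller (URL, team id, and option values are placeholders, not from this patch):

```typescript
import { v4 as uuidv4 } from "uuid";
import { addWebScraperJob } from "./services/queue-jobs";
import { getWebScraperQueue } from "./services/queue-service";

// Hypothetical usage: pick the job id up front, attach the listener,
// then enqueue with that exact id via the new third parameter.
async function enqueueAndWatch() {
  const jobId = uuidv4();
  const wsq = getWebScraperQueue();

  const listener = (completedId: string) => {
    if (completedId !== jobId) return;
    wsq.removeListener("global:completed", listener);
    console.log("scrape job finished:", completedId);
  };
  wsq.on("global:completed", listener);

  await addWebScraperJob(
    {
      url: "https://example.com", // illustrative values throughout
      mode: "single_urls",
      crawlerOptions: {},
      pageOptions: {},
      team_id: "team_123",
      origin: "api",
    },
    {},     // extra Bull job options
    jobId,  // predetermined id; defaults to uuidv4() when omitted
  );
}
```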