clean up crawl-status, fix db ddos

Gergő Móricz 2024-08-16 23:29:30 +02:00
parent e5b807ccc4
commit aabfaf0ac5
5 changed files with 82 additions and 33 deletions
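The "db ddos" here appears to be the per-job Supabase lookups in crawl-status: every finished job triggered its own query, so large crawls produced thousands of database round trips. The diff below batches those lookups. A minimal sketch of the batched-lookup pattern, assuming a supabase-js client; the client setup, env var names, and the hypothetical fetchJobRows helper are placeholders, while the table and column names follow the diff:

import { createClient } from "@supabase/supabase-js";

// Placeholder client setup; the real code uses a shared supabase_service instance.
const supabase = createClient(process.env.SUPABASE_URL!, process.env.SUPABASE_SERVICE_TOKEN!);

// One IN-filtered query for the whole batch instead of N per-job queries.
async function fetchJobRows(jobIds: string[]) {
  const { data, error } = await supabase
    .from("firecrawl_jobs")
    .select("*")
    .in("job_id", jobIds);
  return error ? [] : data ?? [];
}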

View File

@@ -2,23 +2,27 @@ import { Response } from "express";
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
-import { supabaseGetJobById } from "../../lib/supabase-jobs";
+import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";

-async function getJob(id: string) {
-  const job = await getScrapeQueue().getJob(id);
-  if (!job) return job;
+async function getJobs(ids: string[]) {
+  const jobs = (await Promise.all(ids.map(x => getScrapeQueue().getJob(x)))).filter(x => x);

   if (process.env.USE_DB_AUTHENTICATION === "true") {
-    const supabaseData = await supabaseGetJobById(id);
+    const supabaseData = await supabaseGetJobsById(ids);

-    if (supabaseData) {
-      job.returnvalue = supabaseData.docs;
-    }
+    supabaseData.forEach(x => {
+      const job = jobs.find(y => y.id === x.job_id);
+      if (job) {
+        job.returnvalue = x.docs;
+      }
+    })
   }

-  job.returnvalue = Array.isArray(job.returnvalue) ? job.returnvalue[0] : job.returnvalue;
+  jobs.forEach(job => {
+    job.returnvalue = Array.isArray(job.returnvalue) ? job.returnvalue[0] : job.returnvalue;
+  });

-  return job;
+  return jobs;
 }
export async function crawlStatusController(req: RequestWithAuth<CrawlStatusParams, undefined, CrawlStatusResponse>, res: Response<CrawlStatusResponse>) {
@@ -43,22 +47,30 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
let doneJobs = [];
if (end === undefined) { // determine 10 megabyte limit
-    let bytes = 0, used = 0;
-    while (bytes < 10485760 && used < doneJobsOrder.length) {
-      const job = await getJob(doneJobsOrder[used]);
+    let bytes = 0;
+    const bytesLimit = 10485760; // 10 MiB in bytes
+    const factor = 100; // chunking for faster retrieval
-      doneJobs.push(job);
-      bytes += JSON.stringify(legacyDocumentConverter(job.returnvalue)).length;
-      used++;
+    for (let i = 0; i < doneJobsOrder.length && bytes < bytesLimit; i += factor) {
+      // get current chunk and retrieve jobs
+      const currentIDs = doneJobsOrder.slice(i, i+factor);
+      const jobs = await getJobs(currentIDs);
+      // iterate through the jobs and add them one by one to the byte counter;
+      // both loops will break once we cross the byte limit
+      for (let ii = 0; ii < jobs.length && bytes < bytesLimit; ii++) {
+        const job = jobs[ii];
+        doneJobs.push(job);
+        bytes += JSON.stringify(legacyDocumentConverter(job.returnvalue)).length;
+      }
     }

-    if (used < doneJobsOrder.length) {
+    // if we ran over the bytes limit, remove the last document
+    if (bytes > bytesLimit) {
       doneJobs.splice(doneJobs.length - 1, 1);
-      used--;
     }
   } else {
-    doneJobs = (await Promise.all(doneJobsOrder.map(async x => await getJob(x))));
+    doneJobs = await getJobs(doneJobsOrder);
}
const data = doneJobs.map(x => x.returnvalue);
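The loop above walks doneJobsOrder in slices of `factor` IDs, so each iteration costs one batch of queue lookups plus one Supabase query, while the 10 MiB response cap is still enforced per document. The same slicing pattern in isolation (hypothetical chunkIds helper, not part of the codebase):

// Mirrors the doneJobsOrder.slice(i, i + factor) pattern above.
function chunkIds(ids: string[], size: number): string[][] {
  const chunks: string[][] = [];
  for (let i = 0; i < ids.length; i += size) {
    chunks.push(ids.slice(i, i + size));
  }
  return chunks;
}

// chunkIds(["a", "b", "c"], 2) => [["a", "b"], ["c"]]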

View File

@@ -17,3 +17,21 @@ export const supabaseGetJobById = async (jobId: string) => {
return data;
}
+export const supabaseGetJobsById = async (jobIds: string[]) => {
+  const { data, error } = await supabase_service
+    .from('firecrawl_jobs')
+    .select('*')
+    .in('job_id', jobIds);
+
+  if (error) {
+    return [];
+  }
+
+  if (!data) {
+    return [];
+  }
+
+  return data;
+}
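The rows from the `.in()` filter come back in no particular order, which is why `getJobs` in crawl-status matches them to queue jobs by `job_id` rather than by position. A usage sketch of the same idea, assuming `jobs` holds the queue jobs already fetched and `id`/`returnvalue` behave as in the controller above:

// Hydrate queue jobs with their stored docs in one round trip (inside an async context).
const rows = await supabaseGetJobsById(jobs.map(x => x.id));
const docsByJobId = new Map(rows.map(row => [row.job_id, row.docs]));
jobs.forEach(job => {
  if (docsByJobId.has(job.id)) {
    job.returnvalue = docsByJobId.get(job.id);
  }
});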

View File

@@ -27,7 +27,12 @@ export async function startWebScraperPipeline({
mode: job.data.mode,
crawlerOptions: job.data.crawlerOptions,
extractorOptions: job.data.extractorOptions,
-    pageOptions: job.data.pageOptions,
+    pageOptions: {
+      ...job.data.pageOptions,
+      ...(job.data.crawl_id ? ({
+        includeRawHtml: true,
+      }): {}),
+    },
inProgress: (progress) => {
Logger.debug(`🐂 Job in progress ${job.id}`);
if (progress.currentDocument) {
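For jobs that belong to a crawl (`job.data.crawl_id` set), the pipeline now forces `includeRawHtml: true` on top of whatever page options the caller passed, so the worker further down can extract follow-up links from the raw HTML. A small sketch of how the conditional spread resolves; the option values are examples, not the real defaults:

// Example values only; the spread shape is what matters.
const requested = { onlyMainContent: true };   // stands in for job.data.pageOptions
const crawlId = "some-crawl-id";               // stands in for job.data.crawl_id

const pageOptions = {
  ...requested,
  ...(crawlId ? { includeRawHtml: true } : {}),
};
// crawl job:       { onlyMainContent: true, includeRawHtml: true }
// standalone job:  { onlyMainContent: true }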

View File

@@ -289,6 +289,23 @@
return null;
}
+  public extractLinksFromHTML(html: string, url: string) {
+    let links: string[] = [];
+
+    const $ = load(html);
+    $("a").each((_, element) => {
+      const href = $(element).attr("href");
+      if (href) {
+        const u = this.filterURL(href, url);
+        if (u !== null) {
+          links.push(u);
+        }
+      }
+    });
+
+    return links;
+  }
async crawl(url: string, pageOptions: PageOptions): Promise<{url: string, html: string, pageStatusCode?: number, pageError?: string}[]> {
if (this.visited.has(url) || !this.robots.isAllowed(url, "FireCrawlAgent")) {
return [];
@@ -332,15 +349,7 @@
links.push({ url, html: content, pageStatusCode, pageError });
}
$("a").each((_, element) => {
const href = $(element).attr("href");
if (href) {
const u = this.filterURL(href, url);
if (u !== null) {
links.push({ url: u, html: content, pageStatusCode, pageError });
}
}
});
links.push(...this.extractLinksFromHTML(content, url).map(url => ({ url, html: content, pageStatusCode, pageError })));
if (this.visited.size === 1) {
return links;
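Link extraction is now a single public method: `crawl()` above feeds it freshly fetched content, and the worker change below feeds it stored raw HTML. A brief usage sketch, assuming `crawler` is an already-constructed WebCrawler and `rawHtml`/`pageUrl` come from a previously scraped page:

// Each <a href> is passed through filterURL, which applies the crawl's URL rules
// and returns null for links that should be dropped.
const outgoingLinks: string[] = crawler.extractLinksFromHTML(rawHtml, pageUrl);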

View File

@@ -130,6 +130,12 @@ async function processJob(job: Job, token: string) {
const end = Date.now();
const timeTakenInSeconds = (end - start) / 1000;
+    const rawHtml = docs[0].rawHtml;
+
+    if (job.data.crawl_id && (!job.data.pageOptions || !job.data.pageOptions.includeRawHtml)) {
+      delete docs[0].rawHtml;
+    }
const data = {
success,
result: {
@@ -174,9 +180,8 @@
if (!sc.cancelled) {
const crawler = crawlToCrawler(job.data.crawl_id, sc);
-        const links = crawler.filterLinks((data.docs[0].linksOnPage ?? [])
-          .map(href => crawler.filterURL(href.trim(), sc.originUrl))
-          .filter(x => x !== null),
+        const links = crawler.filterLinks(
+          crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
Infinity,
sc.crawlerOptions?.maxDepth ?? 10
)
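Taken together with the pipeline change above, the worker keeps a local copy of the first document's raw HTML, strips it from the stored result unless the caller's own page options asked for it, and then discovers follow-up crawl links from that copy instead of from `linksOnPage`. A condensed sketch of that flow, with names as in the diff and error handling elided:

// 1. Hold on to the raw HTML before it is (possibly) stripped from the result.
const rawHtml = docs[0].rawHtml;

// 2. Crawl jobs force includeRawHtml internally; don't persist it unless requested.
if (job.data.crawl_id && (!job.data.pageOptions || !job.data.pageOptions.includeRawHtml)) {
  delete docs[0].rawHtml;
}

// 3. Discover the next URLs to enqueue from the retained copy.
const links = crawler.filterLinks(
  crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
  Infinity,
  sc.crawlerOptions?.maxDepth ?? 10
);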