diff --git a/apps/api/fly.toml b/apps/api/fly.toml index 1272f4b9..ca619d16 100644 --- a/apps/api/fly.toml +++ b/apps/api/fly.toml @@ -22,6 +22,11 @@ kill_timeout = '5s' min_machines_running = 2 processes = ['app'] +[http_service.concurrency] + type = "requests" + hard_limit = 200 + soft_limit = 100 + [[services]] protocol = 'tcp' internal_port = 8080 @@ -38,10 +43,14 @@ kill_timeout = '5s' [services.concurrency] type = 'connections' - hard_limit = 45 - soft_limit = 20 + hard_limit = 75 + soft_limit = 30 [[vm]] - size = 'performance-1x' + size = 'performance-4x' + processes = ['app'] + + + diff --git a/apps/api/src/__tests__/e2e_withAuth/index.test.ts b/apps/api/src/__tests__/e2e_withAuth/index.test.ts index cd2d17a9..5e3777b3 100644 --- a/apps/api/src/__tests__/e2e_withAuth/index.test.ts +++ b/apps/api/src/__tests__/e2e_withAuth/index.test.ts @@ -353,6 +353,45 @@ describe("E2E Tests for API Routes", () => { }, 60000); }); // 60 seconds + it("If someone cancels a crawl job, it should turn into failed status", async () => { + const crawlResponse = await request(TEST_URL) + .post("/v0/crawl") + .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`) + .set("Content-Type", "application/json") + .send({ url: "https://jestjs.io" }); + expect(crawlResponse.statusCode).toBe(200); + + + + // wait for 30 seconds + await new Promise((r) => setTimeout(r, 10000)); + + const response = await request(TEST_URL) + .delete(`/v0/crawl/cancel/${crawlResponse.body.jobId}`) + .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`); + expect(response.statusCode).toBe(200); + expect(response.body).toHaveProperty("status"); + expect(response.body.status).toBe("cancelled"); + + await new Promise((r) => setTimeout(r, 20000)); + + const completedResponse = await request(TEST_URL) + .get(`/v0/crawl/status/${crawlResponse.body.jobId}`) + .set("Authorization", `Bearer ${process.env.TEST_API_KEY}`); + expect(completedResponse.statusCode).toBe(200); + expect(completedResponse.body).toHaveProperty("status"); + expect(completedResponse.body.status).toBe("failed"); + expect(completedResponse.body).toHaveProperty("data"); + expect(completedResponse.body.data).toEqual(null); + expect(completedResponse.body).toHaveProperty("partial_data"); + expect(completedResponse.body.partial_data[0]).toHaveProperty("content"); + expect(completedResponse.body.partial_data[0]).toHaveProperty("markdown"); + expect(completedResponse.body.partial_data[0]).toHaveProperty("metadata"); + + }, 60000); // 60 seconds + + + describe("POST /v0/scrape with LLM Extraction", () => { it("should extract data using LLM extraction mode", async () => { const response = await request(TEST_URL) diff --git a/apps/api/src/controllers/crawl-cancel.ts b/apps/api/src/controllers/crawl-cancel.ts new file mode 100644 index 00000000..8e8ba313 --- /dev/null +++ b/apps/api/src/controllers/crawl-cancel.ts @@ -0,0 +1,62 @@ +import { Request, Response } from "express"; +import { authenticateUser } from "./auth"; +import { RateLimiterMode } from "../../src/types"; +import { addWebScraperJob } from "../../src/services/queue-jobs"; +import { getWebScraperQueue } from "../../src/services/queue-service"; +import { supabase_service } from "../../src/services/supabase"; +import { billTeam } from "../../src/services/billing/credit_billing"; + +export async function crawlCancelController(req: Request, res: Response) { + try { + const { success, team_id, error, status } = await authenticateUser( + req, + res, + RateLimiterMode.CrawlStatus + ); + if (!success) { + return 
res.status(status).json({ error }); + } + const job = await getWebScraperQueue().getJob(req.params.jobId); + if (!job) { + return res.status(404).json({ error: "Job not found" }); + } + + // check if the job belongs to the team + const { data, error: supaError } = await supabase_service + .from("bulljobs_teams") + .select("*") + .eq("job_id", req.params.jobId) + .eq("team_id", team_id); + if (supaError) { + return res.status(500).json({ error: supaError.message }); + } + + if (data.length === 0) { + return res.status(403).json({ error: "Unauthorized" }); + } + const jobState = await job.getState(); + const { partialDocs } = await job.progress(); + + if (partialDocs && partialDocs.length > 0 && jobState === "active") { + console.log("Billing team for partial docs..."); + // Note: the credits that we will bill them here might be lower than the actual + // due to promises that are not yet resolved + await billTeam(team_id, partialDocs.length); + } + + try { + await job.moveToFailed(Error("Job cancelled by user"), true); + } catch (error) { + console.error(error); + } + + const newJobState = await job.getState(); + + res.json({ + status: newJobState === "failed" ? "cancelled" : "Cancelling...", + }); + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } +} diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts index 3ba92139..e53faeda 100644 --- a/apps/api/src/controllers/crawl.ts +++ b/apps/api/src/controllers/crawl.ts @@ -6,6 +6,7 @@ import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../../src/types"; import { addWebScraperJob } from "../../src/services/queue-jobs"; import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist"; +import { logCrawl } from "../../src/services/logging/crawl_log"; export async function crawlController(req: Request, res: Response) { try { @@ -30,9 +31,14 @@ export async function crawlController(req: Request, res: Response) { } if (isUrlBlocked(url)) { - return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." }); + return res + .status(403) + .json({ + error: + "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", + }); } - + const mode = req.body.mode ?? "crawl"; const crawlerOptions = req.body.crawlerOptions ?? {}; const pageOptions = req.body.pageOptions ?? { onlyMainContent: false, includeHtml: false }; @@ -66,6 +72,7 @@ export async function crawlController(req: Request, res: Response) { return res.status(500).json({ error: error.message }); } } + const job = await addWebScraperJob({ url: url, mode: mode ?? "crawl", // fix for single urls not working @@ -75,6 +82,8 @@ export async function crawlController(req: Request, res: Response) { origin: req.body.origin ?? 
"api", }); + await logCrawl(job.id.toString(), team_id); + res.json({ jobId: job.id }); } catch (error) { console.error(error); diff --git a/apps/api/src/lib/entities.ts b/apps/api/src/lib/entities.ts index 30bdfae9..a387b549 100644 --- a/apps/api/src/lib/entities.ts +++ b/apps/api/src/lib/entities.ts @@ -48,6 +48,7 @@ export type WebScraperOptions = { pageOptions?: PageOptions; extractorOptions?: ExtractorOptions; concurrentRequests?: number; + bullJobId?: string; }; export interface DocumentUrl { diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 189d5005..3c9ea88e 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -26,7 +26,8 @@ export async function startWebScraperPipeline({ onError: (error) => { job.moveToFailed(error); }, - team_id: job.data.team_id + team_id: job.data.team_id, + bull_job_id: job.id.toString() })) as { success: boolean; message: string; docs: Document[] }; } export async function runWebScraper({ @@ -38,6 +39,7 @@ export async function runWebScraper({ onSuccess, onError, team_id, + bull_job_id, }: { url: string; mode: "crawl" | "single_urls" | "sitemap"; @@ -47,6 +49,7 @@ export async function runWebScraper({ onSuccess: (result: any) => void; onError: (error: any) => void; team_id: string; + bull_job_id: string; }): Promise<{ success: boolean; message: string; @@ -59,7 +62,8 @@ export async function runWebScraper({ mode: mode, urls: [url], crawlerOptions: crawlerOptions, - pageOptions: pageOptions + pageOptions: pageOptions, + bullJobId: bull_job_id }); } else { await provider.setOptions({ diff --git a/apps/api/src/routes/v0.ts b/apps/api/src/routes/v0.ts index f84b974b..42b8814d 100644 --- a/apps/api/src/routes/v0.ts +++ b/apps/api/src/routes/v0.ts @@ -5,6 +5,7 @@ import { scrapeController } from "../../src/controllers/scrape"; import { crawlPreviewController } from "../../src/controllers/crawlPreview"; import { crawlJobStatusPreviewController } from "../../src/controllers/status"; import { searchController } from "../../src/controllers/search"; +import { crawlCancelController } from "../../src/controllers/crawl-cancel"; export const v0Router = express.Router(); @@ -12,6 +13,7 @@ v0Router.post("/v0/scrape", scrapeController); v0Router.post("/v0/crawl", crawlController); v0Router.post("/v0/crawlWebsitePreview", crawlPreviewController); v0Router.get("/v0/crawl/status/:jobId", crawlStatusController); +v0Router.delete("/v0/crawl/cancel/:jobId", crawlCancelController); v0Router.get("/v0/checkJobStatus/:jobId", crawlJobStatusPreviewController); // Search routes diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts index d42d27b2..e3256dbd 100644 --- a/apps/api/src/scraper/WebScraper/index.ts +++ b/apps/api/src/scraper/WebScraper/index.ts @@ -1,4 +1,9 @@ -import { Document, ExtractorOptions, PageOptions, WebScraperOptions } from "../../lib/entities"; +import { + Document, + ExtractorOptions, + PageOptions, + WebScraperOptions, +} from "../../lib/entities"; import { Progress } from "../../lib/entities"; import { scrapSingleUrl } from "./single_url"; import { SitemapEntry, fetchSitemapData, getLinksFromSitemap } from "./sitemap"; @@ -6,11 +11,15 @@ import { WebCrawler } from "./crawler"; import { getValue, setValue } from "../../services/redis"; import { getImageDescription } from "./utils/imageDescription"; import { fetchAndProcessPdf } from "./utils/pdfProcessor"; -import { replaceImgPathsWithAbsolutePaths, replacePathsWithAbsolutePaths } from 
"./utils/replacePaths"; +import { + replaceImgPathsWithAbsolutePaths, + replacePathsWithAbsolutePaths, +} from "./utils/replacePaths"; import { generateCompletions } from "../../lib/LLM-extraction"; - +import { getWebScraperQueue } from "../../../src/services/queue-service"; export class WebScraperDataProvider { + private bullJobId: string; private urls: string[] = [""]; private mode: "single_urls" | "sitemap" | "crawl" = "single_urls"; private includes: string[]; @@ -24,7 +33,8 @@ export class WebScraperDataProvider { private pageOptions?: PageOptions; private extractorOptions?: ExtractorOptions; private replaceAllPathsWithAbsolutePaths?: boolean = false; - private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" = "gpt-4-turbo"; + private generateImgAltTextModel: "gpt-4-turbo" | "claude-3-opus" = + "gpt-4-turbo"; authorize(): void { throw new Error("Method not implemented."); @@ -40,7 +50,7 @@ export class WebScraperDataProvider { ): Promise { const totalUrls = urls.length; let processedUrls = 0; - + const results: (Document | null)[] = new Array(urls.length).fill(null); for (let i = 0; i < urls.length; i += this.concurrentRequests) { const batchUrls = urls.slice(i, i + this.concurrentRequests); @@ -54,12 +64,26 @@ export class WebScraperDataProvider { total: totalUrls, status: "SCRAPING", currentDocumentUrl: url, - currentDocument: result + currentDocument: result, }); } + results[i + index] = result; }) ); + try { + if (this.mode === "crawl" && this.bullJobId) { + const job = await getWebScraperQueue().getJob(this.bullJobId); + const jobStatus = await job.getState(); + if (jobStatus === "failed") { + throw new Error( + "Job has failed or has been cancelled by the user. Stopping the job..." + ); + } + } + } catch (error) { + console.error(error); + } } return results.filter((result) => result !== null) as Document[]; } @@ -88,7 +112,9 @@ export class WebScraperDataProvider { * @param inProgress inProgress * @returns documents */ - private async processDocumentsWithoutCache(inProgress?: (progress: Progress) => void): Promise { + private async processDocumentsWithoutCache( + inProgress?: (progress: Progress) => void + ): Promise { switch (this.mode) { case "crawl": return this.handleCrawlMode(inProgress); @@ -101,7 +127,9 @@ export class WebScraperDataProvider { } } - private async handleCrawlMode(inProgress?: (progress: Progress) => void): Promise { + private async handleCrawlMode( + inProgress?: (progress: Progress) => void + ): Promise { const crawler = new WebCrawler({ initialUrl: this.urls[0], includes: this.includes, @@ -120,12 +148,16 @@ export class WebScraperDataProvider { return this.cacheAndFinalizeDocuments(documents, links); } - private async handleSingleUrlsMode(inProgress?: (progress: Progress) => void): Promise { + private async handleSingleUrlsMode( + inProgress?: (progress: Progress) => void + ): Promise { let documents = await this.processLinks(this.urls, inProgress); return documents; } - private async handleSitemapMode(inProgress?: (progress: Progress) => void): Promise { + private async handleSitemapMode( + inProgress?: (progress: Progress) => void + ): Promise { let links = await getLinksFromSitemap(this.urls[0]); if (this.returnOnlyUrls) { return this.returnOnlyUrlsResponse(links, inProgress); @@ -135,14 +167,17 @@ export class WebScraperDataProvider { return this.cacheAndFinalizeDocuments(documents, links); } - private async returnOnlyUrlsResponse(links: string[], inProgress?: (progress: Progress) => void): Promise { + private async returnOnlyUrlsResponse( 
+ links: string[], + inProgress?: (progress: Progress) => void + ): Promise { inProgress?.({ current: links.length, total: links.length, status: "COMPLETED", currentDocumentUrl: this.urls[0], }); - return links.map(url => ({ + return links.map((url) => ({ content: "", html: this.pageOptions?.includeHtml ? "" : undefined, markdown: "", @@ -150,54 +185,73 @@ export class WebScraperDataProvider { })); } - private async processLinks(links: string[], inProgress?: (progress: Progress) => void): Promise { - let pdfLinks = links.filter(link => link.endsWith(".pdf")); + private async processLinks( + links: string[], + inProgress?: (progress: Progress) => void + ): Promise { + let pdfLinks = links.filter((link) => link.endsWith(".pdf")); let pdfDocuments = await this.fetchPdfDocuments(pdfLinks); - links = links.filter(link => !link.endsWith(".pdf")); + links = links.filter((link) => !link.endsWith(".pdf")); let documents = await this.convertUrlsToDocuments(links, inProgress); documents = await this.getSitemapData(this.urls[0], documents); documents = this.applyPathReplacements(documents); documents = await this.applyImgAltText(documents); - - if(this.extractorOptions.mode === "llm-extraction" && this.mode === "single_urls") { - documents = await generateCompletions( - documents, - this.extractorOptions - ) + + if ( + this.extractorOptions.mode === "llm-extraction" && + this.mode === "single_urls" + ) { + documents = await generateCompletions(documents, this.extractorOptions); } return documents.concat(pdfDocuments); } private async fetchPdfDocuments(pdfLinks: string[]): Promise { - return Promise.all(pdfLinks.map(async pdfLink => { - const pdfContent = await fetchAndProcessPdf(pdfLink); - return { - content: pdfContent, - metadata: { sourceURL: pdfLink }, - provider: "web-scraper" - }; - })); + return Promise.all( + pdfLinks.map(async (pdfLink) => { + const pdfContent = await fetchAndProcessPdf(pdfLink); + return { + content: pdfContent, + metadata: { sourceURL: pdfLink }, + provider: "web-scraper", + }; + }) + ); } private applyPathReplacements(documents: Document[]): Document[] { - return this.replaceAllPathsWithAbsolutePaths ? replacePathsWithAbsolutePaths(documents) : replaceImgPathsWithAbsolutePaths(documents); + return this.replaceAllPathsWithAbsolutePaths + ? replacePathsWithAbsolutePaths(documents) + : replaceImgPathsWithAbsolutePaths(documents); } private async applyImgAltText(documents: Document[]): Promise { - return this.generateImgAltText ? this.generatesImgAltText(documents) : documents; + return this.generateImgAltText + ? 
this.generatesImgAltText(documents) + : documents; } - private async cacheAndFinalizeDocuments(documents: Document[], links: string[]): Promise { + private async cacheAndFinalizeDocuments( + documents: Document[], + links: string[] + ): Promise { await this.setCachedDocuments(documents, links); documents = this.removeChildLinks(documents); return documents.splice(0, this.limit); } - private async processDocumentsWithCache(inProgress?: (progress: Progress) => void): Promise { - let documents = await this.getCachedDocuments(this.urls.slice(0, this.limit)); + private async processDocumentsWithCache( + inProgress?: (progress: Progress) => void + ): Promise { + let documents = await this.getCachedDocuments( + this.urls.slice(0, this.limit) + ); if (documents.length < this.limit) { - const newDocuments: Document[] = await this.getDocuments(false, inProgress); + const newDocuments: Document[] = await this.getDocuments( + false, + inProgress + ); documents = this.mergeNewDocuments(documents, newDocuments); } documents = this.filterDocsExcludeInclude(documents); @@ -206,9 +260,18 @@ export class WebScraperDataProvider { return documents.splice(0, this.limit); } - private mergeNewDocuments(existingDocuments: Document[], newDocuments: Document[]): Document[] { - newDocuments.forEach(doc => { - if (!existingDocuments.some(d => this.normalizeUrl(d.metadata.sourceURL) === this.normalizeUrl(doc.metadata?.sourceURL))) { + private mergeNewDocuments( + existingDocuments: Document[], + newDocuments: Document[] + ): Document[] { + newDocuments.forEach((doc) => { + if ( + !existingDocuments.some( + (d) => + this.normalizeUrl(d.metadata.sourceURL) === + this.normalizeUrl(doc.metadata?.sourceURL) + ) + ) { existingDocuments.push(doc); } }); @@ -289,7 +352,7 @@ export class WebScraperDataProvider { documents.push(cachedDocument); // get children documents - for (const childUrl of (cachedDocument.childrenLinks || [])) { + for (const childUrl of cachedDocument.childrenLinks || []) { const normalizedChildUrl = this.normalizeUrl(childUrl); const childCachedDocumentString = await getValue( "web-scraper-cache:" + normalizedChildUrl @@ -317,6 +380,7 @@ export class WebScraperDataProvider { throw new Error("Urls are required"); } + this.bullJobId = options.bullJobId; this.urls = options.urls; this.mode = options.mode; this.concurrentRequests = options.concurrentRequests ?? 20; @@ -400,8 +464,9 @@ export class WebScraperDataProvider { altText = await getImageDescription( imageUrl, backText, - frontText - , this.generateImgAltTextModel); + frontText, + this.generateImgAltTextModel + ); } document.content = document.content.replace( diff --git a/apps/api/src/services/logging/crawl_log.ts b/apps/api/src/services/logging/crawl_log.ts new file mode 100644 index 00000000..76a06072 --- /dev/null +++ b/apps/api/src/services/logging/crawl_log.ts @@ -0,0 +1,17 @@ +import { supabase_service } from "../supabase"; +import "dotenv/config"; + +export async function logCrawl(job_id: string, team_id: string) { + try { + const { data, error } = await supabase_service + .from("bulljobs_teams") + .insert([ + { + job_id: job_id, + team_id: team_id, + }, + ]); + } catch (error) { + console.error("Error logging crawl job:\n", error); + } +}
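
For reference, a minimal client-side sketch of the cancellation flow this diff introduces, mirroring the new e2e test. It is an illustration only, not part of the change: API_URL is a placeholder for the API base URL (the test suite uses its own TEST_URL), TEST_API_KEY matches the env var the tests already rely on, and global fetch assumes Node 18+.

// Sketch: start a crawl, cancel it via the new DELETE route, then check status.
const API_URL = process.env.API_URL; // placeholder base URL, e.g. the value the test suite passes as TEST_URL
const API_KEY = process.env.TEST_API_KEY;

async function cancelCrawlExample() {
  const headers = {
    "Content-Type": "application/json",
    Authorization: `Bearer ${API_KEY}`,
  };

  // Kick off a crawl job; the controller responds with { jobId }.
  const crawlRes = await fetch(`${API_URL}/v0/crawl`, {
    method: "POST",
    headers,
    body: JSON.stringify({ url: "https://example.com" }),
  });
  const { jobId } = await crawlRes.json();

  // Give the crawler a moment to accumulate partial documents.
  await new Promise((r) => setTimeout(r, 10000));

  // Cancel the job. The controller bills the team for any partial docs and
  // moves the Bull job to "failed"; it replies with { status: "cancelled" }
  // (or "Cancelling..." if the state change has not landed yet).
  const cancelRes = await fetch(`${API_URL}/v0/crawl/cancel/${jobId}`, {
    method: "DELETE",
    headers,
  });
  console.log(await cancelRes.json());

  // The status endpoint then reports status "failed" with data: null and
  // any scraped pages surfaced under partial_data, as the e2e test asserts.
  const statusRes = await fetch(`${API_URL}/v0/crawl/status/${jobId}`, {
    headers,
  });
  console.log(await statusRes.json());
}

cancelCrawlExample().catch(console.error);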