Merge pull request #721 from mendableai/feat/concurrency-limit

Concurrency limits
This commit is contained in:
Nicolas 2024-10-01 16:15:05 -03:00 committed by GitHub
commit d1b838322d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 181 additions and 43 deletions

View File

@ -54,9 +54,9 @@ export async function crawlStatusController(req: Request, res: Response) {
const jobs = (await getJobs(req.params.jobId, jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobs.some((x, i) => jobStatuses[i] === "failed" && x.failedReason !== "Concurrency limit hit") ? "failed" : "active";
const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
const data = jobs.filter(x => x.failedReason !== "Concurreny limit hit").map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
if (
jobs.length > 0 &&

View File

@ -171,7 +171,8 @@ export async function crawlController(req: Request, res: Response) {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
team_id,
plan,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,
@ -211,7 +212,8 @@ export async function crawlController(req: Request, res: Response) {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
team_id,
plan,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,

View File

@ -107,7 +107,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
team_id,
plan,
pageOptions: pageOptions,
origin: "website-preview",
crawl_id: id,
@ -121,7 +122,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
team_id,
plan,
pageOptions: pageOptions,
origin: "website-preview",
crawl_id: id,

View File

@ -61,6 +61,7 @@ export async function scrapeHelper(
crawlerOptions,
team_id,
pageOptions,
plan,
extractorOptions,
origin: req.body.origin ?? defaultOrigin,
is_scrape: true,
@ -196,7 +197,7 @@ export async function scrapeController(req: Request, res: Response) {
await checkTeamCredits(chunk, team_id, 1);
if (!creditsCheckSuccess) {
earlyReturn = true;
return res.status(402).json({ error: "Insufficient credits" });
return res.status(402).json({ error: "Insufficient credits. For more credits, you can upgrade your plan at https://firecrawl.dev/pricing" });
}
} catch (error) {
Logger.error(error);

View File

@ -0,0 +1,25 @@
import { authenticateUser } from "../auth";
import {
ConcurrencyCheckParams,
ConcurrencyCheckResponse,
RequestWithAuth,
} from "./types";
import { RateLimiterMode } from "../../types";
import { Response } from "express";
import { redisConnection } from "../../services/queue-service";
// Basically just middleware and error wrapping
export async function concurrencyCheckController(
req: RequestWithAuth<ConcurrencyCheckParams, undefined, undefined>,
res: Response<ConcurrencyCheckResponse>
) {
const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id;
const now = Date.now();
const activeJobsOfTeam = await redisConnection.zrangebyscore(
concurrencyLimiterKey,
now,
Infinity
);
return res
.status(200)
.json({ success: true, concurrency: activeJobsOfTeam.length });
}

View File

@ -5,7 +5,7 @@ import { CrawlStatusParams, CrawlStatusResponse, Document, ErrorResponse, legacy
import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";
@ -95,8 +95,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id));
jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
const doneJobs = await getJobs(doneJobIDs);
const data = doneJobs.map(x => x.returnvalue);

View File

@ -1,6 +1,6 @@
import { Response } from "express";
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, getThrottledJobs } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
@ -58,8 +58,10 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
let jobStatuses = await Promise.all(jobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)] as const));
const throttledJobs = new Set(...await getThrottledJobs(req.auth.team_id));
jobStatuses = jobStatuses.filter(x => !throttledJobs.has(x[0])); // throttled jobs can have a failed status, but they are not actually failed
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x[1] === "completed") ? "completed" : jobStatuses.some(x => x[1] === "failed") ? "failed" : "scraping";
const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);

View File

@ -106,6 +106,7 @@ export async function crawlController(
url,
mode: "single_urls",
team_id: req.auth.team_id,
plan: req.auth.plan,
crawlerOptions,
pageOptions,
origin: "api",
@ -138,6 +139,7 @@ export async function crawlController(
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: req.auth.team_id,
plan: req.auth.plan,
pageOptions: pageOptions,
origin: "api",
crawl_id: id,

View File

@ -44,6 +44,7 @@ export async function scrapeController(
mode: "single_urls",
crawlerOptions: {},
team_id: req.auth.team_id,
plan: req.auth.plan,
pageOptions,
extractorOptions,
origin: req.body.origin,

View File

@ -216,6 +216,7 @@ export type Document = {
actions?: {
screenshots: string[];
};
warning?: string;
metadata: {
title?: string;
description?: string;
@ -293,6 +294,17 @@ export type CrawlStatusParams = {
jobId: string;
};
export type ConcurrencyCheckParams = {
teamId: string;
};
export type ConcurrencyCheckResponse =
| ErrorResponse
| {
success: true;
concurrency: number;
};
export type CrawlStatusResponse =
| ErrorResponse
| {
@ -444,6 +456,7 @@ export function legacyDocumentConverter(doc: any): Document {
extract: doc.llm_extraction,
screenshot: doc.screenshot ?? doc.fullPageScreenshot,
actions: doc.actions ?? undefined,
warning: doc.warning ?? undefined,
metadata: {
...doc.metadata,
pageError: undefined,

View File

@ -83,6 +83,10 @@ export async function getCrawlJobs(id: string): Promise<string[]> {
return await redisConnection.smembers("crawl:" + id + ":jobs");
}
export async function getThrottledJobs(teamId: string): Promise<string[]> {
return await redisConnection.zrangebyscore("concurrency-limiter:" + teamId + ":throttled", Date.now(), Infinity);
}
export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
if (typeof sc.crawlerOptions?.limit === "number") {
if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) {

View File

@ -16,6 +16,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { crawlCancelController } from "../controllers/v1/crawl-cancel";
import { Logger } from "../lib/logger";
import { scrapeStatusController } from "../controllers/v1/scrape-status";
import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
// import { searchController } from "../../src/controllers/v1/search";
@ -140,11 +141,19 @@ v1Router.get(
wrap(scrapeStatusController)
);
v1Router.get(
"/concurrency-check",
authMiddleware(RateLimiterMode.CrawlStatus),
wrap(concurrencyCheckController)
);
v1Router.ws(
"/crawl/:jobId",
crawlStatusWSController
);
// v1Router.post("/crawlWebsitePreview", crawlPreviewController);

View File

@ -23,7 +23,7 @@ export async function getLinksFromSitemap(
const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
content = response.data;
} else if (mode === 'fire-engine') {
const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"tlsclient", disableJsDom: true, mobileProxy: true } });
const response = await scrapWithFireEngine({ url: sitemapUrl, fireEngineOptions: { engine:"playwright" } });
content = response.html;
}
} catch (error) {

View File

@ -63,8 +63,11 @@ export function waitForJob(jobId: string, timeout: number) {
resolve((await getScrapeQueue().getJob(jobId)).returnvalue);
} else if (state === "failed") {
// console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason);
clearInterval(int);
reject((await getScrapeQueue().getJob(jobId)).failedReason);
const job = await getScrapeQueue().getJob(jobId);
if (job && job.failedReason !== "Concurrency limit hit") {
clearInterval(int);
reject(job.failedReason);
}
}
}
}, 500);

View File

@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from "@hyperdx/node-opentelemetry";
import { Job } from "bullmq";
import { Job, Queue } from "bullmq";
import { Logger } from "../lib/logger";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
@ -34,9 +34,10 @@ import {
deleteJobPriority,
getJobPriority,
} from "../../src/lib/job-priority";
import { PlanType } from "../types";
import { PlanType, RateLimiterMode } from "../types";
import { getJobs } from "../../src/controllers/v1/crawl-status";
import { configDotenv } from "dotenv";
import { getRateLimiterPoints } from "./rate-limiter";
configDotenv();
if (process.env.ENV === "production") {
@ -99,10 +100,10 @@ process.on("SIGINT", () => {
});
const workerFun = async (
queueName: string,
queue: Queue,
processJobInternal: (token: string, job: Job) => Promise<any>
) => {
const worker = new Worker(queueName, null, {
const worker = new Worker(queue.name, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
// lockRenewTime: 15 * 1000, // 15 seconds
@ -129,6 +130,49 @@ const workerFun = async (
const job = await worker.getNextJob(token);
if (job) {
const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
if (job.data && job.data.team_id && job.data.plan) {
const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled";
const concurrencyLimit = getRateLimiterPoints(RateLimiterMode.Scrape, undefined, job.data.plan);
const now = Date.now();
const stalledJobTimeoutMs = 2 * 60 * 1000;
const throttledJobTimeoutMs = 10 * 60 * 1000;
redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now);
redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
if (activeJobsOfTeam.length >= concurrencyLimit) {
// Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue
// Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
// Concurrency limit hit, throttles the job
await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
// We move to failed with a specific error
await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
// Remove the job from the queue
await job.remove();
// Increment the priority of the job exponentially by 5%, Note: max bull priority is 2 million
const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000);
// Add the job back to the queue with the new priority
await queue.add(job.name, {
...job.data,
concurrencyLimitHit: true,
}, {
...job.opts,
jobId: job.id,
priority: newJobPriority, // exponential backoff for stuck jobs
});
// await sleep(gotJobInterval);
continue;
} else {
// If we are not throttled, add the job back to the queue with the new priority
await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
// Remove the job from the throttled list
await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
}
}
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace(
{
@ -159,7 +203,15 @@ const workerFun = async (
},
},
async () => {
const res = await processJobInternal(token, job);
let res;
try {
res = await processJobInternal(token, job);
} finally {
if (job.id && job.data && job.data.team_id) {
await redisConnection.zrem(concurrencyLimiterKey, job.id);
}
}
if (res !== null) {
span.setStatus({ code: 2 }); // ERROR
} else {
@ -181,7 +233,12 @@ const workerFun = async (
},
},
() => {
processJobInternal(token, job);
processJobInternal(token, job)
.finally(() => {
if (job.id && job.data && job.data.team_id) {
redisConnection.zrem(concurrencyLimiterKey, job.id);
}
});
}
);
}
@ -193,7 +250,7 @@ const workerFun = async (
}
};
workerFun(scrapeQueueName, processJobInternal);
workerFun(getScrapeQueue(), processJobInternal);
async function processJob(job: Job, token: string) {
Logger.info(`🐂 Worker taking job ${job.id}`);
@ -254,7 +311,10 @@ async function processJob(job: Job, token: string) {
},
project_id: job.data.project_id,
error: message /* etc... */,
docs,
docs: job.data.concurrencyLimitHit ? docs.map(x => ({
...x,
warning: "This scrape was throttled because you hit you concurrency limit." + (x.warning ? " " + x.warning : ""),
})) : docs,
};
// No idea what this does and when it is called.
@ -331,6 +391,7 @@ async function processJob(job: Job, token: string) {
mode: "single_urls",
crawlerOptions: sc.crawlerOptions,
team_id: sc.team_id,
plan: job.data.plan,
pageOptions: sc.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,

View File

@ -49,7 +49,7 @@ describe("Rate Limiter Service", () => {
"nonexistent" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter).toBe(serverRateLimiter);
expect(limiter.points).toBe(serverRateLimiter.points);
});
it("should return the correct rate limiter based on mode and plan", () => {
@ -210,7 +210,7 @@ describe("Rate Limiter Service", () => {
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(3);
expect(limiter2.points).toBe(10);
const limiter3 = getRateLimiter(
"crawl" as RateLimiterMode,
@ -233,7 +233,7 @@ describe("Rate Limiter Service", () => {
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(20);
expect(limiter2.points).toBe(100);
const limiter3 = getRateLimiter(
"scrape" as RateLimiterMode,
@ -263,14 +263,14 @@ describe("Rate Limiter Service", () => {
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(20);
expect(limiter2.points).toBe(50);
const limiter3 = getRateLimiter(
"search" as RateLimiterMode,
"test-prefix:someToken",
"standard"
);
expect(limiter3.points).toBe(40);
expect(limiter3.points).toBe(50);
});
it("should return the correct rate limiter for 'preview' mode", () => {

View File

@ -123,14 +123,32 @@ const testSuiteTokens = ["a01ccae", "6254cf9", "0f96e673", "23befa1b", "69141c4"
const manual = ["69be9e74-7624-4990-b20d-08e0acc70cf6"];
export function getRateLimiter(
function makePlanKey(plan?: string) {
return plan ? plan.replace("-", "") : "default"; // "default"
}
export function getRateLimiterPoints(
mode: RateLimiterMode,
token: string,
token?: string,
plan?: string,
teamId?: string
) {
) : number {
const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
if (!rateLimitConfig) return RATE_LIMITS.account.default;
if (testSuiteTokens.some(testToken => token.includes(testToken))) {
const points : number =
rateLimitConfig[makePlanKey(plan)] || rateLimitConfig.default; // 5
return points;
}
export function getRateLimiter(
mode: RateLimiterMode,
token?: string,
plan?: string,
teamId?: string
) : RateLimiterRedis {
if (token && testSuiteTokens.some(testToken => token.includes(testToken))) {
return testSuiteRateLimiter;
}
@ -141,14 +159,6 @@ export function getRateLimiter(
if(teamId && manual.includes(teamId)) {
return manualRateLimiter;
}
const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
if (!rateLimitConfig) return serverRateLimiter;
const planKey = plan ? plan.replace("-", "") : "default"; // "default"
const points =
rateLimitConfig[planKey] || rateLimitConfig.default || rateLimitConfig; // 5
return createRateLimiter(`${mode}-${planKey}`, points);
return createRateLimiter(`${mode}-${makePlanKey(plan)}`, getRateLimiterPoints(mode, token, plan, teamId));
}

View File

@ -28,6 +28,7 @@ export interface WebScraperOptions {
pageOptions: any;
extractorOptions?: any;
team_id: string;
plan: string;
origin?: string;
crawl_id?: string;
sitemapped?: boolean;