Nick: chrome cdp main | simple autoscaler

This commit is contained in:
Nicolas 2024-08-23 20:09:59 -03:00
parent 732e6af8b9
commit 173f4ee1bf
4 changed files with 135 additions and 26 deletions

36
.github/workflows/autoscale.yml vendored Normal file
View File

@ -0,0 +1,36 @@
name: Simple Autoscaler

on:
  schedule:
    # NOTE: cron does not support fractional minutes ('*/0.5' is invalid and the
    # workflow would never fire). GitHub Actions' shortest supported schedule
    # interval is 5 minutes, so poll at that rate.
    - cron: '*/5 * * * *'

env:
  BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }}
  FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

jobs:
  scale:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: superfly/flyctl-actions/setup-flyctl@master
      - name: Send GET request to check queues
        run: |
          # The autoscaler endpoint responds with JSON: { "mode": ..., "count": ... }.
          # (It has no 'status_code' field, so don't try to extract one.)
          response=$(curl --silent --max-time 180 "https://api.firecrawl.dev/admin/${{ secrets.BULL_AUTH_KEY }}/autoscaler")
          mode=$(echo "$response" | jq -r '.mode')
          count=$(echo "$response" | jq -r '.count')
          echo "Mode: $mode"
          echo "Count: $count"
          if [ "$mode" = "scale-descale" ]; then
            # Quote "$count" so an empty/failed response aborts instead of
            # expanding to a malformed flyctl invocation.
            flyctl scale count "$count" -c fly.staging.toml --process-group=worker --yes
            echo "Scaled to $count machines."
          else
            echo "No scaling needed. Mode: $mode"
          fi
        env:
          FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
          BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }}
        working-directory: apps/api

View File

@ -4,6 +4,7 @@ import { Job } from "bullmq";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getScrapeQueue } from "../../services/queue-service"; import { getScrapeQueue } from "../../services/queue-service";
import { checkAlerts } from "../../services/alerts"; import { checkAlerts } from "../../services/alerts";
import { exec } from "node:child_process";
export async function cleanBefore24hCompleteJobsController( export async function cleanBefore24hCompleteJobsController(
req: Request, req: Request,
@ -54,34 +55,100 @@ export async function cleanBefore24hCompleteJobsController(
} }
} }
export async function checkQueuesController(req: Request, res: Response) { export async function checkQueuesController(req: Request, res: Response) {
try { try {
await checkAlerts(); await checkAlerts();
return res.status(200).send("Alerts initialized"); return res.status(200).send("Alerts initialized");
} catch (error) { } catch (error) {
Logger.debug(`Failed to initialize alerts: ${error}`); Logger.debug(`Failed to initialize alerts: ${error}`);
return res.status(500).send("Failed to initialize alerts"); return res.status(500).send("Failed to initialize alerts");
}
} }
}
// Use this as a "health check" that way we dont destroy the server // Use this as a "health check" that way we dont destroy the server
export async function queuesController(req: Request, res: Response) { export async function queuesController(req: Request, res: Response) {
try { try {
const scrapeQueue = getScrapeQueue(); const scrapeQueue = getScrapeQueue();
const [webScraperActive] = await Promise.all([ const [webScraperActive] = await Promise.all([
scrapeQueue.getActiveCount(), scrapeQueue.getActiveCount(),
]); ]);
const noActiveJobs = webScraperActive === 0; const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 503 if there are active jobs // 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({ return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive, webScraperActive,
noActiveJobs, noActiveJobs,
}); });
} catch (error) { } catch (error) {
Logger.error(error); Logger.error(error);
return res.status(500).json({ error: error.message }); return res.status(500).json({ error: error.message });
}
}
export async function autoscalerController(req: Request, res: Response) {
try {
const maxNumberOfMachines = 100;
const minNumberOfMachines = 20;
const scrapeQueue = getScrapeQueue();
const [webScraperActive, webScraperWaiting, webScraperPriority] = await Promise.all([
scrapeQueue.getActiveCount(),
scrapeQueue.getWaitingCount(),
scrapeQueue.getPrioritizedCount(),
]);
let waitingAndPriorityCount = webScraperWaiting + webScraperPriority;
// get number of machines active
const request = await fetch('https://api.machines.dev/v1/apps/firecrawl-scraper-js/machines',
{
headers: {
'Authorization': `Bearer ${process.env.FLY_API_TOKEN}`
}
}
)
const machines = await request.json();
const activeMachines = machines.filter(machine => machine.state === 'started' || machine.state === "starting").length;
let targetMachineCount = activeMachines;
const baseScaleUp = 10;
const baseScaleDown = 5;
// Scale up logic
if (webScraperActive > 9000 || waitingAndPriorityCount > 2000) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 3));
} else if (webScraperActive > 5000 || waitingAndPriorityCount > 1000) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 2));
} else if (webScraperActive > 1000 || waitingAndPriorityCount > 500) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + baseScaleUp);
} }
}
// Scale down logic
if (webScraperActive < 100 && waitingAndPriorityCount < 50) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 3));
} else if (webScraperActive < 500 && waitingAndPriorityCount < 200) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 2));
} else if (webScraperActive < 1000 && waitingAndPriorityCount < 500) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - baseScaleDown);
}
if (targetMachineCount !== activeMachines) {
Logger.info(`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting`);
return res.status(200).json({
mode: "scale-descale",
count: targetMachineCount,
});
}
return res.status(200).json({
mode: "normal",
count: activeMachines,
});
} catch (error) {
Logger.error(error);
return res.status(500).send("Failed to initialize autoscaler");
}
}

View File

@ -1,6 +1,7 @@
import express from "express"; import express from "express";
import { redisHealthController } from "../controllers/admin/redis-health"; import { redisHealthController } from "../controllers/admin/redis-health";
import { import {
autoscalerController,
checkQueuesController, checkQueuesController,
cleanBefore24hCompleteJobsController, cleanBefore24hCompleteJobsController,
queuesController, queuesController,
@ -27,3 +28,8 @@ adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/queues`, `/admin/${process.env.BULL_AUTH_KEY}/queues`,
queuesController queuesController
); );
// Polled by the autoscale GitHub Actions workflow; the controller returns the
// recommended machine count for the worker process group.
adminRouter.get(`/admin/${process.env.BULL_AUTH_KEY}/autoscaler`, autoscalerController);

View File

@ -24,8 +24,8 @@ import { clientSideError } from "../../strings";
dotenv.config(); dotenv.config();
export const baseScrapers = [ export const baseScrapers = [
"fire-engine",
"fire-engine;chrome-cdp", "fire-engine;chrome-cdp",
"fire-engine",
"scrapingBee", "scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright", process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad", "scrapingBeeLoad",
@ -85,8 +85,8 @@ function getScrapingFallbackOrder(
}); });
let defaultOrder = [ let defaultOrder = [
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine;chrome-cdp", !process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine;chrome-cdp",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
"scrapingBee", "scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright", process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad", "scrapingBeeLoad",