Merge pull request #326 from mendableai/feat/save-docs-on-supabase

[Feat] Added implementation for saving docs on supabase
commit 30c1118713
Nicolas authored on 2024-07-11 20:27:41 -04:00, committed by GitHub
17 changed files with 188 additions and 40 deletions

View File

@@ -29,9 +29,9 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
-     - name: Change directory
-       run: cd apps/api
      - uses: superfly/flyctl-actions/setup-flyctl@master
-     - run: flyctl deploy ./apps/api --remote-only -a firecrawl-scraper-js
+     - run: flyctl deploy --remote-only -a firecrawl-scraper-js && curl -X POST https://api.firecrawl.dev/admin/$BULL_AUTH_KEY/unpause
+       working-directory: ./apps/api
        env:
          FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
+         BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }}

View File

@@ -175,12 +175,12 @@ jobs:
    needs: [pre-deploy-test-suite, python-sdk-tests, js-sdk-tests]
    steps:
      - uses: actions/checkout@v3
-     - name: Change directory
-       run: cd apps/api
      - uses: superfly/flyctl-actions/setup-flyctl@master
-     - run: flyctl deploy ./apps/api --remote-only -a firecrawl-scraper-js
+     - run: flyctl deploy --remote-only -a firecrawl-scraper-js && curl -X POST https://api.firecrawl.dev/admin/$BULL_AUTH_KEY/unpause
+       working-directory: ./apps/api
        env:
          FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
+         BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }}
  build-and-publish-python-sdk:
    name: Build and publish Python SDK
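Note on the two deploy jobs above: the standalone "Change directory" step was a no-op (a cd in its own step does not carry over to later steps), so it is replaced by working-directory on the deploy step itself, and BULL_AUTH_KEY is now exposed so the post-deploy curl can hit the new /admin/.../unpause endpoint added in apps/api further down.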

View File

@@ -31,6 +31,3 @@ COPY --from=build /app /app
# Start the server by default, this can be overwritten at runtime
EXPOSE 8080
ENV PUPPETEER_EXECUTABLE_PATH="/usr/bin/chromium"
- CMD [ "pnpm", "run", "start:production" ]
- CMD [ "pnpm", "run", "worker:production" ]

View File

@@ -6,13 +6,16 @@
app = 'staging-firecrawl-scraper-js'
primary_region = 'mia'
kill_signal = 'SIGINT'
- kill_timeout = '5s'
+ kill_timeout = '30s'
+ [deploy]
+ release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev'
[build]
[processes]
- app = 'npm run start:production'
+ app = 'node dist/src/index.js'
- worker = 'npm run worker:production'
+ worker = 'node dist/src/services/queue-worker.js'
[http_service]
internal_port = 8080

View File

@@ -6,13 +6,16 @@
app = 'firecrawl-scraper-js'
primary_region = 'mia'
kill_signal = 'SIGINT'
- kill_timeout = '5s'
+ kill_timeout = '30s'
+ [deploy]
+ release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev'
[build]
[processes]
- app = 'npm run start:production'
+ app = 'node dist/src/index.js'
- worker = 'npm run worker:production'
+ worker = 'node dist/src/services/queue-worker.js'
[http_service]
internal_port = 8080
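Together with the staging config above: kill_timeout goes from 5s to 30s so in-flight work gets more time after SIGINT, the app and worker processes run the compiled entrypoints directly instead of going through npm scripts, and the new [deploy] release_command runs trigger-shutdown.js (added later in this diff) against the currently running app, pausing the Bull queue and re-queueing active jobs before the new release rolls out.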

View File

@@ -19,7 +19,8 @@
    "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
    "mongo-docker-console": "docker exec -it mongodb mongosh",
    "run-example": "npx ts-node src/example.ts",
-   "deploy:fly:staging": "fly deploy -c fly.staging.toml"
+   "deploy:fly": "flyctl deploy && node postdeploy.js https://api.firecrawl.dev",
+   "deploy:fly:staging": "fly deploy -c fly.staging.toml && node postdeploy.js https://staging-firecrawl-scraper-js.fly.dev"
  },
  "author": "",
  "license": "ISC",

apps/api/postdeploy.js (new file, 11 lines)
View File

@@ -0,0 +1,11 @@
require("dotenv").config();
fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/unpause", {
method: "POST"
}).then(async x => {
console.log(await x.text());
process.exit(0);
}).catch(e => {
console.error(e);
process.exit(1);
});
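This script is the post-deploy half of the flow: the deploy:fly and deploy:fly:staging scripts in package.json run it right after flyctl deploy, passing the app's base URL as process.argv[2], so the queue that was paused for the release is resumed via the /admin/.../unpause endpoint.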

View File

@@ -331,7 +331,7 @@ describe("E2E Tests for API Routes", () => {
      expect(completedResponse.body.data[0].content).toContain("Mendable");
      expect(completedResponse.body.data[0].metadata.pageStatusCode).toBe(200);
      expect(completedResponse.body.data[0].metadata.pageError).toBeUndefined();
-   }, 60000); // 60 seconds
+   }, 180000); // 180 seconds
    it.concurrent("should return a successful response with a valid API key and valid excludes option", async () => {
      const crawlResponse: FirecrawlCrawlResponse = await request(TEST_URL)

View File

@@ -3,6 +3,7 @@ import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
+ import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
export async function crawlStatusController(req: Request, res: Response) {
  try {

@@ -20,15 +21,27 @@ export async function crawlStatusController(req: Request, res: Response) {
    }
    const { current, current_url, total, current_step, partialDocs } = await job.progress();
+   let data = job.returnvalue;
+   if (process.env.USE_DB_AUTHENTICATION) {
+     const supabaseData = await supabaseGetJobById(req.params.jobId);
+     if (supabaseData) {
+       data = supabaseData.docs;
+     }
+   }
+   const jobStatus = await job.getState();
    res.json({
-     status: await job.getState(),
+     status: jobStatus,
      // progress: job.progress(),
-     current: current,
+     current,
-     current_url: current_url,
+     current_url,
-     current_step: current_step,
+     current_step,
-     total: total,
+     total,
-     data: job.returnvalue,
+     data: data ? data : null,
-     partial_data: partialDocs ?? [],
+     partial_data: jobStatus == 'completed' ? [] : partialDocs,
    });
  } catch (error) {
    console.error(error);
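The behavioural change here: when USE_DB_AUTHENTICATION is set, data is read from the docs column of the job's firecrawl_jobs row (written by saveJob in runWebScraper below) instead of the Bull return value, and partial_data is emptied once the job reports completed.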

View File

@@ -1,5 +1,6 @@
import { Request, Response } from "express";
import { getWebScraperQueue } from "../../src/services/queue-service";
+ import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
export async function crawlJobStatusPreviewController(req: Request, res: Response) {
  try {

@@ -9,15 +10,26 @@ export async function crawlJobStatusPreviewController(req: Request, res: Response) {
    }
    const { current, current_url, total, current_step, partialDocs } = await job.progress();
+   let data = job.returnvalue;
+   if (process.env.USE_DB_AUTHENTICATION) {
+     const supabaseData = await supabaseGetJobById(req.params.jobId);
+     if (supabaseData) {
+       data = supabaseData.docs;
+     }
+   }
+   const jobStatus = await job.getState();
    res.json({
-     status: await job.getState(),
+     status: jobStatus,
      // progress: job.progress(),
-     current: current,
+     current,
-     current_url: current_url,
+     current_url,
-     current_step: current_step,
+     current_step,
-     total: total,
+     total,
-     data: job.returnvalue,
+     data: data ? data : null,
-     partial_data: partialDocs ?? [],
+     partial_data: jobStatus == 'completed' ? [] : partialDocs,
    });
  } catch (error) {
    console.error(error);

View File

@@ -26,9 +26,11 @@ if (cluster.isMaster) {
  }
  cluster.on("exit", (worker, code, signal) => {
+   if (code !== null) {
      console.log(`Worker ${worker.process.pid} exited`);
      console.log("Starting a new worker");
      cluster.fork();
+   }
  });
} else {
  const app = express();

@@ -97,6 +99,7 @@ if (cluster.isMaster) {
  app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
    try {
      const webScraperQueue = getWebScraperQueue();
      const [webScraperActive] = await Promise.all([
        webScraperQueue.getActiveCount(),
      ]);

@@ -113,6 +116,49 @@ if (cluster.isMaster) {
    }
  });
+ app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
+   try {
+     const wsq = getWebScraperQueue();
+     console.log("Gracefully shutting down...");
+     await wsq.pause(false, true);
+     const jobs = await wsq.getActive();
+     if (jobs.length > 0) {
+       console.log("Removing", jobs.length, "jobs...");
+       await Promise.all(jobs.map(async x => {
+         await wsq.client.del(await x.lockKey());
+         await x.takeLock();
+         await x.moveToFailed({ message: "interrupted" });
+         await x.remove();
+       }));
+       console.log("Re-adding", jobs.length, "jobs...");
+       await wsq.addBulk(jobs.map(x => ({
+         data: x.data,
+         opts: {
+           jobId: x.id,
+         },
+       })));
+       console.log("Done!");
+       res.json({ ok: true });
+     }
+   } catch (error) {
+     console.error(error);
+     return res.status(500).json({ error: error.message });
+   }
+ });
+ app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
+   await getWebScraperQueue().resume(false);
+   res.json({ ok: true });
+ });
  app.get(`/serverHealthCheck`, async (req, res) => {
    try {
      const webScraperQueue = getWebScraperQueue();
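The two new admin endpoints implement the deploy-time drain: /shutdown pauses the queue and, if there are active jobs, releases their locks, fails them as "interrupted", removes them, and re-adds them under the same job id so the next release picks them up; /unpause simply resumes processing. A minimal sketch of driving them by hand, in the same style as the helper scripts added in this PR (base URL and key are placeholders, not a new script in the repo):

// Sketch only: drain before maintenance, resume afterwards
const base = "https://api.firecrawl.dev";
const key = process.env.BULL_AUTH_KEY;
await fetch(`${base}/admin/${key}/shutdown`, { method: "POST" }); // pause queue, requeue active jobs
// ... deploy or maintenance happens here ...
await fetch(`${base}/admin/${key}/unpause`, { method: "POST" }); // resume processing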

View File

@@ -0,0 +1,19 @@
import { supabase_service } from "../services/supabase";
export const supabaseGetJobById = async (jobId: string) => {
const { data, error } = await supabase_service
.from('firecrawl_jobs')
.select('*')
.eq('job_id', jobId)
.single();
if (error) {
return null;
}
if (!data) {
return null;
}
return data;
}
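A small usage sketch of the new helper (the import path mirrors how the controllers above reference it; the job id is a placeholder):

import { supabaseGetJobById } from "../../src/lib/supabase-jobs";

// Look up the persisted row for a Bull job; docs is the column saveJob writes below.
async function getSavedDocs(jobId: string) {
  const row = await supabaseGetJobById(jobId);
  return row ? row.docs : null; // null when the row is missing or the query errored
}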

View File

@@ -4,6 +4,7 @@ import { WebScraperDataProvider } from "../scraper/WebScraper";
import { DocumentUrl, Progress } from "../lib/entities";
import { billTeam } from "../services/billing/credit_billing";
import { Document } from "../lib/entities";
+ import { supabase_service } from "../services/supabase";
export async function startWebScraperPipeline({
  job,

@@ -26,7 +27,7 @@ export async function startWebScraperPipeline({
      }
    },
    onSuccess: (result) => {
-     job.moveToCompleted(result);
+     saveJob(job, result);
    },
    onError: (error) => {
      job.moveToFailed(error);

@@ -107,3 +108,22 @@ export async function runWebScraper({
    return { success: false, message: error.message, docs: [] };
  }
}
+ const saveJob = async (job: Job, result: any) => {
+   try {
+     if (process.env.USE_DB_AUTHENTICATION) {
+       const { data, error } = await supabase_service
+         .from("firecrawl_jobs")
+         .update({ docs: result })
+         .eq("job_id", job.id);
+       if (error) throw new Error(error.message);
+       await job.moveToCompleted(null);
+     } else {
+       await job.moveToCompleted(result);
+     }
+   } catch (error) {
+     console.error("Failed to update job status:", error);
+   }
+ }
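This is where the persistence promised in the PR title happens: with USE_DB_AUTHENTICATION set, the scraped result is written to the docs column of the firecrawl_jobs row matching the Bull job id and the job completes with a null return value, so Redis no longer carries the document payload; without the flag, the result keeps flowing through job.returnvalue as before. The crawl status controllers above mirror this by preferring the Supabase row when the flag is set.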

View File

@@ -6,8 +6,7 @@ import "dotenv/config";
export async function logJob(job: FirecrawlJob) {
  try {
-   // Only log jobs in production
-   if (process.env.ENV !== "production") {
+   if (!process.env.USE_DB_AUTHENTICATION) {
      return;
    }

@@ -25,6 +24,7 @@ export async function logJob(job: FirecrawlJob) {
      .from("firecrawl_jobs")
      .insert([
        {
+         job_id: job.job_id ? job.job_id : null,
          success: job.success,
          message: job.message,
          num_docs: job.num_docs,

@@ -38,6 +38,7 @@ export async function logJob(job: FirecrawlJob) {
          origin: job.origin,
          extractor_options: job.extractor_options,
          num_tokens: job.num_tokens,
+         retry: !!job.retry,
        },
      ]);

@@ -61,6 +62,7 @@ export async function logJob(job: FirecrawlJob) {
        origin: job.origin,
        extractor_options: job.extractor_options,
        num_tokens: job.num_tokens,
+       retry: job.retry,
      },
    };
    posthog.capture(phLog);
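Job logging is now gated on USE_DB_AUTHENTICATION rather than ENV === "production", and each inserted row carries the Bull job_id (the key saveJob and supabaseGetJobById match on) plus a retry flag, which is also forwarded to PostHog.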

View File

@@ -8,13 +8,17 @@ import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
if(process.env.ENV === 'production') {
-   initSDK({ consoleCapture: true, additionalInstrumentations: []});
+   initSDK({
+     consoleCapture: true,
+     additionalInstrumentations: [],
+   });
}
- getWebScraperQueue().process(
+ const wsq = getWebScraperQueue();
+ wsq.process(
  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
  async function (job, done) {
    try {
      job.progress({
        current: 1,

@@ -41,6 +45,7 @@ getWebScraperQueue().process(
      await callWebhook(job.data.team_id, job.id as string, data);
      await logJob({
+       job_id: job.id as string,
        success: success,
        message: message,
        num_docs: docs.length,

@@ -55,6 +60,10 @@ getWebScraperQueue().process(
      });
      done(null, data);
    } catch (error) {
+     if (await getWebScraperQueue().isPaused(false)) {
+       return;
+     }
      if (error instanceof CustomError) {
        // Here we handle the error, then save the failed job
        console.error(error.message); // or any other error handling

@@ -80,6 +89,7 @@ getWebScraperQueue().process(
      };
      await callWebhook(job.data.team_id, job.id as string, data);
      await logJob({
+       job_id: job.id as string,
        success: false,
        message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
        num_docs: 0,
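Besides threading job_id into both logJob calls, the worker's error handler now returns early when the queue is paused, presumably so jobs being drained and re-queued by the /shutdown endpoint are not also reported as failures; the initSDK change is formatting only.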

View File

@@ -0,0 +1,9 @@
fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/shutdown", {
method: "POST"
}).then(async x => {
console.log(await x.text());
process.exit(0);
}).catch(e => {
console.error(e);
process.exit(1);
});
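Counterpart to postdeploy.js: fly.toml's release_command runs this against the currently deployed app before the new version starts, POSTing to /admin/.../shutdown so active jobs are re-queued and the queue stays paused until the post-deploy unpause.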

View File

@@ -48,6 +48,7 @@ export interface RunWebScraperResult {
}
export interface FirecrawlJob {
+ job_id?: string;
  success: boolean;
  message: string;
  num_docs: number;

@@ -61,6 +62,7 @@ export interface FirecrawlJob {
  origin: string;
  extractor_options?: ExtractorOptions,
  num_tokens?: number,
+ retry?: boolean,
}
export interface FirecrawlScrapeResponse {