Merge pull request #455 from mendableai/feat/scrape-monitoring

Add scrape monitoring
This commit is contained in:
Nicolas 2024-07-25 16:27:07 -04:00 committed by GitHub
commit 11e6b2680e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 174 additions and 7 deletions

View File

@ -10,6 +10,7 @@ import { logCrawl } from "../../src/services/logging/crawl_log";
import { validateIdempotencyKey } from "../../src/services/idempotency/validate";
import { createIdempotencyKey } from "../../src/services/idempotency/create";
import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../src/lib/logger";
export async function crawlController(req: Request, res: Response) {
@ -61,10 +62,11 @@ export async function crawlController(req: Request, res: Response) {
const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions };
const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions };
if (mode === "single_urls" && !url.includes(",")) {
if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
try {
const a = new WebScraperDataProvider();
await a.setOptions({
jobId: uuidv4(),
mode: "single_urls",
urls: [url],
crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },

View File

@ -9,9 +9,11 @@ import { Document } from "../lib/entities";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
import { numTokensFromString } from '../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
export async function scrapeHelper(
jobId: string,
req: Request,
team_id: string,
crawlerOptions: any,
@ -36,6 +38,7 @@ export async function scrapeHelper(
const a = new WebScraperDataProvider();
await a.setOptions({
jobId,
mode: "single_urls",
urls: [url],
crawlerOptions: {
@ -128,8 +131,11 @@ export async function scrapeController(req: Request, res: Response) {
checkCredits();
}
const jobId = uuidv4();
const startTime = new Date().getTime();
const result = await scrapeHelper(
jobId,
req,
team_id,
crawlerOptions,
@ -170,6 +176,7 @@ export async function scrapeController(req: Request, res: Response) {
}
logJob({
job_id: jobId,
success: result.success,
message: result.error,
num_docs: 1,

View File

@ -7,9 +7,11 @@ import { logJob } from "../services/logging/log_job";
import { PageOptions, SearchOptions } from "../lib/entities";
import { search } from "../search";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../lib/logger";
export async function searchHelper(
jobId: string,
req: Request,
team_id: string,
crawlerOptions: any,
@ -76,6 +78,7 @@ export async function searchHelper(
const a = new WebScraperDataProvider();
await a.setOptions({
jobId,
mode: "single_urls",
urls: res.map((r) => r.url).slice(0, searchOptions.limit ?? 7),
crawlerOptions: {
@ -149,6 +152,8 @@ export async function searchController(req: Request, res: Response) {
const searchOptions = req.body.searchOptions ?? { limit: 7 };
const jobId = uuidv4();
try {
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
@ -161,6 +166,7 @@ export async function searchController(req: Request, res: Response) {
}
const startTime = new Date().getTime();
const result = await searchHelper(
jobId,
req,
team_id,
crawlerOptions,
@ -170,6 +176,7 @@ export async function searchController(req: Request, res: Response) {
const endTime = new Date().getTime();
const timeTakenInSeconds = (endTime - startTime) / 1000;
logJob({
job_id: jobId,
success: result.success,
message: result.error,
num_docs: result.data ? result.data.length : 0,

View File

@ -4,6 +4,7 @@ async function example() {
const example = new WebScraperDataProvider();
await example.setOptions({
jobId: "TEST",
mode: "crawl",
urls: ["https://mendable.ai"],
crawlerOptions: {},

View File

@ -13,6 +13,7 @@ import { checkAlerts } from "./services/alerts";
import Redis from "ioredis";
import { redisRateLimitClient } from "./services/rate-limiter";
import { Logger } from "./lib/logger";
import { ScrapeEvents } from "./lib/scrape-events";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@ -325,3 +326,12 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`);
}
const wsq = getWebScraperQueue();
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@ -56,6 +56,7 @@ export type CrawlerOptions = {
}
export type WebScraperOptions = {
jobId: string;
urls: string[];
mode: "single_urls" | "sitemap" | "crawl";
crawlerOptions?: CrawlerOptions;

View File

@ -0,0 +1,84 @@
import { Job, JobId } from "bull";
import type { baseScrapers } from "../scraper/WebScraper/single_url";
import { supabase_service as supabase } from "../services/supabase";
import { Logger } from "./logger";
export type ScrapeErrorEvent = {
type: "error",
message: string,
stack?: string,
}
export type ScrapeScrapeEvent = {
type: "scrape",
url: string,
worker?: string,
method: (typeof baseScrapers)[number],
result: null | {
success: boolean,
response_code?: number,
response_size?: number,
error?: string | object,
// proxy?: string,
time_taken: number,
},
}
export type ScrapeQueueEvent = {
type: "queue",
event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed" | "failed",
worker?: string,
}
export type ScrapeEvent = ScrapeErrorEvent | ScrapeScrapeEvent | ScrapeQueueEvent;
export class ScrapeEvents {
static async insert(jobId: string, content: ScrapeEvent) {
if (jobId === "TEST") return null;
if (process.env.USE_DB_AUTHENTICATION) {
try {
const result = await supabase.from("scrape_events").insert({
job_id: jobId,
type: content.type,
content: content,
// created_at
}).select().single();
return (result.data as any).id;
} catch (error) {
Logger.error(`Error inserting scrape event: ${error}`);
return null;
}
}
return null;
}
static async updateScrapeResult(logId: number | null, result: ScrapeScrapeEvent["result"]) {
if (logId === null) return;
try {
const previousLog = (await supabase.from("scrape_events").select().eq("id", logId).single()).data as any;
await supabase.from("scrape_events").update({
content: {
...previousLog.content,
result,
}
}).eq("id", logId);
} catch (error) {
Logger.error(`Error updating scrape result: ${error}`);
}
}
static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) {
try {
await this.insert(((job as any).id ? (job as any).id : job) as string, {
type: "queue",
event,
worker: process.env.FLY_MACHINE_ID,
});
} catch (error) {
Logger.error(`Error logging job event: ${error}`);
}
}
}

View File

@ -11,6 +11,7 @@ import { billTeam } from "../services/billing/credit_billing";
import { Document } from "../lib/entities";
import { supabase_service } from "../services/supabase";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
export async function startWebScraperPipeline({
job,
@ -39,6 +40,7 @@ export async function startWebScraperPipeline({
},
onError: (error) => {
Logger.error(`🐂 Job failed ${job.id}`);
ScrapeEvents.logJobEvent(job, "failed");
job.moveToFailed(error);
},
team_id: job.data.team_id,
@ -60,6 +62,7 @@ export async function runWebScraper({
const provider = new WebScraperDataProvider();
if (mode === "crawl") {
await provider.setOptions({
jobId: bull_job_id,
mode: mode,
urls: [url],
crawlerOptions: crawlerOptions,
@ -68,6 +71,7 @@ export async function runWebScraper({
});
} else {
await provider.setOptions({
jobId: bull_job_id,
mode: mode,
urls: url.split(","),
crawlerOptions: crawlerOptions,
@ -138,6 +142,7 @@ const saveJob = async (job: Job, result: any) => {
// I think the job won't exist here anymore
}
}
ScrapeEvents.logJobEvent(job, "completed");
} catch (error) {
Logger.error(`🐂 Failed to update job status: ${error}`);
}

View File

@ -42,6 +42,7 @@ describe('WebCrawler', () => {
crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],
@ -76,6 +77,7 @@ describe('WebCrawler', () => {
crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],
@ -104,6 +106,7 @@ describe('WebCrawler', () => {
crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],
@ -133,6 +136,7 @@ describe('WebCrawler', () => {
crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],
@ -161,6 +165,7 @@ describe('WebCrawler', () => {
// Setup the crawler with the specific test case options
const crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],
@ -194,6 +199,7 @@ describe('WebCrawler', () => {
const limit = 2; // Set a limit for the number of links
crawler = new WebCrawler({
jobId: "TEST",
initialUrl: initialUrl,
includes: [],
excludes: [],

View File

@ -15,8 +15,8 @@ describe('scrapSingleUrl', () => {
const pageOptionsWithHtml: PageOptions = { includeHtml: true };
const pageOptionsWithoutHtml: PageOptions = { includeHtml: false };
const resultWithHtml = await scrapSingleUrl(url, pageOptionsWithHtml);
const resultWithoutHtml = await scrapSingleUrl(url, pageOptionsWithoutHtml);
const resultWithHtml = await scrapSingleUrl("TEST", url, pageOptionsWithHtml);
const resultWithoutHtml = await scrapSingleUrl("TEST", url, pageOptionsWithoutHtml);
expect(resultWithHtml.html).toBeDefined();
expect(resultWithoutHtml.html).toBeUndefined();
@ -27,7 +27,7 @@ it('should return a list of links on the mendable.ai page', async () => {
const url = 'https://mendable.ai';
const pageOptions: PageOptions = { includeHtml: true };
const result = await scrapSingleUrl(url, pageOptions);
const result = await scrapSingleUrl("TEST", url, pageOptions);
// Check if the result contains a list of links
expect(result.linksOnPage).toBeDefined();

View File

@ -11,6 +11,7 @@ import { axiosTimeout } from "../../../src/lib/timeout";
import { Logger } from "../../../src/lib/logger";
export class WebCrawler {
private jobId: string;
private initialUrl: string;
private baseUrl: string;
private includes: string[];
@ -27,6 +28,7 @@ export class WebCrawler {
private allowExternalContentLinks: boolean;
constructor({
jobId,
initialUrl,
includes,
excludes,
@ -37,6 +39,7 @@ export class WebCrawler {
allowBackwardCrawling = false,
allowExternalContentLinks = false
}: {
jobId: string;
initialUrl: string;
includes?: string[];
excludes?: string[];
@ -47,6 +50,7 @@ export class WebCrawler {
allowBackwardCrawling?: boolean;
allowExternalContentLinks?: boolean;
}) {
this.jobId = jobId;
this.initialUrl = initialUrl;
this.baseUrl = new URL(initialUrl).origin;
this.includes = includes ?? [];
@ -261,7 +265,7 @@ export class WebCrawler {
// If it is the first link, fetch with single url
if (this.visited.size === 1) {
const page = await scrapSingleUrl(url, { ...pageOptions, includeHtml: true });
const page = await scrapSingleUrl(this.jobId, url, { ...pageOptions, includeHtml: true });
content = page.html ?? "";
pageStatusCode = page.metadata?.pageStatusCode;
pageError = page.metadata?.pageError || undefined;

View File

@ -22,6 +22,7 @@ import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
import { Logger } from "../../lib/logger";
export class WebScraperDataProvider {
private jobId: string;
private bullJobId: string;
private urls: string[] = [""];
private mode: "single_urls" | "sitemap" | "crawl" = "single_urls";
@ -66,6 +67,7 @@ export class WebScraperDataProvider {
batchUrls.map(async (url, index) => {
const existingHTML = allHtmls ? allHtmls[i + index] : "";
const result = await scrapSingleUrl(
this.jobId,
url,
this.pageOptions,
this.extractorOptions,
@ -166,6 +168,7 @@ export class WebScraperDataProvider {
inProgress?: (progress: Progress) => void
): Promise<Document[]> {
const crawler = new WebCrawler({
jobId: this.jobId,
initialUrl: this.urls[0],
includes: this.includes,
excludes: this.excludes,
@ -500,6 +503,7 @@ export class WebScraperDataProvider {
throw new Error("Urls are required");
}
this.jobId = options.jobId;
this.bullJobId = options.bullJobId;
this.urls = options.urls;
this.mode = options.mode;

View File

@ -18,10 +18,11 @@ import { scrapWithPlaywright } from "./scrapers/playwright";
import { scrapWithScrapingBee } from "./scrapers/scrapingBee";
import { extractLinks } from "./utils/utils";
import { Logger } from "../../lib/logger";
import { ScrapeEvents } from "../../lib/scrape-events";
dotenv.config();
const baseScrapers = [
export const baseScrapers = [
"fire-engine",
"fire-engine;chrome-cdp",
"scrapingBee",
@ -118,6 +119,7 @@ function getScrapingFallbackOrder(
export async function scrapSingleUrl(
jobId: string,
urlToScrap: string,
pageOptions: PageOptions = {
onlyMainContent: true,
@ -145,6 +147,15 @@ export async function scrapSingleUrl(
} = { text: "", screenshot: "", metadata: {} };
let screenshot = "";
const timer = Date.now();
const logInsertPromise = ScrapeEvents.insert(jobId, {
type: "scrape",
url,
worker: process.env.FLY_MACHINE_ID,
method,
result: null,
});
switch (method) {
case "fire-engine":
case "fire-engine;chrome-cdp":
@ -254,8 +265,19 @@ export async function scrapSingleUrl(
}
//* TODO: add an optional to return markdown or structured/extracted content
let cleanedHtml = removeUnwantedElements(scraperResponse.text, pageOptions);
const text = await parseMarkdown(cleanedHtml);
const insertedLogId = await logInsertPromise;
ScrapeEvents.updateScrapeResult(insertedLogId, {
response_size: scraperResponse.text.length,
success: !scraperResponse.metadata.pageError && !!text,
error: scraperResponse.metadata.pageError,
response_code: scraperResponse.metadata.pageStatusCode,
time_taken: Date.now() - timer,
});
return {
text: await parseMarkdown(cleanedHtml),
text,
html: cleanedHtml,
rawHtml: scraperResponse.text,
screenshot: scraperResponse.screenshot,
@ -379,6 +401,11 @@ export async function scrapSingleUrl(
return document;
} catch (error) {
Logger.debug(`⛏️ Error: ${error.message} - Failed to fetch URL: ${urlToScrap}`);
ScrapeEvents.insert(jobId, {
type: "error",
message: typeof error === "string" ? error : typeof error.message === "string" ? error.message : JSON.stringify(error),
stack: error.stack,
});
return {
content: "",
markdown: "",

View File

@ -8,6 +8,7 @@ import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bull";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
if (process.env.ENV === 'production') {
initSDK({
@ -20,6 +21,7 @@ const wsq = getWebScraperQueue();
async function processJob(job: Job, done) {
Logger.debug(`🐂 Worker taking job ${job.id}`);
try {
job.progress({
current: 1,
@ -114,3 +116,10 @@ wsq.process(
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
processJob
);
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));