diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index fd72c8cf..c2d5bdca 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -22,6 +22,7 @@ import { getScrapeQueue } from "../../services/queue-service"; import { addScrapeJob } from "../../services/queue-jobs"; import { Logger } from "../../lib/logger"; import { getJobPriority } from "../../lib/job-priority"; +import { callWebhook } from "../../services/webhook"; export async function crawlController( req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>, @@ -150,6 +151,10 @@ export async function crawlController( await addCrawlJob(id, job.id); } + if(req.body.webhook) { + await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started"); + } + return res.status(200).json({ success: true, id, diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index e0bb0df0..f8675ac3 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -217,10 +217,15 @@ async function processJob(job: Job, token: string) { docs, }; - if (job.data.mode === "crawl") { - await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1); + + + // No idea what this does and when it is called. + if (job.data.mode === "crawl" && !job.data.v1) { + callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1).catch((error) => { + Logger.error(`Error calling webhook for job (1 - mode crawl - v0) ${job.id} - ${error}`); + }); } - if (job.data.webhook && job.data.mode !== "crawl") { + if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); } @@ -344,13 +349,24 @@ async function processJob(job: Job, token: string) { error: message /* etc... */, docs: fullDocs, }; + // v0 web hooks, call when done with all the data + if (!job.data.v1) { + callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed").catch((error) => { + Logger.error(`Error calling webhook for job ${job.id} - ${error}`); + }); + } + // v1 web hooks, call when done with no data, but with event completed + if (job.data.v1 && job.data.webhook) { + callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => { + Logger.error(`Error calling webhook for job ${job.id} - ${error}`); + }); + } } } - if (!job.data.v1) { - await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); - } + + Logger.info(`🐂 Job done ${job.id}`); return data; } catch (error) { @@ -391,7 +407,14 @@ async function processJob(job: Job, token: string) { }; if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) { - await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1); + callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1).catch((error) => { + Logger.error(`Error calling webhook for job (catch - v0) ${job.id} - ${error}`); + }); + } + if(job.data.v1) { + callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed").catch((error) => { + Logger.error(`Error calling webhook for job (catch - v1) ${job.id} - ${error}`); + }); } if (job.data.crawl_id) { diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts index a3af9c34..1ca7dbad 100644 --- a/apps/api/src/services/webhook.ts +++ b/apps/api/src/services/webhook.ts @@ -1,11 +1,23 @@ +import axios from "axios"; import { legacyDocumentConverter } from "../../src/controllers/v1/types"; import { Logger } from "../../src/lib/logger"; import { supabase_service } from "./supabase"; +import { WebhookEventType } from "../types"; -export const callWebhook = async (teamId: string, id: string, data: any, specified?: string, v1 = false) => { +export const callWebhook = async ( + teamId: string, + id: string, + data: any | null, + specified?: string, + v1 = false, + eventType: WebhookEventType = "crawl.page" +) => { try { - const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", id); - const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true'; + const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace( + "{{JOB_ID}}", + id + ); + const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true"; let webhookUrl = specified ?? selfHostedUrl; // Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set @@ -17,7 +29,9 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi .eq("team_id", teamId) .limit(1); if (error) { - Logger.error(`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`); + Logger.error( + `Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}` + ); return null; } @@ -29,10 +43,12 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi } let dataToSend = []; - if (data.result.links && data.result.links.length !== 0) { + if (data && data.result && data.result.links && data.result.links.length !== 0) { for (let i = 0; i < data.result.links.length; i++) { if (v1) { - dataToSend.push(legacyDocumentConverter(data.result.links[i].content)) + dataToSend.push( + legacyDocumentConverter(data.result.links[i].content) + ); } else { dataToSend.push({ content: data.result.links[i].content.content, @@ -43,19 +59,29 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi } } - await fetch(webhookUrl, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - success: data.success, - id: id, + axios.post( + webhookUrl, + { + success: !v1 ? data.success : eventType === "crawl.page" ? data.success : true, + type: eventType, + [v1 ? 'id' : 'jobId']: id, data: dataToSend, - error: data.error || undefined, - }), + error: !v1 ? data?.error || undefined : eventType === "crawl.page" ? data?.error || undefined : undefined, + }, + { + headers: { + "Content-Type": "application/json", + }, + timeout: 10000, // 10 seconds timeout + } + ).catch((error) => { + Logger.error( + `Error sending webhook for team ID: ${teamId}, error: ${error.message}` + ); }); } catch (error) { - Logger.debug(`Error sending webhook for team ID: ${teamId}, error: ${error.message}`); + Logger.debug( + `Error sending webhook for team ID: ${teamId}, error: ${error.message}` + ); } }; diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 431c0126..50fb6eef 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -153,4 +153,7 @@ export type PlanType = | "growth" | "growthdouble" | "free" - | ""; \ No newline at end of file + | ""; + + +export type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed"; \ No newline at end of file