From 49e3e64787ab0080724b3cff82a715a630955451 Mon Sep 17 00:00:00 2001
From: rafaelsideguide <150964962+rafaelsideguide@users.noreply.github.com>
Date: Mon, 29 Jul 2024 14:13:46 -0300
Subject: [PATCH] bugfix for PDFs and logging PDF events, also added
 try/catches for DOCX

---
 apps/api/src/scraper/WebScraper/index.ts      | 47 +++++++++++-
 .../scraper/WebScraper/utils/docxProcessor.ts | 76 ++++++++++++++-----
 .../scraper/WebScraper/utils/pdfProcessor.ts  | 36 +++++++--
 3 files changed, 129 insertions(+), 30 deletions(-)

diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index eff709fa..9171b805 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -20,6 +20,7 @@ import { getWebScraperQueue } from "../../../src/services/queue-service";
 import { fetchAndProcessDocx } from "./utils/docxProcessor";
 import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
 import { Logger } from "../../lib/logger";
+import { ScrapeEvents } from "../../lib/scrape-events";
 
 export class WebScraperDataProvider {
   private jobId: string;
@@ -316,10 +317,28 @@ export class WebScraperDataProvider {
   private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
     return Promise.all(
       pdfLinks.map(async (pdfLink) => {
+        const timer = Date.now();
+        const logInsertPromise = ScrapeEvents.insert(this.jobId, {
+          type: "scrape",
+          url: pdfLink,
+          worker: process.env.FLY_MACHINE_ID,
+          method: "pdf-scrape",
+          result: null,
+        });
+
         const { content, pageStatusCode, pageError } = await fetchAndProcessPdf(
           pdfLink,
           this.pageOptions.parsePDF
         );
+
+        const insertedLogId = await logInsertPromise;
+        ScrapeEvents.updateScrapeResult(insertedLogId, {
+          response_size: content.length,
+          success: !(pageStatusCode && pageStatusCode >= 400) && !!content && (content.trim().length >= 100),
+          error: pageError,
+          response_code: pageStatusCode,
+          time_taken: Date.now() - timer,
+        });
         return {
           content: content,
           metadata: { sourceURL: pdfLink, pageStatusCode, pageError },
@@ -330,12 +349,32 @@
   }
   private async fetchDocxDocuments(docxLinks: string[]): Promise<Document[]> {
     return Promise.all(
-      docxLinks.map(async (p) => {
-        const { content, pageStatusCode, pageError } =
-          await fetchAndProcessDocx(p);
+      docxLinks.map(async (docxLink) => {
+        const timer = Date.now();
+        const logInsertPromise = ScrapeEvents.insert(this.jobId, {
+          type: "scrape",
+          url: docxLink,
+          worker: process.env.FLY_MACHINE_ID,
+          method: "docx-scrape",
+          result: null,
+        });
+
+        const { content, pageStatusCode, pageError } = await fetchAndProcessDocx(
+          docxLink
+        );
+
+        const insertedLogId = await logInsertPromise;
+        ScrapeEvents.updateScrapeResult(insertedLogId, {
+          response_size: content.length,
+          success: !(pageStatusCode && pageStatusCode >= 400) && !!content && (content.trim().length >= 100),
+          error: pageError,
+          response_code: pageStatusCode,
+          time_taken: Date.now() - timer,
+        });
+
         return {
           content,
-          metadata: { sourceURL: p, pageStatusCode, pageError },
+          metadata: { sourceURL: docxLink, pageStatusCode, pageError },
           provider: "web-scraper",
         };
       })
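Note on the index.ts changes: the timer / ScrapeEvents.insert / updateScrapeResult sequence is now duplicated verbatim between fetchPdfDocuments and fetchDocxDocuments. A possible follow-up would be a shared helper; the sketch below is illustrative only (logTimedScrape and ScrapeOutcome are hypothetical names, and it assumes ScrapeEvents.insert resolves to the log id, as the awaited logInsertPromise above implies).

```typescript
import { ScrapeEvents } from "../../lib/scrape-events";

// Hypothetical shape of what fetchAndProcessPdf / fetchAndProcessDocx return.
type ScrapeOutcome = { content: string; pageStatusCode?: number; pageError?: string };

// Wraps a scrape in the insert-then-update event logging used in this patch.
async function logTimedScrape(
  jobId: string,
  url: string,
  method: string,
  scrape: () => Promise<ScrapeOutcome>
): Promise<ScrapeOutcome> {
  const timer = Date.now();
  // Start the log insert without blocking the scrape itself.
  const logInsertPromise = ScrapeEvents.insert(jobId, {
    type: "scrape",
    url,
    worker: process.env.FLY_MACHINE_ID,
    method,
    result: null,
  });

  const { content, pageStatusCode, pageError } = await scrape();

  // Same success heuristic as the patch: no 4xx/5xx status and >= 100 chars of content.
  const insertedLogId = await logInsertPromise;
  ScrapeEvents.updateScrapeResult(insertedLogId, {
    response_size: content.length,
    success:
      !(pageStatusCode && pageStatusCode >= 400) &&
      !!content &&
      content.trim().length >= 100,
    error: pageError,
    response_code: pageStatusCode,
    time_taken: Date.now() - timer,
  });

  return { content, pageStatusCode, pageError };
}
```

fetchPdfDocuments would then reduce to one call per link, e.g. logTimedScrape(this.jobId, pdfLink, "pdf-scrape", () => fetchAndProcessPdf(pdfLink, this.pageOptions.parsePDF)).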
"mammoth"; +import { Logger } from "../../../lib/logger"; export async function fetchAndProcessDocx(url: string): Promise<{ content: string; pageStatusCode: number; pageError: string }> { - const { tempFilePath, pageStatusCode, pageError } = await downloadDocx(url); - const content = await processDocxToText(tempFilePath); - fs.unlinkSync(tempFilePath); // Clean up the temporary file + let tempFilePath = ''; + let pageStatusCode = 200; + let pageError = ''; + let content = ''; + + try { + const downloadResult = await downloadDocx(url); + tempFilePath = downloadResult.tempFilePath; + pageStatusCode = downloadResult.pageStatusCode; + pageError = downloadResult.pageError; + content = await processDocxToText(tempFilePath); + } catch (error) { + Logger.error(`Failed to fetch and process DOCX: ${error.message}`); + pageStatusCode = 500; + pageError = error.message; + content = ''; + } finally { + if (tempFilePath) { + fs.unlinkSync(tempFilePath); // Clean up the temporary file + } + } + return { content, pageStatusCode, pageError }; } async function downloadDocx(url: string): Promise<{ tempFilePath: string; pageStatusCode: number; pageError: string }> { - const response = await axios({ - url, - method: "GET", - responseType: "stream", - }); + try { + const response = await axios({ + url, + method: "GET", + responseType: "stream", + }); - const tempFilePath = path.join(os.tmpdir(), `tempDocx-${Date.now()}.docx`); - const writer = createWriteStream(tempFilePath); + const tempFilePath = path.join(os.tmpdir(), `tempDocx-${Date.now()}.docx`); + const writer = createWriteStream(tempFilePath); - response.data.pipe(writer); + response.data.pipe(writer); - return new Promise((resolve, reject) => { - writer.on("finish", () => resolve({ tempFilePath, pageStatusCode: response.status, pageError: response.statusText != "OK" ? response.statusText : undefined })); - writer.on("error", reject); - }); + return new Promise((resolve, reject) => { + writer.on("finish", () => resolve({ tempFilePath, pageStatusCode: response.status, pageError: response.statusText != "OK" ? 
diff --git a/apps/api/src/scraper/WebScraper/utils/pdfProcessor.ts b/apps/api/src/scraper/WebScraper/utils/pdfProcessor.ts
index 660d27eb..b27db99a 100644
--- a/apps/api/src/scraper/WebScraper/utils/pdfProcessor.ts
+++ b/apps/api/src/scraper/WebScraper/utils/pdfProcessor.ts
@@ -76,7 +76,6 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
     let attempt = 0;
     const maxAttempts = 10; // Maximum number of attempts
     let resultAvailable = false;
-
     while (attempt < maxAttempts && !resultAvailable) {
       try {
         resultResponse = await axios.get(resultUrl, { headers, timeout: (axiosTimeout * 2) });
@@ -90,13 +89,22 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
       } catch (error) {
         Logger.debug("Error fetching result w/ LlamaIndex");
         attempt++;
+        if (attempt >= maxAttempts) {
+          Logger.error("Max attempts reached, unable to fetch result.");
+          break; // Exit the loop if max attempts are reached
+        }
         await new Promise((resolve) => setTimeout(resolve, 500)); // Wait for 0.5 seconds before retrying
         // You may want to handle specific errors differently
       }
     }
 
     if (!resultAvailable) {
-      content = await processPdf(filePath);
+      try {
+        content = await processPdf(filePath);
+      } catch (error) {
+        Logger.error(`Failed to process PDF: ${error}`);
+        content = "";
+      }
     }
     content = resultResponse.data[resultType];
   } catch (error) {
@@ -104,15 +112,29 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
     content = await processPdf(filePath);
   }
   } else if (parsePDF) {
-    content = await processPdf(filePath);
+    try {
+      content = await processPdf(filePath);
+    } catch (error) {
+      Logger.error(`Failed to process PDF: ${error}`);
+      content = "";
+    }
   } else {
-    content = fs.readFileSync(filePath, "utf-8");
+    try {
+      content = fs.readFileSync(filePath, "utf-8");
+    } catch (error) {
+      Logger.error(`Failed to read PDF file: ${error}`);
+      content = "";
+    }
   }
   return content;
 }
 
 async function processPdf(file: string) {
-  const fileContent = fs.readFileSync(file);
-  const data = await pdf(fileContent);
-  return data.text;
+  try {
+    const fileContent = fs.readFileSync(file);
+    const data = await pdf(fileContent);
+    return data.text;
+  } catch (error) {
+    throw error;
+  }
 }
\ No newline at end of file
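Note on the pdfProcessor.ts changes: the LlamaParse polling loop now stops explicitly after maxAttempts, and every processPdf / readFileSync call is wrapped so a single bad file can no longer reject the whole batch. The polling pattern itself, reduced to a self-contained sketch (pollOnce is a hypothetical stand-in for the axios.get(resultUrl, ...) call):

```typescript
// Bounded polling with a fixed delay, mirroring the patched loop: try up to
// maxAttempts times, wait 500 ms between tries, and give up cleanly so the
// caller can fall back to local parsing, as processPdfToText falls back to
// processPdf when resultAvailable stays false.
async function pollWithRetries<T>(
  pollOnce: () => Promise<T | null>, // resolves null while the job is still pending
  maxAttempts = 10,
  delayMs = 500
): Promise<T | null> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const result = await pollOnce();
      if (result !== null) return result; // result became available
    } catch {
      // Transient fetch error: fall through to the delay and retry.
    }
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return null; // max attempts reached, no result
}
```

One subtlety in the patched function: content = resultResponse.data[resultType]; still runs unconditionally after the if (!resultAvailable) fallback, so when polling never succeeds, reading the undefined resultResponse throws into the outer catch, which retries processPdf a second time.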