diff --git a/backend/functions/src/cloud-functions/crawler.ts b/backend/functions/src/cloud-functions/crawler.ts
index e121ec4..9890a95 100644
--- a/backend/functions/src/cloud-functions/crawler.ts
+++ b/backend/functions/src/cloud-functions/crawler.ts
@@ -19,7 +19,7 @@ import { randomUUID } from 'crypto';
 import { JinaEmbeddingsAuthDTO } from '../shared/dto/jina-embeddings-auth';
 import { countGPTToken as estimateToken } from '../shared/utils/openai';
 
-import { CrawlerOptions } from '../dto/scrapping-options';
+import { CrawlerOptions, CrawlerOptionsHeaderOnly } from '../dto/scrapping-options';
 import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
 import { PDFExtractor } from '../services/pdf-extract';
 
@@ -230,7 +230,9 @@ export class CrawlerHost extends RPCHost {
 
         let pdfMode = false;
         if (snapshot.pdfs?.length && !snapshot.title) {
-            const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0]);
+            const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0],
+                this.threadLocal.get('cacheTolerance')
+            );
             if (pdf) {
                 pdfMode = true;
                 snapshot.title = pdf.meta?.Title;
@@ -432,7 +434,7 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
         runtime: {
             memory: '4GiB',
             timeoutSeconds: 300,
-            concurrency: 4,
+            concurrency: 22,
         },
         tags: ['Crawler'],
         httpMethod: ['get', 'post'],
@@ -442,9 +444,9 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
     @CloudHTTPv2({
         runtime: {
             memory: '4GiB',
-            cpu: 2,
+            cpu: 4,
             timeoutSeconds: 300,
-            concurrency: 11,
+            concurrency: 22,
             maxInstances: 455,
         },
         openapi: {
@@ -543,11 +545,11 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
             res: Response,
         },
         auth: JinaEmbeddingsAuthDTO,
-        crawlerOptions: CrawlerOptions,
+        crawlerOptions: CrawlerOptionsHeaderOnly,
     ) {
         const uid = await auth.solveUID();
         let chargeAmount = 0;
-        const noSlashURL = ctx.req.url.slice(1).trimStart();
+        const noSlashURL = ctx.req.url.slice(1);
         if (!noSlashURL) {
             const latestUser = uid ? await auth.assertUser() : undefined;
             if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
@@ -911,6 +913,7 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
         this.threadLocal.set('withGeneratedAlt', opts.withGeneratedAlt);
         this.threadLocal.set('withLinksSummary', opts.withLinksSummary);
         this.threadLocal.set('withImagesSummary', opts.withImagesSummary);
+        this.threadLocal.set('cacheTolerance', opts.cacheTolerance);
 
         const crawlOpts: ExtraScrappingOptions = {
             proxyUrl: opts.proxyUrl,
diff --git a/backend/functions/src/cloud-functions/data-crunching.ts b/backend/functions/src/cloud-functions/data-crunching.ts
index 202f719..a040032 100644
--- a/backend/functions/src/cloud-functions/data-crunching.ts
+++ b/backend/functions/src/cloud-functions/data-crunching.ts
@@ -118,6 +118,20 @@ export class DataCrunchingHost extends RPCHost {
         },
         tags: ['DataCrunching'],
     })
+    async dispatchPageCacheCrunching() {
+        for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
+            this.logger.info(`Dispatching ${fileName}...`);
+            // sse.write({ data: `Dispatching ${fileName}...` });
+
+            await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
+                dispatchDeadlineSeconds: 1800,
+                uri: await getFunctionUrl('crunchPageCacheWorker'),
+            });
+        }
+
+        return true;
+    }
+
     // @CloudHTTPv2({
     //     runtime: {
     //         cpu: 2,
@@ -128,29 +142,28 @@ export class DataCrunchingHost extends RPCHost {
     //     },
     //     tags: ['DataCrunching'],
     // })
-    async dispatchPageCacheCrunching(
-        @RPCReflect() rpcReflect: RPCReflection,
-    ) {
-        const sse = new OutputServerEventStream({ highWaterMark: 4096 });
-        rpcReflect.return(sse);
-        rpcReflect.catch((err) => {
-            sse.end({ data: `Error: ${err.message}` });
-        });
-        for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
-            this.logger.info(`Dispatching ${fileName}...`);
-            sse.write({ data: `Dispatching ${fileName}...` });
+    // async dispatchPageCacheCrunching(
+    //     @RPCReflect() rpcReflect: RPCReflection
+    // ) {
+    //     const sse = new OutputServerEventStream({ highWaterMark: 4096 });
+    //     rpcReflect.return(sse);
+    //     rpcReflect.catch((err) => {
+    //         sse.end({ data: `Error: ${err.message}` });
+    //     });
+    //     for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
+    //         this.logger.info(`Dispatching ${fileName}...`);
+    //         sse.write({ data: `Dispatching ${fileName}...` });
 
-            await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
-                dispatchDeadlineSeconds: 1800,
-                uri: await getFunctionUrl('crunchPageCacheWorker'),
-            });
-        }
+    //         await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
+    //             dispatchDeadlineSeconds: 1800,
+    //             uri: await getFunctionUrl('crunchPageCacheWorker'),
+    //         });
+    //     }
 
-        sse.end({ data: 'done' });
-        sse.resume();
+    //     sse.end({ data: 'done' });
 
-        return true;
-    }
+    //     return true;
+    // }
 
     async* iterPageCacheRecords(date?: string, inputOffset?: number | string) {
         const startOfToday = dayjs().utc().startOf('day');
@@ -234,8 +247,6 @@ export class DataCrunchingHost extends RPCHost {
 
             if (nRecords) {
                 yield { fileName, date: theDay.toISOString(), offset };
             }
-
-            continue;
         }
     }
diff --git a/backend/functions/src/cloud-functions/searcher.ts b/backend/functions/src/cloud-functions/searcher.ts
index 7888b62..a5d394a 100644
--- a/backend/functions/src/cloud-functions/searcher.ts
+++ b/backend/functions/src/cloud-functions/searcher.ts
@@ -53,6 +53,7 @@ export class SearcherHost extends RPCHost {
     @CloudHTTPv2({
         name: 'search2',
         runtime: {
+            cpu: 4,
            memory: '4GiB',
            timeoutSeconds: 300,
            concurrency: 4,
@@ -64,10 +65,10 @@
     })
     @CloudHTTPv2({
         runtime: {
-            cpu: 4,
+            cpu: 8,
             memory: '8GiB',
             timeoutSeconds: 300,
-            concurrency: 4,
+            concurrency: 6,
             maxInstances: 200,
         },
         openapi: {
@@ -265,28 +266,40 @@ export class SearcherHost extends RPCHost {
         let lastScrapped: any[] | undefined;
         let earlyReturn = false;
         if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
-            const earlyReturnTimer = setTimeout(() => {
-                if (!lastScrapped) {
+            let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
+            const setEarlyReturnTimer = () => {
+                if (earlyReturnTimer) {
                     return;
                 }
-                chargeAmount = this.getChargeAmount(lastScrapped);
-                rpcReflect.return(lastScrapped);
-                earlyReturn = true;
-            }, this.reasonableDelayMs);
+                earlyReturnTimer = setTimeout(() => {
+                    if (!lastScrapped) {
+                        return;
+                    }
+                    chargeAmount = this.getChargeAmount(lastScrapped);
+                    rpcReflect.return(lastScrapped);
+                    earlyReturn = true;
+                }, this.reasonableDelayMs);
+            };
 
             for await (const scrapped of it) {
                 lastScrapped = scrapped;
-
+                if (_.some(scrapped, (x) => this.pageQualified(x))) {
+                    setEarlyReturnTimer();
+                }
                 if (!this.searchResultsQualified(scrapped)) {
                     continue;
                 }
-                clearTimeout(earlyReturnTimer);
+                if (earlyReturnTimer) {
+                    clearTimeout(earlyReturnTimer);
+                }
                 chargeAmount = this.getChargeAmount(scrapped);
 
                 return scrapped;
             }
 
-            clearTimeout(earlyReturnTimer);
+            if (earlyReturnTimer) {
+                clearTimeout(earlyReturnTimer);
+            }
 
             if (!lastScrapped) {
                 throw new AssertionFailureError(`No content available for query ${searchQuery}`);
@@ -299,29 +312,44 @@ export class SearcherHost extends RPCHost {
             return lastScrapped;
         }
 
-        const earlyReturnTimer = setTimeout(() => {
-            if (!lastScrapped) {
+        let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
+        const setEarlyReturnTimer = () => {
+            if (earlyReturnTimer) {
                 return;
             }
-            chargeAmount = this.getChargeAmount(lastScrapped);
-            rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }));
-            earlyReturn = true;
-        }, this.reasonableDelayMs);
+            earlyReturnTimer = setTimeout(() => {
+                if (!lastScrapped) {
+                    return;
+                }
+                chargeAmount = this.getChargeAmount(lastScrapped);
+                rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }));
+                earlyReturn = true;
+            }, this.reasonableDelayMs);
+        };
 
         for await (const scrapped of it) {
             lastScrapped = scrapped;
+            if (_.some(scrapped, (x) => this.pageQualified(x))) {
+                setEarlyReturnTimer();
+            }
+
             if (!this.searchResultsQualified(scrapped)) {
                 continue;
             }
 
-            clearTimeout(earlyReturnTimer);
+            if (earlyReturnTimer) {
+                clearTimeout(earlyReturnTimer);
+            }
+
             chargeAmount = this.getChargeAmount(scrapped);
 
             return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null });
         }
 
-        clearTimeout(earlyReturnTimer);
+        if (earlyReturnTimer) {
+            clearTimeout(earlyReturnTimer);
+        }
 
         if (!lastScrapped) {
             throw new AssertionFailureError(`No content available for query ${searchQuery}`);
@@ -331,7 +359,6 @@ export class SearcherHost extends RPCHost {
             chargeAmount = this.getChargeAmount(lastScrapped);
         }
 
-
         return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null });
     }
diff --git a/backend/functions/src/dto/scrapping-options.ts b/backend/functions/src/dto/scrapping-options.ts
index 96d010c..2433f9f 100644
--- a/backend/functions/src/dto/scrapping-options.ts
+++ b/backend/functions/src/dto/scrapping-options.ts
@@ -52,44 +52,44 @@ export class CrawlerOptions extends AutoCastable {
         const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as {
             req: Request,
             res: Response,
-        };
+        } | undefined;
 
-        const customMode = ctx.req.get('x-respond-with') || ctx.req.get('x-return-format');
+        const customMode = ctx?.req.get('x-respond-with') || ctx?.req.get('x-return-format');
         if (customMode !== undefined) {
             instance.respondWith = customMode;
         }
 
-        const withGeneratedAlt = ctx.req.get('x-with-generated-alt');
+        const withGeneratedAlt = ctx?.req.get('x-with-generated-alt');
         if (withGeneratedAlt !== undefined) {
             instance.withGeneratedAlt = Boolean(withGeneratedAlt);
         }
-        const withLinksSummary = ctx.req.get('x-with-links-summary');
+        const withLinksSummary = ctx?.req.get('x-with-links-summary');
         if (withLinksSummary !== undefined) {
             instance.withLinksSummary = Boolean(withLinksSummary);
         }
-        const withImagesSummary = ctx.req.get('x-with-images-summary');
+        const withImagesSummary = ctx?.req.get('x-with-images-summary');
         if (withImagesSummary !== undefined) {
             instance.withImagesSummary = Boolean(withImagesSummary);
         }
-        const noCache = ctx.req.get('x-no-cache');
+        const noCache = ctx?.req.get('x-no-cache');
         if (noCache !== undefined) {
             instance.noCache = Boolean(noCache);
-            if (instance.noCache && instance.cacheTolerance === undefined) {
-                instance.cacheTolerance = 0;
-            }
         }
-        let cacheTolerance = parseInt(ctx.req.get('x-cache-tolerance') || '');
+        if (instance.noCache && instance.cacheTolerance === undefined) {
+            instance.cacheTolerance = 0;
+        }
+        let cacheTolerance = parseInt(ctx?.req.get('x-cache-tolerance') || '');
         if (!isNaN(cacheTolerance)) {
             instance.cacheTolerance = cacheTolerance;
         }
 
-        const targetSelector = ctx.req.get('x-target-selector');
+        const targetSelector = ctx?.req.get('x-target-selector');
         instance.targetSelector ??= targetSelector;
-        const waitForSelector = ctx.req.get('x-wait-for-selector');
+        const waitForSelector = ctx?.req.get('x-wait-for-selector');
         instance.waitForSelector ??= waitForSelector || instance.targetSelector;
 
         const cookies: CookieParam[] = [];
-        const setCookieHeaders = ctx.req.headers['x-set-cookie'] || (instance.setCookies as any as string[]);
+        const setCookieHeaders = ctx?.req.headers['x-set-cookie'] || (instance.setCookies as any as string[]);
         if (Array.isArray(setCookieHeaders)) {
             for (const setCookie of setCookieHeaders) {
                 cookies.push({
@@ -102,9 +102,23 @@ export class CrawlerOptions extends AutoCastable {
             });
         }
 
-        const proxyUrl = ctx.req.get('x-proxy-url');
+        const proxyUrl = ctx?.req.get('x-proxy-url');
         instance.proxyUrl ??= proxyUrl;
 
+        if (instance.cacheTolerance) {
+            instance.cacheTolerance = instance.cacheTolerance * 1000;
+        }
+
         return instance;
     }
 }
+
+export class CrawlerOptionsHeaderOnly extends CrawlerOptions {
+    static override from(input: any) {
+        const instance = super.from({
+            [RPC_CALL_ENVIRONMENT]: Reflect.get(input, RPC_CALL_ENVIRONMENT),
+        }) as CrawlerOptionsHeaderOnly;
+
+        return instance;
+    }
+}
diff --git a/backend/functions/src/index.ts b/backend/functions/src/index.ts
index a6016bb..45215ca 100644
--- a/backend/functions/src/index.ts
+++ b/backend/functions/src/index.ts
@@ -13,6 +13,7 @@ Object.assign(exports, registry.exportGrouped({
     memory: '4GiB',
     timeoutSeconds: 540,
 }));
+registry.allHandsOnDeck().catch(() => void 0);
 
 registry.title = 'reader';
 registry.version = '0.1.0';
diff --git a/backend/functions/src/services/pdf-extract.ts b/backend/functions/src/services/pdf-extract.ts
index 447099b..8624171 100644
--- a/backend/functions/src/services/pdf-extract.ts
+++ b/backend/functions/src/services/pdf-extract.ts
@@ -6,6 +6,8 @@ import { AsyncService, HashManager } from 'civkit';
 import { Logger } from '../shared/services/logger';
 import { PDFContent } from '../db/pdf';
 import dayjs from 'dayjs';
+import { FirebaseStorageBucketControl } from '../shared';
+import { randomUUID } from 'crypto';
 const utc = require('dayjs/plugin/utc'); // Import the UTC plugin
 dayjs.extend(utc); // Extend dayjs with the UTC plugin
 const timezone = require('dayjs/plugin/timezone');
@@ -46,6 +48,7 @@ export class PDFExtractor extends AsyncService {
 
     constructor(
         protected globalLogger: Logger,
+        protected firebaseObjectStorage: FirebaseStorageBucketControl,
     ) {
         super(...arguments);
     }
@@ -225,22 +228,46 @@ export class PDFExtractor extends AsyncService {
         return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') };
     }
 
-    async cachedExtract(url: string | URL) {
+    async cachedExtract(url: string | URL, cacheTolerance: number = 1000 * 3600 * 24) {
         if (!url) {
             return undefined;
        }
 
         const digest = md5Hasher.hash(url.toString());
-        const shortDigest = Buffer.from(digest, 'hex').toString('base64url');
-        const existing = await PDFContent.fromFirestore(shortDigest);
+        const cache: PDFContent | undefined = (await PDFContent.fromFirestoreQuery(PDFContent.COLLECTION.where('urlDigest', '==', digest).orderBy('createdAt', 'desc').limit(1)))?.[0];
 
-        if (existing) {
-            return {
-                meta: existing.meta,
-                content: existing.content,
-                text: existing.text
-            };
+        if (cache) {
+            const age = Date.now() - cache?.createdAt.valueOf();
+            const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance);
+            this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${url}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, {
+                url, digest, age, stale, cacheTolerance
+            });
+
+            if (!stale) {
+                if (cache.content && cache.text) {
+                    return {
+                        meta: cache.meta,
+                        content: cache.content,
+                        text: cache.text
+                    };
+                }
+
+                try {
+                    const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`);
+                    let cached = JSON.parse(r.toString('utf-8'));
+
+                    return {
+                        meta: cached.meta,
+                        content: cached.content,
+                        text: cached.text
+                    };
+                } catch (err) {
+                    this.logger.warn(`Unable to load cached content for ${url}`, { err });
+
+                    return undefined;
+                }
+            }
         }
 
         let extracted;
 
@@ -253,14 +280,16 @@
 
         // Don't try again until the next day
         const expireMixin = extracted ? {} : { expireAt: new Date(Date.now() + 1000 * 3600 * 24) };
 
+        const theID = randomUUID();
+        await this.firebaseObjectStorage.saveFile(`pdfs/${theID}`,
+            Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' });
-        await PDFContent.COLLECTION.doc(shortDigest).set(
+        await PDFContent.COLLECTION.doc(theID).set(
             {
-                _id: shortDigest,
                 src: url.toString(),
                 meta: extracted?.meta || {},
-                content: extracted?.content || '',
                 text: extracted?.text || '',
+                content: extracted?.content || '',
                 urlDigest: digest,
                 createdAt: new Date(),
                 ...expireMixin
diff --git a/backend/functions/src/services/puppeteer.ts b/backend/functions/src/services/puppeteer.ts
index 9f273cf..5afdbe0 100644
--- a/backend/functions/src/services/puppeteer.ts
+++ b/backend/functions/src/services/puppeteer.ts
@@ -380,7 +380,7 @@ document.addEventListener('load', handlePageLoad);
         let screenshot: Buffer | undefined;
         const page = await this.getNextPage();
         const sn = this.snMap.get(page);
-        this.logger.info(`Page ${sn}: Scraping ${url}`, { url });
+        this.logger.info(`Page ${sn}: Scraping ${url}`, { url });
         if (options?.proxyUrl) {
             await page.useProxy(options.proxyUrl);
         }
diff --git a/thinapps-shared b/thinapps-shared
index b0b5978..a3a13b1 160000
--- a/thinapps-shared
+++ b/thinapps-shared
@@ -1 +1 @@
-Subproject commit b0b597800a36e2aa8ee3d52715aa7c998b388f47
+Subproject commit a3a13b13fbef8e9f5d388bde6fca6b459e6f92a6