feat: extract text from pdf (#70)

* feat: pdf

* fix

* fix
Yanlong Wang 2024-05-30 20:21:33 +08:00 committed by GitHub
parent 7c5712363c
commit 33e14e5404
13 changed files with 1020 additions and 194 deletions
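In short: when a crawled page turns out to be a PDF (an embed[type="application/pdf"], or the document itself), the crawler now extracts its text and a markdown approximation with pdfjs-dist and caches the result in a new "pdfs" Firestore collection; the per-request options that used to be parsed from headers inline are consolidated into a CrawlerOptions DTO. A hypothetical client-side sketch of the end result, with a placeholder host that is not part of this change:

// Request a PDF URL through the reader; the headers below are the ones parsed by
// the new CrawlerOptions DTO introduced in this commit.
const res = await fetch('https://your-reader-host/https://example.com/paper.pdf', {
    headers: {
        'x-respond-with': 'markdown',
        'x-no-cache': 'true',
    },
});
// For a PDF target, the content is reconstructed from pdfjs-dist text items
// rather than from rendered HTML.
console.log(await res.text());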


@ -33,9 +33,6 @@
"functions": {
"port": 5001
},
"auth": {
"port": 9099
},
"firestore": {
"port": 9098
},


@ -15,6 +15,7 @@
"axios": "^1.3.3",
"bcrypt": "^5.1.0",
"civkit": "^0.6.5-047c0d8",
"core-js": "^3.37.1",
"cors": "^2.8.5",
"dayjs": "^1.11.9",
"express": "^4.19.2",
@ -27,6 +28,7 @@
"maxmind": "^4.3.18",
"minio": "^7.1.3",
"openai": "^4.20.0",
"pdfjs-dist": "^4.2.67",
"puppeteer": "^22.7.1",
"puppeteer-extra": "^3.3.6",
"puppeteer-extra-plugin-block-resources": "^2.4.3",
@ -3923,6 +3925,16 @@
"integrity": "sha512-3DdaFaU/Zf1AnpLiFDeNCD4TOWe3Zl2RZaTzUvWiIk5ERzcCodOE20Vqq4fzCbNoHURFHT4/us/Lfq+S2zyY4w==",
"optional": true
},
"node_modules/core-js": {
"version": "3.37.1",
"resolved": "https://registry.npmjs.org/core-js/-/core-js-3.37.1.tgz",
"integrity": "sha512-Xn6qmxrQZyB0FFY8E3bgRXei3lWDJHhvI+u0q9TKIYM49G8pAr0FgnnrFRAmsbptZL1yxRADVXn+x5AGsbBfyw==",
"hasInstallScript": true,
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/core-js"
}
},
"node_modules/core-util-is": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz",
@ -9208,6 +9220,18 @@
"node": ">=8"
}
},
"node_modules/pdfjs-dist": {
"version": "4.2.67",
"resolved": "https://registry.npmjs.org/pdfjs-dist/-/pdfjs-dist-4.2.67.tgz",
"integrity": "sha512-rJmuBDFpD7cqC8WIkQUEClyB4UAH05K4AsyewToMTp2gSy3Rrx8c1ydAVqlJlGv3yZSOrhEERQU/4ScQQFlLHA==",
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"canvas": "^2.11.2",
"path2d": "^0.2.0"
}
},
"node_modules/pend": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/pend/-/pend-1.2.0.tgz",


@ -35,6 +35,7 @@
"axios": "^1.3.3",
"bcrypt": "^5.1.0",
"civkit": "^0.6.5-047c0d8",
"core-js": "^3.37.1",
"cors": "^2.8.5",
"dayjs": "^1.11.9",
"express": "^4.19.2",
@ -47,6 +48,7 @@
"maxmind": "^4.3.18",
"minio": "^7.1.3",
"openai": "^4.20.0",
"pdfjs-dist": "^4.2.67",
"puppeteer": "^22.7.1",
"puppeteer-extra": "^3.3.6",
"puppeteer-extra-plugin-block-resources": "^2.4.3",


@ -10,18 +10,18 @@ import { RateLimitControl, RateLimitDesc } from '../shared/services/rate-limit';
import _ from 'lodash';
import { PageSnapshot, PuppeteerControl, ScrappingOptions } from '../services/puppeteer';
import { Request, Response } from 'express';
import normalizeUrl from "@esm2cjs/normalize-url";
const pNormalizeUrl = import("@esm2cjs/normalize-url");
import { AltTextService } from '../services/alt-text';
import TurndownService from 'turndown';
import { parseString as parseSetCookieString } from 'set-cookie-parser';
import type { CookieParam } from 'puppeteer';
import { Crawled } from '../db/crawled';
import { cleanAttribute } from '../utils/misc';
import { randomUUID } from 'crypto';
import { JinaEmbeddingsAuthDTO } from '../shared/dto/jina-embeddings-auth';
import { countGPTToken as estimateToken } from '../shared/utils/openai';
import { CrawlerOptions } from '../dto/scrapping-options';
import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
import { PDFExtractor } from '../services/pdf-extract';
const md5Hasher = new HashManager('md5', 'hex');
@ -69,6 +69,7 @@ export class CrawlerHost extends RPCHost {
protected globalLogger: Logger,
protected puppeteerControl: PuppeteerControl,
protected altTextService: AltTextService,
protected pdfExtractor: PDFExtractor,
protected firebaseObjectStorage: FirebaseStorageBucketControl,
protected rateLimitControl: RateLimitControl,
protected threadLocal: AsyncContext,
@ -76,7 +77,7 @@ export class CrawlerHost extends RPCHost {
super(...arguments);
puppeteerControl.on('crawled', async (snapshot: PageSnapshot, options: ScrappingOptions & { url: URL; }) => {
if (!snapshot.title?.trim()) {
if (!snapshot.title?.trim() && !snapshot.pdfs?.length) {
return;
}
if (options.cookies?.length) {
@ -138,14 +139,14 @@ export class CrawlerHost extends RPCHost {
});
turnDownService.addRule('improved-inline-link', {
filter: function (node, options) {
return (
return Boolean(
options.linkStyle === 'inlined' &&
node.nodeName === 'A' &&
node.getAttribute('href')
);
},
replacement: function (content, node) {
replacement: function (content, node: any) {
let href = node.getAttribute('href');
if (href) href = href.replace(/([()])/g, '\\$1');
let title = cleanAttribute(node.getAttribute('title'));
@ -226,6 +227,26 @@ export class CrawlerHost extends RPCHost {
}
} as FormattedPage;
}
let pdfMode = false;
if (snapshot.pdfs?.length && !snapshot.title) {
const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0]);
if (pdf) {
pdfMode = true;
snapshot.title = pdf.meta?.Title;
snapshot.text = pdf.text || snapshot.text;
snapshot.parsed = {
content: pdf.content,
textContent: pdf.content,
length: pdf.content?.length,
byline: pdf.meta?.Author,
lang: pdf.meta?.Language || undefined,
title: pdf.meta?.Title,
publishedTime: this.pdfExtractor.parsePdfDate(pdf.meta?.ModDate || pdf.meta?.CreationDate)?.toISOString(),
};
}
}
if (mode === 'text') {
return {
...this.getGeneralSnapshotMixins(snapshot),
@ -236,6 +257,15 @@ export class CrawlerHost extends RPCHost {
} as FormattedPage;
}
let contentText = '';
const imageSummary = {} as { [k: string]: string; };
const imageIdxTrack = new Map<string, number[]>();
do {
if (pdfMode) {
contentText = snapshot.parsed?.content || snapshot.text;
break;
}
const toBeTurnedToMd = mode === 'markdown' ? snapshot.html : snapshot.parsed?.content;
let turnDownService = mode === 'markdown' ? this.getTurndown() : this.getTurndown('without any rule');
for (const plugin of this.turnDownPlugins) {
@ -256,11 +286,9 @@ export class CrawlerHost extends RPCHost {
await Promise.all(tasks);
}
let imgIdx = 0;
const imageSummary = {} as { [k: string]: string; };
const imageIdxTrack = new Map<string, number[]>();
turnDownService.addRule('img-generated-alt', {
filter: 'img',
replacement: (_content, node) => {
replacement: (_content, node: any) => {
let linkPreferredSrc = (node.getAttribute('src') || '').trim();
if (!linkPreferredSrc || linkPreferredSrc.startsWith('data:')) {
const dataSrc = (node.getAttribute('data-src') || '').trim();
@ -297,7 +325,6 @@ export class CrawlerHost extends RPCHost {
}
});
let contentText = '';
if (toBeTurnedToMd) {
try {
contentText = turnDownService.turndown(toBeTurnedToMd).trim();
@ -331,6 +358,7 @@ export class CrawlerHost extends RPCHost {
if (!contentText || (contentText.startsWith('<') || contentText.endsWith('>'))) {
contentText = snapshot.text;
}
} while (false);
const cleanText = (contentText || '').trim();
@ -514,7 +542,8 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
req: Request,
res: Response,
},
auth: JinaEmbeddingsAuthDTO
auth: JinaEmbeddingsAuthDTO,
crawlerOptions: CrawlerOptions,
) {
const uid = await auth.solveUID();
let chargeAmount = 0;
@ -571,6 +600,7 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
}
let urlToCrawl;
const normalizeUrl = (await pNormalizeUrl).default;
try {
urlToCrawl = new URL(normalizeUrl(noSlashURL.trim(), { stripWWW: false, removeTrailingSlash: false, removeSingleSlash: false }));
} catch (err) {
@ -586,58 +616,19 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
});
}
const customMode = ctx.req.get('x-respond-with') || ctx.req.get('x-return-format') || 'default';
const withGeneratedAlt = Boolean(ctx.req.get('x-with-generated-alt'));
const withLinksSummary = Boolean(ctx.req.get('x-with-links-summary'));
const withImagesSummary = Boolean(ctx.req.get('x-with-images-summary'));
const noCache = Boolean(ctx.req.get('x-no-cache'));
let cacheTolerance = parseInt(ctx.req.get('x-cache-tolerance') || '') * 1000;
if (isNaN(cacheTolerance)) {
cacheTolerance = this.cacheValidMs;
if (noCache) {
cacheTolerance = 0;
}
}
const targetSelector = ctx.req.get('x-target-selector') || undefined;
const waitForSelector = ctx.req.get('x-wait-for-selector') || targetSelector;
const cookies: CookieParam[] = [];
const setCookieHeaders = ctx.req.headers['x-set-cookie'];
if (Array.isArray(setCookieHeaders)) {
for (const setCookie of setCookieHeaders) {
cookies.push({
...parseSetCookieString(setCookie, { decodeValues: false }) as CookieParam,
domain: urlToCrawl.hostname,
});
}
} else if (setCookieHeaders) {
cookies.push({
...parseSetCookieString(setCookieHeaders, { decodeValues: false }) as CookieParam,
domain: urlToCrawl.hostname,
});
}
this.threadLocal.set('withGeneratedAlt', withGeneratedAlt);
this.threadLocal.set('withLinksSummary', withLinksSummary);
this.threadLocal.set('withImagesSummary', withImagesSummary);
const crawlOpts: ExtraScrappingOptions = {
proxyUrl: ctx.req.get('x-proxy-url'),
cookies,
favorScreenshot: customMode === 'screenshot',
waitForSelector,
targetSelector,
};
const crawlOpts = this.configure(crawlerOptions);
if (!ctx.req.accepts('text/plain') && ctx.req.accepts('text/event-stream')) {
const sseStream = new OutputServerEventStream();
rpcReflect.return(sseStream);
try {
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, cacheTolerance)) {
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, crawlerOptions.cacheTolerance)) {
if (!scrapped) {
continue;
}
const formatted = await this.formatSnapshot(customMode, scrapped, urlToCrawl);
const formatted = await this.formatSnapshot(crawlerOptions.respondWith, scrapped, urlToCrawl);
chargeAmount = this.getChargeAmount(formatted);
sseStream.write({
event: 'data',
@ -659,13 +650,13 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
let lastScrapped;
if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, cacheTolerance)) {
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, crawlerOptions.cacheTolerance)) {
lastScrapped = scrapped;
if (waitForSelector || !scrapped?.parsed?.content || !(scrapped.title?.trim())) {
if (crawlerOptions.waitForSelector || ((!scrapped?.parsed?.content || !scrapped.title?.trim()) && !scrapped?.pdfs?.length)) {
continue;
}
const formatted = await this.formatSnapshot(customMode, scrapped, urlToCrawl);
const formatted = await this.formatSnapshot(crawlerOptions.respondWith, scrapped, urlToCrawl);
chargeAmount = this.getChargeAmount(formatted);
return formatted;
@ -675,21 +666,21 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
throw new AssertionFailureError(`No content available for URL ${urlToCrawl}`);
}
const formatted = await this.formatSnapshot(customMode, lastScrapped, urlToCrawl);
const formatted = await this.formatSnapshot(crawlerOptions.respondWith, lastScrapped, urlToCrawl);
chargeAmount = this.getChargeAmount(formatted);
return formatted;
}
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, cacheTolerance)) {
for await (const scrapped of this.cachedScrap(urlToCrawl, crawlOpts, crawlerOptions.cacheTolerance)) {
lastScrapped = scrapped;
if (waitForSelector || !scrapped?.parsed?.content || !(scrapped.title?.trim())) {
if (crawlerOptions.waitForSelector || ((!scrapped?.parsed?.content || !scrapped.title?.trim()) && !scrapped?.pdfs?.length)) {
continue;
}
const formatted = await this.formatSnapshot(customMode, scrapped, urlToCrawl);
const formatted = await this.formatSnapshot(crawlerOptions.respondWith, scrapped, urlToCrawl);
chargeAmount = this.getChargeAmount(formatted);
if (customMode === 'screenshot' && Reflect.get(formatted, 'screenshotUrl')) {
if (crawlerOptions.respondWith === 'screenshot' && Reflect.get(formatted, 'screenshotUrl')) {
return assignTransferProtocolMeta(`${formatted}`,
{ code: 302, envelope: null, headers: { Location: Reflect.get(formatted, 'screenshotUrl') } }
@ -703,9 +694,9 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
throw new AssertionFailureError(`No content available for URL ${urlToCrawl}`);
}
const formatted = await this.formatSnapshot(customMode, lastScrapped, urlToCrawl);
const formatted = await this.formatSnapshot(crawlerOptions.respondWith, lastScrapped, urlToCrawl);
chargeAmount = this.getChargeAmount(formatted);
if (customMode === 'screenshot' && Reflect.get(formatted, 'screenshotUrl')) {
if (crawlerOptions.respondWith === 'screenshot' && Reflect.get(formatted, 'screenshotUrl')) {
return assignTransferProtocolMeta(`${formatted}`,
{ code: 302, envelope: null, headers: { Location: Reflect.get(formatted, 'screenshotUrl') } }
@ -915,4 +906,55 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
}
}
configure(opts: CrawlerOptions) {
this.threadLocal.set('withGeneratedAlt', opts.withGeneratedAlt);
this.threadLocal.set('withLinksSummary', opts.withLinksSummary);
this.threadLocal.set('withImagesSummary', opts.withImagesSummary);
const crawlOpts: ExtraScrappingOptions = {
proxyUrl: opts.proxyUrl,
cookies: opts.setCookies,
favorScreenshot: opts.respondWith === 'screenshot',
waitForSelector: opts.waitForSelector,
targetSelector: opts.targetSelector,
};
return crawlOpts;
}
async simpleCrawl(mode: string, url: URL, opts?: ExtraScrappingOptions) {
const it = this.cachedScrap(url, { ...opts, minIntervalMs: 500 });
let lastSnapshot;
let goodEnough = false;
try {
for await (const x of it) {
lastSnapshot = x;
if (goodEnough) {
break;
}
if (lastSnapshot?.parsed?.content) {
// Parsed content present: good enough. Still take one more snapshot before breaking.
goodEnough = true;
}
}
} catch (err) {
if (lastSnapshot) {
return this.formatSnapshot(mode, lastSnapshot, url);
}
throw err;
}
if (!lastSnapshot) {
throw new AssertionFailureError(`No content available`);
}
return this.formatSnapshot(mode, lastSnapshot, url);
}
}
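The new simpleCrawl helper above scrapes with a 500 ms minimum snapshot interval and stops one snapshot after readable content first appears. A hypothetical invocation, assuming a resolved CrawlerHost instance named crawlerHost (not shown in this diff):

// Crawl one URL and return it formatted as markdown.
const url = new URL('https://example.com/whitepaper.pdf');
const formatted = await crawlerHost.simpleCrawl('markdown', url);
console.log(formatted.content);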


@ -0,0 +1,289 @@
import {
Defer,
PromiseThrottle,
RPCHost,
RPCReflection,
} from 'civkit';
import { singleton } from 'tsyringe';
import { CloudScheduleV2, CloudTaskV2, FirebaseStorageBucketControl, Logger, OutputServerEventStream, Param, RPCReflect, TempFileManager } from '../shared';
import _ from 'lodash';
import { CrawlerHost } from './crawler';
import { Crawled } from '../db/crawled';
import dayjs from 'dayjs';
import { createReadStream } from 'fs';
import { appendFile } from 'fs/promises';
import { createGzip } from 'zlib';
import { getFunctions } from 'firebase-admin/functions';
import { GoogleAuth } from 'google-auth-library';
dayjs.extend(require('dayjs/plugin/utc'));
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name: string, location = "us-central1") {
const projectId = `reader-6b7dc`;
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const auth = new GoogleAuth({
scopes: 'https://www.googleapis.com/auth/cloud-platform',
});
const client = await auth.getClient();
const res = await client.request<any>({ url });
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retrieve uri for function at ${url}`);
}
return uri;
}
@singleton()
export class DataCrunchingHost extends RPCHost {
logger = this.globalLogger.child({ service: this.constructor.name });
pageCacheCrunchingPrefix = 'crunched-pages';
pageCacheCrunchingBatchSize = 5000;
pageCacheCrunchingTMinus = 6 * 24 * 60 * 60 * 1000;
rev = 7;
constructor(
protected globalLogger: Logger,
protected crawler: CrawlerHost,
protected tempFileManager: TempFileManager,
protected firebaseObjectStorage: FirebaseStorageBucketControl,
) {
super(..._.without(arguments, crawler));
}
override async init() {
await this.dependencyReady();
this.emit('ready');
}
@CloudTaskV2({
runtime: {
cpu: 2,
memory: '4GiB',
timeoutSeconds: 3600,
concurrency: 2,
maxInstances: 200,
retryConfig: {
maxAttempts: 3,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 150,
maxDispatchesPerSecond: 2,
},
},
tags: ['DataCrunching'],
})
async crunchPageCacheWorker(
@Param('date') date: string,
@Param('offset', { default: 0 }) offset: number
) {
this.logger.info(`Crunching page cache @${date}+${offset}...`);
for await (const { fileName, records } of this.iterPageCacheRecords(date, offset)) {
this.logger.info(`Crunching ${fileName}...`);
const fileOnDrive = await this.crunchCacheRecords(records);
const fstream = createReadStream(fileOnDrive.path);
const gzipStream = createGzip();
fstream.pipe(gzipStream, { end: true });
await this.firebaseObjectStorage.bucket.file(fileName).save(gzipStream, {
contentType: 'application/jsonl+gzip',
});
}
this.logger.info(`Crunching page cache @${date}+${offset} done.`);
return true;
}
@CloudScheduleV2('2 0 * * *', {
name: 'crunchPageCacheEveryday',
runtime: {
cpu: 2,
memory: '4GiB',
timeoutSeconds: 1800,
timeZone: 'UTC',
retryCount: 3,
minBackoffSeconds: 60,
},
tags: ['DataCrunching'],
})
// @CloudHTTPv2({
// runtime: {
// cpu: 2,
// memory: '4GiB',
// timeoutSeconds: 3600,
// concurrency: 2,
// maxInstances: 200,
// },
// tags: ['DataCrunching'],
// })
async dispatchPageCacheCrunching(
@RPCReflect() rpcReflect: RPCReflection,
) {
const sse = new OutputServerEventStream({ highWaterMark: 4096 });
rpcReflect.return(sse);
rpcReflect.catch((err) => {
sse.end({ data: `Error: ${err.message}` });
});
for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
this.logger.info(`Dispatching ${fileName}...`);
sse.write({ data: `Dispatching ${fileName}...` });
await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
dispatchDeadlineSeconds: 1800,
uri: await getFunctionUrl('crunchPageCacheWorker'),
});
}
sse.end({ data: 'done' });
sse.resume();
return true;
}
async* iterPageCacheRecords(date?: string, inputOffset?: number | string) {
const startOfToday = dayjs().utc().startOf('day');
const startingPoint = dayjs().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
let theDay = startingPoint;
if (date) {
theDay = dayjs(date).utc().startOf('day');
}
let counter = 0;
if (inputOffset) {
counter = parseInt(inputOffset as string, 10);
}
while (theDay.isBefore(startOfToday)) {
const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
const offset = counter;
counter += this.pageCacheCrunchingBatchSize;
const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
if (fileExists) {
continue;
}
const records = await Crawled.fromFirestoreQuery(Crawled.COLLECTION
.where('createdAt', '>=', theDay.toDate())
.where('createdAt', '<', theDay.add(1, 'day').toDate())
.orderBy('createdAt', 'asc')
.offset(offset)
.limit(this.pageCacheCrunchingBatchSize)
);
this.logger.info(`Found ${records.length} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
if (!records.length) {
if (date) {
break;
}
theDay = theDay.add(1, 'day');
counter = 0;
continue;
}
yield { fileName, records };
if (offset) {
break;
}
}
}
async* iterPageCacheChunks() {
const startOfToday = dayjs().utc().startOf('day');
const startingPoint = dayjs().utc().subtract(this.pageCacheCrunchingTMinus, 'ms').startOf('day');
let theDay = startingPoint;
let counter = 0;
while (theDay.isBefore(startOfToday)) {
const fileName = `${this.pageCacheCrunchingPrefix}/r${this.rev}/${theDay.format('YYYY-MM-DD')}/${counter}.jsonl.gz`;
const offset = counter;
counter += this.pageCacheCrunchingBatchSize;
const fileExists = (await this.firebaseObjectStorage.bucket.file(fileName).exists())[0];
if (fileExists) {
continue;
}
const nRecords = (await Crawled.COLLECTION
.where('createdAt', '>=', theDay.toDate())
.where('createdAt', '<', theDay.add(1, 'day').toDate())
.orderBy('createdAt', 'asc')
.offset(offset)
.limit(this.pageCacheCrunchingBatchSize)
.count().get()).data().count;
this.logger.info(`Found ${nRecords} records for ${theDay.format('YYYY-MM-DD')} at offset ${offset}`, { fileName, counter });
if (nRecords < this.pageCacheCrunchingBatchSize) {
theDay = theDay.add(1, 'day');
counter = 0;
}
if (nRecords) {
yield { fileName, date: theDay.toISOString(), offset };
}
continue;
}
}
async crunchCacheRecords(records: Crawled[]) {
const throttle = new PromiseThrottle(30);
const localFilePath = this.tempFileManager.alloc();
let nextDrainDeferred = Defer();
nextDrainDeferred.resolve();
for (const record of records) {
await throttle.acquire();
this.firebaseObjectStorage.downloadFile(`snapshots/${record._id}`)
.then(async (snapshotTxt) => {
try {
const snapshot = JSON.parse(snapshotTxt.toString('utf-8'));
let formatted = await this.crawler.formatSnapshot('default', snapshot);
if (!formatted.content) {
formatted = await this.crawler.formatSnapshot('markdown', snapshot);
}
await nextDrainDeferred.promise;
await appendFile(localFilePath, JSON.stringify({
url: snapshot.href,
title: snapshot.title || '',
html: snapshot.html || '',
text: snapshot.text || '',
content: formatted.content || '',
}) + '\n', { encoding: 'utf-8' });
} catch (err) {
this.logger.warn(`Failed to parse snapshot for ${record._id}`, { err });
}
})
.finally(() => {
throttle.release();
});
}
await throttle.nextDrain();
const ro = {
path: localFilePath
};
this.tempFileManager.bindPathTo(ro, localFilePath);
return ro;
}
}
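Each crunched page-cache file is gzipped JSONL, one crawled page per line. The record shape, inferred from crunchCacheRecords above (a sketch, not a type exported by this commit):

// Shape of one line in the crunched-pages JSONL output.
interface CrunchedPageRecord {
    url: string;      // snapshot.href
    title: string;    // '' when the snapshot had no title
    html: string;
    text: string;
    content: string;  // formatSnapshot('default'), falling back to 'markdown' when empty
}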


@ -19,6 +19,7 @@ import { parseString as parseSetCookieString } from 'set-cookie-parser';
import { WebSearchQueryParams } from '../shared/3rd-party/brave-search';
import { SearchResult } from '../db/searched';
import { WebSearchApiResponse, SearchResult as WebSearchResult } from '../shared/3rd-party/brave-types';
import { CrawlerOptions } from '../dto/scrapping-options';
@singleton()
@ -145,7 +146,8 @@ export class SearcherHost extends RPCHost {
req: Request,
res: Response,
},
auth: JinaEmbeddingsAuthDTO
auth: JinaEmbeddingsAuthDTO,
crawlerOptions: CrawlerOptions,
) {
const uid = await auth.solveUID();
let chargeAmount = 0;
@ -201,18 +203,7 @@ export class SearcherHost extends RPCHost {
});
}
const customMode = ctx.req.get('x-respond-with') || ctx.req.get('x-return-format') || 'default';
const withGeneratedAlt = Boolean(ctx.req.get('x-with-generated-alt'));
const withLinksSummary = Boolean(ctx.req.get('x-with-links-summary'));
const withImagesSummary = Boolean(ctx.req.get('x-with-images-summary'));
const noCache = Boolean(ctx.req.get('x-no-cache'));
let pageCacheTolerance = parseInt(ctx.req.get('x-cache-tolerance') || '') * 1000;
if (isNaN(pageCacheTolerance)) {
pageCacheTolerance = this.pageCacheToleranceMs;
if (noCache) {
pageCacheTolerance = 0;
}
}
const crawlOpts = this.crawler.configure(crawlerOptions);
const cookies: CookieParam[] = [];
const setCookieHeaders = ctx.req.headers['x-set-cookie'];
if (Array.isArray(setCookieHeaders)) {
@ -226,27 +217,19 @@ export class SearcherHost extends RPCHost {
...parseSetCookieString(setCookieHeaders, { decodeValues: false }) as CookieParam,
});
}
this.threadLocal.set('withGeneratedAlt', withGeneratedAlt);
this.threadLocal.set('withLinksSummary', withLinksSummary);
this.threadLocal.set('withImagesSummary', withImagesSummary);
const crawlOpts: ScrappingOptions = {
proxyUrl: ctx.req.get('x-proxy-url'),
cookies,
favorScreenshot: customMode === 'screenshot'
};
const searchQuery = noSlashPath;
const r = await this.cachedWebSearch({
q: searchQuery,
count: 10
}, noCache);
}, crawlerOptions.noCache);
if (!r.web?.results.length) {
throw new AssertionFailureError(`No search results available for query ${searchQuery}`);
}
const it = this.fetchSearchResults(customMode, r.web?.results, crawlOpts, pageCacheTolerance);
const it = this.fetchSearchResults(crawlerOptions.respondWith, r.web?.results, crawlOpts,
crawlerOptions.cacheTolerance || this.pageCacheToleranceMs
);
if (!ctx.req.accepts('text/plain') && ctx.req.accepts('text/event-stream')) {
const sseStream = new OutputServerEventStream();


@ -0,0 +1,65 @@
import { Also, Prop, parseJSONText } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import _ from 'lodash';
@Also({
dictOf: Object
})
export class PDFContent extends FirestoreRecord {
static override collectionName = 'pdfs';
override _id!: string;
@Prop({
required: true
})
src!: string;
@Prop({
required: true
})
urlDigest!: string;
@Prop()
meta?: { [k: string]: any; };
@Prop()
text?: string;
@Prop()
content?: string;
@Prop()
createdAt!: Date;
@Prop()
expireAt?: Date;
static patchedFields = [
'meta'
];
static override from(input: any) {
for (const field of this.patchedFields) {
if (typeof input[field] === 'string') {
input[field] = parseJSONText(input[field]);
}
}
return super.from(input) as PDFContent;
}
override degradeForFireStore() {
const copy: any = { ...this };
for (const field of (this.constructor as typeof PDFContent).patchedFields) {
if (typeof copy[field] === 'object') {
copy[field] = JSON.stringify(copy[field]) as any;
}
}
return copy;
}
[k: string]: any;
}


@ -0,0 +1,110 @@
import { AutoCastable, Prop, RPC_CALL_ENVIRONMENT } from 'civkit'; // Adjust the import based on where your decorators are defined
import type { Request, Response } from 'express';
import type { CookieParam } from 'puppeteer';
import { parseString as parseSetCookieString } from 'set-cookie-parser';
export class CrawlerOptions extends AutoCastable {
@Prop({
default: 'default',
})
respondWith!: string;
@Prop({
default: false,
})
withGeneratedAlt!: boolean;
@Prop({
default: false,
})
withLinksSummary!: boolean;
@Prop({
default: false,
})
withImagesSummary!: boolean;
@Prop({
default: false,
})
noCache!: boolean;
@Prop()
cacheTolerance?: number;
@Prop()
targetSelector?: string;
@Prop()
waitForSelector?: string;
@Prop({
arrayOf: String,
})
setCookies?: CookieParam[];
@Prop()
proxyUrl?: string;
static override from(input: any) {
const instance = super.from(input) as CrawlerOptions;
const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as {
req: Request,
res: Response,
};
const customMode = ctx.req.get('x-respond-with') || ctx.req.get('x-return-format');
if (customMode !== undefined) {
instance.respondWith = customMode;
}
const withGeneratedAlt = ctx.req.get('x-with-generated-alt');
if (withGeneratedAlt !== undefined) {
instance.withGeneratedAlt = Boolean(withGeneratedAlt);
}
const withLinksSummary = ctx.req.get('x-with-links-summary');
if (withLinksSummary !== undefined) {
instance.withLinksSummary = Boolean(withLinksSummary);
}
const withImagesSummary = ctx.req.get('x-with-images-summary');
if (withImagesSummary !== undefined) {
instance.withImagesSummary = Boolean(withImagesSummary);
}
const noCache = ctx.req.get('x-no-cache');
if (noCache !== undefined) {
instance.noCache = Boolean(noCache);
if (instance.noCache && instance.cacheTolerance === undefined) {
instance.cacheTolerance = 0;
}
}
let cacheTolerance = parseInt(ctx.req.get('x-cache-tolerance') || '');
if (!isNaN(cacheTolerance)) {
instance.cacheTolerance = cacheTolerance;
}
const targetSelector = ctx.req.get('x-target-selector');
instance.targetSelector ??= targetSelector;
const waitForSelector = ctx.req.get('x-wait-for-selector');
instance.waitForSelector ??= waitForSelector || instance.targetSelector;
const cookies: CookieParam[] = [];
const setCookieHeaders = ctx.req.headers['x-set-cookie'] || (instance.setCookies as any as string[]);
if (Array.isArray(setCookieHeaders)) {
for (const setCookie of setCookieHeaders) {
cookies.push({
...parseSetCookieString(setCookie, { decodeValues: false }) as CookieParam,
});
}
} else if (setCookieHeaders && typeof setCookieHeaders === 'string') {
cookies.push({
...parseSetCookieString(setCookieHeaders, { decodeValues: false }) as CookieParam,
});
}
const proxyUrl = ctx.req.get('x-proxy-url');
instance.proxyUrl ??= proxyUrl;
return instance;
}
}
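For reference, the request headers recognized by CrawlerOptions.from and the fields they populate, summarized from the code above (an illustrative map, not an exported constant):

// Header-to-option mapping applied by CrawlerOptions.from.
const headerToOption: Record<string, string> = {
    'x-respond-with': 'respondWith',            // 'x-return-format' is accepted as an alias
    'x-with-generated-alt': 'withGeneratedAlt',
    'x-with-links-summary': 'withLinksSummary',
    'x-with-images-summary': 'withImagesSummary',
    'x-no-cache': 'noCache',                    // also zeroes cacheTolerance when set
    'x-cache-tolerance': 'cacheTolerance',
    'x-target-selector': 'targetSelector',
    'x-wait-for-selector': 'waitForSelector',   // defaults to targetSelector when absent
    'x-set-cookie': 'setCookies',
    'x-proxy-url': 'proxyUrl',
};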

View File

@ -6,7 +6,6 @@ import { ImageInterrogationManager } from '../shared/services/common-iminterroga
import { ImgBrief } from './puppeteer';
import { ImgAlt } from '../db/img-alt';
const md5Hasher = new HashManager('md5', 'hex');
@singleton()


@ -0,0 +1,298 @@
import 'core-js/actual/promise/with-resolvers';
import { singleton } from 'tsyringe';
import _ from 'lodash';
import { TextItem } from 'pdfjs-dist/types/src/display/api';
import { AsyncService, HashManager } from 'civkit';
import { Logger } from '../shared/services/logger';
import { PDFContent } from '../db/pdf';
import dayjs from 'dayjs';
const utc = require('dayjs/plugin/utc'); // Import the UTC plugin
dayjs.extend(utc); // Extend dayjs with the UTC plugin
const timezone = require('dayjs/plugin/timezone');
dayjs.extend(timezone);
const pPdfjs = import('pdfjs-dist');
const md5Hasher = new HashManager('md5', 'hex');
function stdDev(numbers: number[]) {
const mean = _.mean(numbers);
const squareDiffs = numbers.map((num) => Math.pow(num - mean, 2));
const avgSquareDiff = _.mean(squareDiffs);
return Math.sqrt(avgSquareDiff);
}
function isRotatedByAtLeast35Degrees(transform: [number, number, number, number, number, number]): boolean {
const [a, b, c, d, _e, _f] = transform;
// Calculate the rotation angles using arctan(b/a) and arctan(-c/d)
const angle1 = Math.atan2(b, a) * (180 / Math.PI); // from a, b
const angle2 = Math.atan2(-c, d) * (180 / Math.PI); // from c, d
// Either angle1 or angle2 can be used to determine the rotation; they should be equivalent
const rotationAngle1 = Math.abs(angle1);
const rotationAngle2 = Math.abs(angle2);
// Check if the absolute rotation angle is greater than or equal to 35 degrees
return rotationAngle1 >= 35 || rotationAngle2 >= 35;
}
@singleton()
export class PDFExtractor extends AsyncService {
logger = this.globalLogger.child({ service: this.constructor.name });
pdfjs!: Awaited<typeof pPdfjs>;
constructor(
protected globalLogger: Logger,
) {
super(...arguments);
}
override async init() {
await this.dependencyReady();
this.pdfjs = await pPdfjs;
this.emit('ready');
}
async extract(url: string | URL) {
const loadingTask = this.pdfjs.getDocument({
url,
disableFontFace: true,
verbosity: 0
});
const doc = await loadingTask.promise;
const meta = await doc.getMetadata();
const textItems: TextItem[][] = [];
for (const pg of _.range(0, doc.numPages)) {
const page = await doc.getPage(pg + 1);
const textContent = await page.getTextContent();
textItems.push((textContent.items as TextItem[]));
}
const articleCharHeights: number[] = [];
for (const textItem of textItems.flat()) {
if (textItem.height) {
articleCharHeights.push(...Array(textItem.str.length).fill(textItem.height));
}
}
const articleAvgHeight = _.mean(articleCharHeights);
const articleStdDevHeight = stdDev(articleCharHeights);
// const articleMedianHeight = articleCharHeights.sort()[Math.floor(articleCharHeights.length / 2)];
const mdOps: Array<{
text: string;
op?: 'new' | 'append';
mode: 'h1' | 'h2' | 'p' | 'appendix' | 'space';
}> = [];
const rawChunks: string[] = [];
let op: 'append' | 'new' = 'new';
let mode: 'h1' | 'h2' | 'p' | 'space' | 'appendix' = 'p';
for (const pageTextItems of textItems) {
const charHeights = [];
for (const textItem of pageTextItems as TextItem[]) {
if (textItem.height) {
charHeights.push(...Array(textItem.str.length).fill(textItem.height));
}
rawChunks.push(`${textItem.str}${textItem.hasEOL ? '\n' : ''}`);
}
const avgHeight = _.mean(charHeights);
const stdDevHeight = stdDev(charHeights);
// const medianHeight = charHeights.sort()[Math.floor(charHeights.length / 2)];
for (const textItem of pageTextItems) {
if (textItem.height > articleAvgHeight + 3 * articleStdDevHeight) {
mode = 'h1';
} else if (textItem.height > articleAvgHeight + 2 * articleStdDevHeight) {
mode = 'h2';
} else if (textItem.height && textItem.height < avgHeight - stdDevHeight) {
mode = 'appendix';
} else if (textItem.height) {
mode = 'p';
} else {
mode = 'space';
}
if (isRotatedByAtLeast35Degrees(textItem.transform as any)) {
mode = 'appendix';
}
mdOps.push({
op,
mode,
text: textItem.str
});
if (textItem.hasEOL && !textItem.str) {
op = 'new';
} else {
op = 'append';
}
}
}
const mdChunks = [];
const appendixChunks = [];
mode = 'space';
for (const x of mdOps) {
const previousMode: string = mode;
const changeToMdChunks = [];
const isNewStart = x.mode !== 'space' && (x.op === 'new' || (previousMode === 'appendix' && x.mode !== previousMode));
if (isNewStart) {
switch (x.mode) {
case 'h1': {
changeToMdChunks.push(`\n\n# `);
mode = x.mode;
break;
}
case 'h2': {
changeToMdChunks.push(`\n\n## `);
mode = x.mode;
break;
}
case 'p': {
changeToMdChunks.push(`\n\n`);
mode = x.mode;
break;
}
case 'appendix': {
mode = x.mode;
appendixChunks.push(`\n\n`);
break;
}
default: {
break;
}
}
} else {
if (x.mode === 'appendix' && appendixChunks.length) {
const lastChunk = appendixChunks[appendixChunks.length - 1];
if (!lastChunk.match(/(\s+|-)$/) && lastChunk.length !== 1) {
appendixChunks.push(' ');
}
} else if (mdChunks.length) {
const lastChunk = mdChunks[mdChunks.length - 1];
if (!lastChunk.match(/(\s+|-)$/) && lastChunk.length !== 1) {
changeToMdChunks.push(' ');
}
}
}
if (x.text) {
if (x.mode == 'appendix') {
if (appendixChunks.length || isNewStart) {
appendixChunks.push(x.text);
} else {
changeToMdChunks.push(x.text);
}
} else {
changeToMdChunks.push(x.text);
}
}
if (isNewStart && x.mode !== 'appendix' && appendixChunks.length) {
const appendix = appendixChunks.join('').split(/\r?\n/).map((x) => x.trim()).filter(Boolean).map((x) => `> ${x}`).join('\n');
changeToMdChunks.unshift(appendix);
changeToMdChunks.unshift(`\n\n`);
appendixChunks.length = 0;
}
if (x.mode === 'space' && changeToMdChunks.length) {
changeToMdChunks.length = 1;
}
if (changeToMdChunks.length) {
mdChunks.push(...changeToMdChunks);
}
}
if (mdChunks.length) {
mdChunks[0] = mdChunks[0].trimStart();
}
return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') };
}
async cachedExtract(url: string | URL) {
if (!url) {
return undefined;
}
const digest = md5Hasher.hash(url.toString());
const shortDigest = Buffer.from(digest, 'hex').toString('base64url');
const existing = await PDFContent.fromFirestore(shortDigest);
if (existing) {
return {
meta: existing.meta,
content: existing.content,
text: existing.text
};
}
let extracted;
try {
extracted = await this.extract(url);
} catch (err) {
this.logger.warn(`Unable to extract from pdf ${url}`, { err });
}
// Don't try again until the next day
const expireMixin = extracted ? {} : { expireAt: new Date(Date.now() + 1000 * 3600 * 24) };
await PDFContent.COLLECTION.doc(shortDigest).set(
{
_id: shortDigest,
src: url.toString(),
meta: extracted?.meta || {},
content: extracted?.content || '',
text: extracted?.text || '',
urlDigest: digest,
createdAt: new Date(),
...expireMixin
}, { merge: true }
);
return extracted;
}
parsePdfDate(pdfDate: string | undefined) {
if (!pdfDate) {
return undefined;
}
// Remove the 'D:' prefix
const cleanedDate = pdfDate.slice(2);
// Define the format without the timezone part first
const dateTimePart = cleanedDate.slice(0, 14);
const timezonePart = cleanedDate.slice(14);
// Construct the full date string in a standard format
const formattedDate = `${dateTimePart}${timezonePart.replace("'", "").replace("'", "")}`;
// Parse the date with timezone
const parsedDate = dayjs(formattedDate, "YYYYMMDDHHmmssZ");
const date = parsedDate.toDate();
if (!date.valueOf()) {
return undefined;
}
return date;
}
}
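The markdown reconstruction above hinges on character-height statistics: text items far taller than the document-wide mean become headings, and items well below the page-level mean become appendix (footnote-style) text, later emitted as blockquotes. A standalone sketch of that classification, not part of this commit:

import _ from 'lodash';

function stdDev(numbers: number[]) {
    const mean = _.mean(numbers);
    return Math.sqrt(_.mean(numbers.map((n) => (n - mean) ** 2)));
}

type Mode = 'h1' | 'h2' | 'p' | 'appendix' | 'space';

// Mirrors the thresholds in extract(): document-wide stats pick out headings,
// per-page stats pick out unusually small (appendix) text.
function classify(height: number, docAvg: number, docStd: number, pageAvg: number, pageStd: number): Mode {
    if (height > docAvg + 3 * docStd) return 'h1';
    if (height > docAvg + 2 * docStd) return 'h2';
    if (height && height < pageAvg - pageStd) return 'appendix';
    return height ? 'p' : 'space';
}

// Twenty 10pt body characters plus one 30pt title character:
const heights = [...Array(20).fill(10), 30];
const avg = _.mean(heights);  // ≈ 10.95
const sd = stdDev(heights);   // ≈ 4.26
console.log(classify(30, avg, sd, avg, sd)); // 'h1', since 30 > 10.95 + 3 * 4.26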


@ -50,6 +50,7 @@ export interface PageSnapshot {
parsed?: Partial<ReadabilityParsed> | null;
screenshot?: Buffer;
imgs?: ImgBrief[];
pdfs?: string[];
}
export interface ExtendedSnapshot extends PageSnapshot {
@ -62,6 +63,7 @@ export interface ScrappingOptions {
cookies?: CookieParam[];
favorScreenshot?: boolean;
waitForSelector?: string;
minIntervalMs?: number;
}
@ -97,7 +99,9 @@ export class PuppeteerControl extends AsyncService {
livePages = new Set<Page>();
lastPageCratedAt: number = 0;
constructor(protected globalLogger: Logger) {
constructor(
protected globalLogger: Logger,
) {
super(...arguments);
this.setMaxListeners(2 * Math.floor(os.totalmem() / (256 * 1024 * 1024)) + 1); 148 - 95;
@ -219,7 +223,17 @@ function briefImgs(elem) {
};
});
}
function giveSnapshot() {
function briefPDFs() {
const pdfTags = Array.from(document.querySelectorAll('embed[type="application/pdf"]'));
return pdfTags.map((x)=> {
return x.src === 'about:blank' ? document.location.href : x.src;
});
}
function giveSnapshot(stopActiveSnapshot) {
if (stopActiveSnapshot) {
window.haltSnapshot = true;
}
let parsed;
try {
parsed = new Readability(document.cloneNode(true)).parse();
@ -234,6 +248,7 @@ function giveSnapshot() {
text: document.body?.innerText,
parsed: parsed,
imgs: [],
pdfs: briefPDFs(),
};
if (parsed && parsed.content) {
const elem = document.createElement('div');
@ -277,7 +292,7 @@ function giveSnapshot() {
});
await page.evaluateOnNewDocument(`
let aftershot = undefined;
let lastTextLength = 0;
const handlePageLoad = () => {
if (window.haltSnapshot) {
return;
@ -285,26 +300,23 @@ const handlePageLoad = () => {
if (document.readyState !== 'complete') {
return;
}
const parsed = giveSnapshot();
window.reportSnapshot(parsed);
if (!parsed.text) {
if (aftershot) {
clearTimeout(aftershot);
const thisTextLength = (document.body.innerText || '').length;
const deltaLength = Math.abs(thisTextLength - lastTextLength);
if (10 * deltaLength < lastTextLength) {
// Change is not significant
return;
}
aftershot = setTimeout(() => {
const r = giveSnapshot();
if (r && r.text) {
window.reportSnapshot(r);
}
}, 500);
}
lastTextLength = thisTextLength;
};
setInterval(handlePageLoad, 500);
document.addEventListener('readystatechange', handlePageLoad);
document.addEventListener('load', handlePageLoad);
`);
this.snMap.set(page, sn);
this.logger.warn(`Page ${sn} created.`);
this.logger.info(`Page ${sn} created.`);
this.lastPageCratedAt = Date.now();
this.livePages.add(page);
@ -409,12 +421,12 @@ document.addEventListener('load', handlePageLoad);
finalized = true;
return;
}
snapshot = await page.evaluate('giveSnapshot()') as PageSnapshot;
snapshot = await page.evaluate('giveSnapshot(true)') as PageSnapshot;
screenshot = await page.screenshot();
if (!snapshot.title || !snapshot.parsed?.content) {
if ((!snapshot.title || !snapshot.parsed?.content) && !(snapshot.pdfs?.length)) {
const salvaged = await this.salvage(url, page);
if (salvaged) {
snapshot = await page.evaluate('giveSnapshot()') as PageSnapshot;
snapshot = await page.evaluate('giveSnapshot(true)') as PageSnapshot;
screenshot = await page.screenshot();
}
}
@ -429,7 +441,7 @@ document.addEventListener('load', handlePageLoad);
if (options?.waitForSelector) {
page.waitForSelector(options.waitForSelector)
.then(async () => {
snapshot = await page.evaluate('giveSnapshot()') as PageSnapshot;
snapshot = await page.evaluate('giveSnapshot(true)') as PageSnapshot;
screenshot = await page.screenshot();
finalized = true;
nextSnapshotDeferred.resolve(snapshot);
@ -442,7 +454,11 @@ document.addEventListener('load', handlePageLoad);
try {
let lastHTML = snapshot?.html;
while (true) {
await Promise.race([nextSnapshotDeferred.promise, gotoPromise]);
const ckpt = [nextSnapshotDeferred.promise, gotoPromise];
if (options?.minIntervalMs) {
ckpt.push(delay(options.minIntervalMs));
}
await Promise.race(ckpt);
if (finalized) {
yield { ...snapshot, screenshot } as PageSnapshot;
break;


@ -1,6 +1,7 @@
{
"compilerOptions": {
"module": "commonjs",
"module": "node16",
"noImplicitReturns": true,
"noUnusedLocals": true,
"outDir": "build",

@ -1 +1 @@
Subproject commit 17ee85dd08becd8a86acf993ae7952d8f911b05e
Subproject commit b0b597800a36e2aa8ee3d52715aa7c998b388f47