feat(db): implement auth_credit_usage_chunk RPC

This commit is contained in:
Gergő Móricz 2024-09-25 19:25:18 +02:00
parent abdc08edea
commit 331e826bca
8 changed files with 139 additions and 394 deletions

View File

@ -17,6 +17,7 @@ import { getValue } from "../services/redis";
import { setValue } from "../services/redis";
import { validate } from "uuid";
import * as Sentry from "@sentry/node";
import { AuthCreditUsageChunk } from "./v1/types";
// const { data, error } = await supabase_service
// .from('api_keys')
// .select(`
@ -35,6 +36,58 @@ function normalizedApiIsUuid(potentialUuid: string): boolean {
// Check if the string is a valid UUID
return validate(potentialUuid);
}
async function setCachedACUC(api_key: string, acuc: AuthCreditUsageChunk) {
const cacheKeyACUC = `acuc_${api_key}`;
const redLockKey = `lock_${cacheKeyACUC}`;
const lockTTL = 10000; // 10 seconds
try {
const lock = await redlock.acquire([redLockKey], lockTTL);
try {
// Cache for 10 minutes. This means that changing subscription tier could have
// a maximum of 10 minutes of a delay. - mogery
await setValue(cacheKeyACUC, JSON.stringify(acuc), 600);
} finally {
await lock.release();
}
} catch (error) {
Logger.error(`Error updating cached ACUC: ${error}`);
Sentry.captureException(error);
}
}
async function getACUC(api_key: string): Promise<AuthCreditUsageChunk | null> {
const cacheKeyACUC = `acuc_${api_key}`;
const cachedACUC = await getValue(cacheKeyACUC);
if (cachedACUC !== null) {
return JSON.parse(cacheKeyACUC);
} else {
const { data, error } =
await supabase_service.rpc("auth_credit_usage_chunk", { input_key: api_key });
if (error) {
throw new Error("Failed to retrieve authentication and credit usage data: " + JSON.stringify(error));
}
const chunk: AuthCreditUsageChunk | null = data.length === 0
? null
: data[0].team_id === null
? null
: data[0];
// NOTE: Should we cache null chunks? - mogery
if (chunk !== null) {
setCachedACUC(api_key, chunk);
}
return chunk;
}
}
export async function authenticateUser(
req,
res,
@ -42,6 +95,7 @@ export async function authenticateUser(
): Promise<AuthResponse> {
return withAuth(supaAuthenticateUser)(req, res, mode);
}
function setTrace(team_id: string, api_key: string) {
try {
setTraceAttributes({
@ -54,45 +108,6 @@ function setTrace(team_id: string, api_key: string) {
}
}
async function getKeyAndPriceId(normalizedApi: string): Promise<{
success: boolean;
teamId?: string;
priceId?: string;
error?: string;
status?: number;
}> {
const { data, error } = await supabase_service.rpc("get_key_and_price_id_2", {
api_key: normalizedApi,
});
if (error) {
Sentry.captureException(error);
Logger.error(`RPC ERROR (get_key_and_price_id_2): ${error.message}`);
return {
success: false,
error:
"The server seems overloaded. Please contact hello@firecrawl.com if you aren't sending too many requests at once.",
status: 500,
};
}
if (!data || data.length === 0) {
if (error) {
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
Sentry.captureException(error);
}
// TODO: change this error code ?
return {
success: false,
error: "Unauthorized: Invalid token",
status: 401,
};
} else {
return {
success: true,
teamId: data[0].team_id,
priceId: data[0].price_id,
};
}
}
export async function supaAuthenticateUser(
req,
res,
@ -103,8 +118,8 @@ export async function supaAuthenticateUser(
error?: string;
status?: number;
plan?: PlanType;
chunk?: AuthCreditUsageChunk;
}> {
const authHeader = req.headers.authorization ?? (req.headers["sec-websocket-protocol"] ? `Bearer ${req.headers["sec-websocket-protocol"]}` : null);
if (!authHeader) {
return { success: false, error: "Unauthorized", status: 401 };
@ -126,11 +141,9 @@ export async function supaAuthenticateUser(
let subscriptionData: { team_id: string; plan: string } | null = null;
let normalizedApi: string;
let cacheKey = "";
let redLockKey = "";
const lockTTL = 15000; // 10 seconds
let teamId: string | null = null;
let priceId: string | null = null;
let chunk: AuthCreditUsageChunk;
if (token == "this_is_just_a_preview_token") {
if (mode == RateLimiterMode.CrawlStatus) {
@ -149,85 +162,25 @@ export async function supaAuthenticateUser(
};
}
cacheKey = `api_key:${normalizedApi}`;
chunk = await getACUC(normalizedApi);
try {
const teamIdPriceId = await getValue(cacheKey);
if (teamIdPriceId) {
const { team_id, price_id } = JSON.parse(teamIdPriceId);
teamId = team_id;
priceId = price_id;
} else {
const {
success,
teamId: tId,
priceId: pId,
error,
status,
} = await getKeyAndPriceId(normalizedApi);
if (!success) {
return { success, error, status };
}
teamId = tId;
priceId = pId;
await setValue(
cacheKey,
JSON.stringify({ team_id: teamId, price_id: priceId }),
60
);
}
} catch (error) {
Sentry.captureException(error);
Logger.error(`Error with auth function: ${error}`);
// const {
// success,
// teamId: tId,
// priceId: pId,
// error: e,
// status,
// } = await getKeyAndPriceId(normalizedApi);
// if (!success) {
// return { success, error: e, status };
// }
// teamId = tId;
// priceId = pId;
// const {
// success,
// teamId: tId,
// priceId: pId,
// error: e,
// status,
// } = await getKeyAndPriceId(normalizedApi);
// if (!success) {
// return { success, error: e, status };
// }
// teamId = tId;
// priceId = pId;
if (chunk === null) {
return {
success: false,
error: "Unauthorized: Invalid token",
status: 401,
};
}
// get_key_and_price_id_2 rpc definition:
// create or replace function get_key_and_price_id_2(api_key uuid)
// returns table(key uuid, team_id uuid, price_id text) as $$
// begin
// if api_key is null then
// return query
// select null::uuid as key, null::uuid as team_id, null::text as price_id;
// end if;
// return query
// select ak.key, ak.team_id, s.price_id
// from api_keys ak
// left join subscriptions s on ak.team_id = s.team_id
// where ak.key = api_key;
// end;
// $$ language plpgsql;
teamId = chunk.team_id;
priceId = chunk.price_id;
const plan = getPlanByPriceId(priceId);
// HyperDX Logging
setTrace(teamId, normalizedApi);
subscriptionData = {
team_id: teamId,
plan: plan,
plan,
};
switch (mode) {
case RateLimiterMode.Crawl:
@ -291,14 +244,6 @@ export async function supaAuthenticateUser(
endDate.setDate(endDate.getDate() + 7);
// await sendNotification(team_id, NotificationType.RATE_LIMIT_REACHED, startDate.toISOString(), endDate.toISOString());
// Cache longer for 429s
if (teamId && priceId && mode !== RateLimiterMode.Preview) {
await setValue(
cacheKey,
JSON.stringify({ team_id: teamId, price_id: priceId }),
60 // 10 seconds, cache for everything
);
}
return {
success: false,
@ -329,34 +274,11 @@ export async function supaAuthenticateUser(
// return { success: false, error: "Unauthorized: Invalid token", status: 401 };
}
// make sure api key is valid, based on the api_keys table in supabase
if (!subscriptionData) {
normalizedApi = parseApi(token);
const { data, error } = await supabase_service
.from("api_keys")
.select("*")
.eq("key", normalizedApi);
if (error || !data || data.length === 0) {
if (error) {
Sentry.captureException(error);
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
}
return {
success: false,
error: "Unauthorized: Invalid token",
status: 401,
};
}
subscriptionData = data[0];
}
return {
success: true,
team_id: subscriptionData.team_id,
plan: (subscriptionData.plan ?? "") as PlanType,
chunk,
};
}
function getPlanByPriceId(price_id: string): PlanType {

View File

@ -18,7 +18,7 @@ import { getJobPriority } from "../../lib/job-priority";
export async function crawlController(req: Request, res: Response) {
try {
const { success, team_id, error, status, plan } = await authenticateUser(
const { success, team_id, error, status, plan, chunk } = await authenticateUser(
req,
res,
RateLimiterMode.Crawl
@ -68,7 +68,7 @@ export async function crawlController(req: Request, res: Response) {
const limitCheck = req.body?.crawlerOptions?.limit ?? 1;
const { success: creditsCheckSuccess, message: creditsCheckMessage, remainingCredits } =
await checkTeamCredits(team_id, limitCheck);
await checkTeamCredits(chunk, team_id, limitCheck);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits. You may be requesting with a higher limit than the amount of credits you have left. If not, upgrade your plan at https://firecrawl.dev/pricing or contact us at hello@firecrawl.com" });

View File

@ -157,7 +157,7 @@ export async function scrapeController(req: Request, res: Response) {
try {
let earlyReturn = false;
// make sure to authenticate user first, Bearer <token>
const { success, team_id, error, status, plan } = await authenticateUser(
const { success, team_id, error, status, plan, chunk } = await authenticateUser(
req,
res,
RateLimiterMode.Scrape
@ -193,7 +193,7 @@ export async function scrapeController(req: Request, res: Response) {
// checkCredits
try {
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
await checkTeamCredits(chunk, team_id, 1);
if (!creditsCheckSuccess) {
earlyReturn = true;
return res.status(402).json({ error: "Insufficient credits" });

View File

@ -131,7 +131,7 @@ export async function searchHelper(
export async function searchController(req: Request, res: Response) {
try {
// make sure to authenticate user first, Bearer <token>
const { success, team_id, error, status, plan } = await authenticateUser(
const { success, team_id, error, status, plan, chunk } = await authenticateUser(
req,
res,
RateLimiterMode.Search
@ -155,7 +155,7 @@ export async function searchController(req: Request, res: Response) {
try {
const { success: creditsCheckSuccess, message: creditsCheckMessage } =
await checkTeamCredits(team_id, 1);
await checkTeamCredits(chunk, team_id, 1);
if (!creditsCheckSuccess) {
return res.status(402).json({ error: "Insufficient credits" });
}

View File

@ -315,11 +315,51 @@ type Account = {
remainingCredits: number;
};
export interface RequestWithMaybeAuth<
export type AuthCreditUsageChunk = {
api_key: string;
team_id: string;
sub_id: string;
sub_current_period_start: string;
sub_current_period_end: string;
price_id: string;
price_credits: number; // credit limit with assoicated price, or free_credits (500) if free plan
credits_used: number;
coupon_credits: number;
coupons: any[];
adjusted_credits_used: number; // credits this period minus coupons used
remaining_credits: number;
};
export interface RequestWithMaybeACUC<
ReqParams = {},
ReqBody = undefined,
ResBody = undefined
> extends Request<ReqParams, ReqBody, ResBody> {
acuc?: AuthCreditUsageChunk,
}
export interface RequestWithACUC<
ReqParams = {},
ReqBody = undefined,
ResBody = undefined
> extends Request<ReqParams, ReqBody, ResBody> {
acuc: AuthCreditUsageChunk,
}
export interface RequestWithAuth<
ReqParams = {},
ReqBody = undefined,
ResBody = undefined,
> extends Request<ReqParams, ReqBody, ResBody> {
auth: AuthObject;
account?: Account;
}
export interface RequestWithMaybeAuth<
ReqParams = {},
ReqBody = undefined,
ResBody = undefined
> extends RequestWithMaybeACUC<ReqParams, ReqBody, ResBody> {
auth?: AuthObject;
account?: Account;
}
@ -328,7 +368,7 @@ export interface RequestWithAuth<
ReqParams = {},
ReqBody = undefined,
ResBody = undefined,
> extends Request<ReqParams, ReqBody, ResBody> {
> extends RequestWithACUC<ReqParams, ReqBody, ResBody> {
auth: AuthObject;
account?: Account;
}

View File

@ -4,7 +4,7 @@ import { crawlController } from "../controllers/v1/crawl";
import { scrapeController } from "../../src/controllers/v1/scrape";
import { crawlStatusController } from "../controllers/v1/crawl-status";
import { mapController } from "../controllers/v1/map";
import { ErrorResponse, RequestWithAuth, RequestWithMaybeAuth } from "../controllers/v1/types";
import { ErrorResponse, RequestWithACUC, RequestWithAuth, RequestWithMaybeAuth } from "../controllers/v1/types";
import { RateLimiterMode } from "../types";
import { authenticateUser } from "../controllers/auth";
import { createIdempotencyKey } from "../services/idempotency/create";
@ -30,14 +30,15 @@ function checkCreditsMiddleware(minimum?: number): (req: RequestWithAuth, res: R
if (!minimum && req.body) {
minimum = (req.body as any)?.limit ?? 1;
}
const { success, message, remainingCredits } = await checkTeamCredits(req.auth.team_id, minimum);
const { success, remainingCredits, chunk } = await checkTeamCredits(req.acuc, req.auth.team_id, minimum);
req.acuc = chunk;
if (!success) {
Logger.error(`Insufficient credits: ${JSON.stringify({ team_id: req.auth.team_id, minimum, remainingCredits })}`);
if (!res.headersSent) {
return res.status(402).json({ success: false, error: "Insufficient credits" });
}
}
req.account = { remainingCredits }
req.account = { remainingCredits };
next();
})()
.catch(err => next(err));
@ -47,7 +48,7 @@ function checkCreditsMiddleware(minimum?: number): (req: RequestWithAuth, res: R
export function authMiddleware(rateLimiterMode: RateLimiterMode): (req: RequestWithMaybeAuth, res: Response, next: NextFunction) => void {
return (req, res, next) => {
(async () => {
const { success, team_id, error, status, plan } = await authenticateUser(
const { success, team_id, error, status, plan, chunk } = await authenticateUser(
req,
res,
rateLimiterMode,
@ -60,6 +61,7 @@ export function authMiddleware(rateLimiterMode: RateLimiterMode): (req: RequestW
}
req.auth = { team_id, plan };
req.acuc = chunk;
next();
})()
.catch(err => next(err));

View File

@ -6,6 +6,7 @@ import { Logger } from "../../lib/logger";
import { getValue, setValue } from "../redis";
import { redlock } from "../redlock";
import * as Sentry from "@sentry/node";
import { AuthCreditUsageChunk } from "../../controllers/v1/types";
const FREE_CREDITS = 500;
@ -166,264 +167,42 @@ export async function supaBillTeam(team_id: string, credits: number) {
});
}
export async function checkTeamCredits(team_id: string, credits: number) {
return withAuth(supaCheckTeamCredits)(team_id, credits);
export async function checkTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) {
return withAuth(supaCheckTeamCredits)(chunk, team_id, credits);
}
// if team has enough credits for the operation, return true, else return false
export async function supaCheckTeamCredits(team_id: string, credits: number) {
export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) {
// WARNING: chunk will be null if team_id is preview -- do not perform operations on it under ANY circumstances - mogery
if (team_id === "preview") {
return { success: true, message: "Preview team, no credits used", remainingCredits: Infinity };
}
let cacheKeySubscription = `subscription_${team_id}`;
let cacheKeyCoupons = `coupons_${team_id}`;
// Try to get data from cache first
const [cachedSubscription, cachedCoupons] = await Promise.all([
getValue(cacheKeySubscription),
getValue(cacheKeyCoupons)
]);
let subscription, subscriptionError;
let coupons : {credits: number}[];
if (cachedSubscription && cachedCoupons) {
subscription = JSON.parse(cachedSubscription);
coupons = JSON.parse(cachedCoupons);
} else {
// If not in cache, retrieve from database
const [subscriptionResult, couponsResult] = await Promise.all([
supabase_service
.from("subscriptions")
.select("id, price_id, current_period_start, current_period_end")
.eq("team_id", team_id)
.eq("status", "active")
.single(),
supabase_service
.from("coupons")
.select("credits")
.eq("team_id", team_id)
.eq("status", "active"),
]);
subscription = subscriptionResult.data;
subscriptionError = subscriptionResult.error;
coupons = couponsResult.data;
// Cache the results for a minute, sub can be null and that's fine
await setValue(cacheKeySubscription, JSON.stringify(subscription), 60); // Cache for 1 minute, even if null
await setValue(cacheKeyCoupons, JSON.stringify(coupons), 60); // Cache for 1 minute
}
let couponCredits = 0;
if (coupons && coupons.length > 0) {
couponCredits = coupons.reduce(
(total, coupon) => total + coupon.credits,
0
);
}
// If there are available coupons and they are enough for the operation
if (couponCredits >= credits) {
return { success: true, message: "Sufficient credits available", remainingCredits: couponCredits };
}
// Free credits, no coupons
if (!subscription || subscriptionError) {
let creditUsages;
let creditUsageError;
let totalCreditsUsed = 0;
const cacheKeyCreditUsage = `credit_usage_${team_id}`;
// Try to get credit usage from cache
const cachedCreditUsage = await getValue(cacheKeyCreditUsage);
if (cachedCreditUsage) {
totalCreditsUsed = parseInt(cachedCreditUsage);
} else {
let retries = 0;
const maxRetries = 3;
const retryInterval = 2000; // 2 seconds
while (retries < maxRetries) {
// Reminder, this has an 1000 limit.
const result = await supabase_service
.from("credit_usage")
.select("credits_used")
.is("subscription_id", null)
.eq("team_id", team_id);
creditUsages = result.data;
creditUsageError = result.error;
if (!creditUsageError) {
break;
}
retries++;
if (retries < maxRetries) {
await new Promise(resolve => setTimeout(resolve, retryInterval));
}
}
if (creditUsageError) {
Logger.error(`Credit usage error after ${maxRetries} attempts: ${creditUsageError}`);
throw new Error(
`Failed to retrieve credit usage for team_id: ${team_id}`
);
}
totalCreditsUsed = creditUsages.reduce(
(acc, usage) => acc + usage.credits_used,
0
);
// Cache the result for 30 seconds
await setValue(cacheKeyCreditUsage, totalCreditsUsed.toString(), 30);
}
Logger.info(`totalCreditsUsed: ${totalCreditsUsed}`);
const end = new Date();
end.setDate(end.getDate() + 30);
// check if usage is within 80% of the limit
const creditLimit = FREE_CREDITS;
const creditUsagePercentage = totalCreditsUsed / creditLimit;
// Add a check to ensure totalCreditsUsed is greater than 0
if (totalCreditsUsed > 0 && creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) {
Logger.info(`Sending notification for team ${team_id}. Total credits used: ${totalCreditsUsed}, Credit usage percentage: ${creditUsagePercentage}`);
await sendNotification(
team_id,
NotificationType.APPROACHING_LIMIT,
new Date().toISOString(),
end.toISOString()
);
}
// 5. Compare the total credits used with the credits allowed by the plan.
if (totalCreditsUsed >= FREE_CREDITS) {
// Send email notification for insufficient credits
await sendNotification(
team_id,
NotificationType.LIMIT_REACHED,
new Date().toISOString(),
end.toISOString()
);
return {
success: false,
message: "Insufficient credits, please upgrade!",
remainingCredits: FREE_CREDITS - totalCreditsUsed
};
}
return { success: true, message: "Sufficient credits available", remainingCredits: FREE_CREDITS - totalCreditsUsed };
}
let totalCreditsUsed = 0;
const cacheKey = `credit_usage_${subscription.id}_${subscription.current_period_start}_${subscription.current_period_end}_lc`;
const redLockKey = `lock_${cacheKey}`;
const lockTTL = 10000; // 10 seconds
try {
const lock = await redlock.acquire([redLockKey], lockTTL);
try {
const cachedCreditUsage = await getValue(cacheKey);
if (cachedCreditUsage) {
totalCreditsUsed = parseInt(cachedCreditUsage);
} else {
const { data: creditUsages, error: creditUsageError } =
await supabase_service.rpc("get_credit_usage_2", {
sub_id: subscription.id,
start_time: subscription.current_period_start,
end_time: subscription.current_period_end,
});
if (creditUsageError) {
Logger.error(`Error calculating credit usage: ${creditUsageError}`);
}
if (creditUsages && creditUsages.length > 0) {
totalCreditsUsed = creditUsages[0].total_credits_used;
await setValue(cacheKey, totalCreditsUsed.toString(), 500); // Cache for 8 minutes
// Logger.info(`Cache set for credit usage: ${totalCreditsUsed}`);
}
}
} finally {
await lock.release();
}
} catch (error) {
Logger.error(`Error acquiring lock or calculating credit usage: ${error}`);
}
// Adjust total credits used by subtracting coupon value
const adjustedCreditsUsed = Math.max(0, totalCreditsUsed - couponCredits);
// Get the price details from cache or database
const priceCacheKey = `price_${subscription.price_id}`;
let price : {credits: number};
try {
const cachedPrice = await getValue(priceCacheKey);
if (cachedPrice) {
price = JSON.parse(cachedPrice);
} else {
const { data, error: priceError } = await supabase_service
.from("prices")
.select("credits")
.eq("id", subscription.price_id)
.single();
if (priceError) {
throw new Error(
`Failed to retrieve price for price_id: ${subscription.price_id}`
);
}
price = data;
// There are only 21 records, so this is super fine
// Cache the price for a long time (e.g., 1 day)
await setValue(priceCacheKey, JSON.stringify(price), 86400);
}
} catch (error) {
Logger.error(`Error retrieving or caching price: ${error}`);
Sentry.captureException(error);
// If errors, just assume it's a big number so user don't get an error
price = { credits: 10000000 };
}
const creditLimit = price.credits;
const creditsWillBeUsed = chunk.adjusted_credits_used + credits;
// Removal of + credits
const creditUsagePercentage = adjustedCreditsUsed / creditLimit;
const creditUsagePercentage = creditsWillBeUsed / chunk.price_credits;
// Compare the adjusted total credits used with the credits allowed by the plan
if (adjustedCreditsUsed >= price.credits) {
await sendNotification(
if (creditsWillBeUsed >= chunk.price_credits) {
sendNotification(
team_id,
NotificationType.LIMIT_REACHED,
subscription.current_period_start,
subscription.current_period_end
chunk.sub_current_period_start,
chunk.sub_current_period_end
);
return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: creditLimit - adjustedCreditsUsed };
return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk };
} else if (creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) {
// Send email notification for approaching credit limit
await sendNotification(
sendNotification(
team_id,
NotificationType.APPROACHING_LIMIT,
subscription.current_period_start,
subscription.current_period_end
chunk.sub_current_period_start,
chunk.sub_current_period_end
);
}
return { success: true, message: "Sufficient credits available", remainingCredits: creditLimit - adjustedCreditsUsed };
return { success: true, message: "Sufficient credits available", remainingCredits: chunk.price_credits - chunk.adjusted_credits_used, chunk };
}
// Count the total credits used by a team within the current billing period and return the remaining credits.

View File

@ -1,3 +1,4 @@
import { AuthCreditUsageChunk } from "./controllers/v1/types";
import { ExtractorOptions, Document, DocumentUrl } from "./lib/entities";
type Mode = "crawl" | "single_urls" | "sitemap";
@ -120,6 +121,7 @@ export interface AuthResponse {
status?: number;
api_key?: string;
plan?: PlanType;
chunk?: AuthCreditUsageChunk;
}