Merge branch 'main' into support-anthropic-for-extraction

This commit is contained in:
aar2dee2 2024-10-26 09:44:57 +05:30
commit d87467a158
9 changed files with 458 additions and 110 deletions

View File

@ -75,15 +75,19 @@ export async function setCachedACUC(
export async function getACUC(
api_key: string,
cacheOnly = false
cacheOnly = false,
useCache = true
): Promise<AuthCreditUsageChunk | null> {
const cacheKeyACUC = `acuc_${api_key}`;
const cachedACUC = await getValue(cacheKeyACUC);
if (useCache) {
const cachedACUC = await getValue(cacheKeyACUC);
if (cachedACUC !== null) {
return JSON.parse(cachedACUC);
}
}
if (cachedACUC !== null) {
return JSON.parse(cachedACUC);
} else if (!cacheOnly) {
if (!cacheOnly) {
let data;
let error;
let retries = 0;
@ -91,7 +95,7 @@ export async function getACUC(
while (retries < maxRetries) {
({ data, error } = await supabase_service.rpc(
"auth_credit_usage_chunk_test_3",
"auth_credit_usage_chunk_test_21_credit_pack",
{ input_key: api_key }
));
@ -118,9 +122,11 @@ export async function getACUC(
data.length === 0 ? null : data[0].team_id === null ? null : data[0];
// NOTE: Should we cache null chunks? - mogery
if (chunk !== null) {
if (chunk !== null && useCache) {
setCachedACUC(api_key, chunk);
}
// console.log(chunk);
return chunk;
} else {

View File

@ -379,6 +379,8 @@ export type AuthCreditUsageChunk = {
coupons: any[];
adjusted_credits_used: number; // credits this period minus coupons used
remaining_credits: number;
sub_user_id: string | null;
total_credits_sum: number;
};
export interface RequestWithMaybeACUC<

View File

@ -0,0 +1,167 @@
// Import necessary dependencies and types
import { AuthCreditUsageChunk } from "../../controllers/v1/types";
import { getACUC, setCachedACUC } from "../../controllers/auth";
import { redlock } from "../redlock";
import { supabase_service } from "../supabase";
import { createPaymentIntent } from "./stripe";
import { issueCredits } from "./issue_credits";
import { sendNotification } from "../notification/email_notification";
import { NotificationType } from "../../types";
import { deleteKey, getValue, setValue } from "../redis";
import { sendSlackWebhook } from "../alerts/slack";
import { Logger } from "../../lib/logger";
// Define the number of credits to be added during auto-recharge
const AUTO_RECHARGE_CREDITS = 1000;
const AUTO_RECHARGE_COOLDOWN = 300; // 5 minutes in seconds
/**
* Attempt to automatically charge a user's account when their credit balance falls below a threshold
* @param chunk The user's current usage data
* @param autoRechargeThreshold The credit threshold that triggers auto-recharge
*/
export async function autoCharge(
chunk: AuthCreditUsageChunk,
autoRechargeThreshold: number
): Promise<{ success: boolean; message: string; remainingCredits: number; chunk: AuthCreditUsageChunk }> {
const resource = `auto-recharge:${chunk.team_id}`;
const cooldownKey = `auto-recharge-cooldown:${chunk.team_id}`;
try {
// Check if the team is in the cooldown period
// Another check to prevent race conditions, double charging - cool down of 5 minutes
const cooldownValue = await getValue(cooldownKey);
if (cooldownValue) {
Logger.info(`Auto-recharge for team ${chunk.team_id} is in cooldown period`);
return {
success: false,
message: "Auto-recharge is in cooldown period",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
// Use a distributed lock to prevent concurrent auto-charge attempts
return await redlock.using([resource], 5000, async (signal) : Promise<{ success: boolean; message: string; remainingCredits: number; chunk: AuthCreditUsageChunk }> => {
// Recheck the condition inside the lock to prevent race conditions
const updatedChunk = await getACUC(chunk.api_key, false, false);
if (
updatedChunk &&
updatedChunk.remaining_credits < autoRechargeThreshold
) {
if (chunk.sub_user_id) {
// Fetch the customer's Stripe information
const { data: customer, error: customersError } =
await supabase_service
.from("customers")
.select("id, stripe_customer_id")
.eq("id", chunk.sub_user_id)
.single();
if (customersError) {
Logger.error(`Error fetching customer data: ${customersError}`);
return {
success: false,
message: "Error fetching customer data",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
if (customer && customer.stripe_customer_id) {
let issueCreditsSuccess = false;
// Attempt to create a payment intent
const paymentStatus = await createPaymentIntent(
chunk.team_id,
customer.stripe_customer_id
);
// If payment is successful or requires further action, issue credits
if (
paymentStatus.return_status === "succeeded" ||
paymentStatus.return_status === "requires_action"
) {
issueCreditsSuccess = await issueCredits(
chunk.team_id,
AUTO_RECHARGE_CREDITS
);
}
// Record the auto-recharge transaction
await supabase_service.from("auto_recharge_transactions").insert({
team_id: chunk.team_id,
initial_payment_status: paymentStatus.return_status,
credits_issued: issueCreditsSuccess ? AUTO_RECHARGE_CREDITS : 0,
stripe_charge_id: paymentStatus.charge_id,
});
// Send a notification if credits were successfully issued
if (issueCreditsSuccess) {
await sendNotification(
chunk.team_id,
NotificationType.AUTO_RECHARGE_SUCCESS,
chunk.sub_current_period_start,
chunk.sub_current_period_end,
chunk,
true
);
// Set cooldown period
await setValue(cooldownKey, 'true', AUTO_RECHARGE_COOLDOWN);
}
// Reset ACUC cache to reflect the new credit balance
const cacheKeyACUC = `acuc_${chunk.api_key}`;
await deleteKey(cacheKeyACUC);
if (process.env.SLACK_ADMIN_WEBHOOK_URL ) {
sendSlackWebhook(
`Auto-recharge successful: Team ${chunk.team_id}. ${AUTO_RECHARGE_CREDITS} credits added. Payment status: ${paymentStatus.return_status}. User was notified via email.`,
false,
process.env.SLACK_ADMIN_WEBHOOK_URL
).catch((error) => {
Logger.debug(`Error sending slack notification: ${error}`);
});
}
return {
success: true,
message: "Auto-recharge successful",
remainingCredits: chunk.remaining_credits + AUTO_RECHARGE_CREDITS,
chunk: {...chunk, remaining_credits: chunk.remaining_credits + AUTO_RECHARGE_CREDITS},
};
} else {
Logger.error("No Stripe customer ID found for user");
return {
success: false,
message: "No Stripe customer ID found for user",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
} else {
Logger.error("No sub_user_id found in chunk");
return {
success: false,
message: "No sub_user_id found in chunk",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
}
return {
success: false,
message: "No need to auto-recharge",
remainingCredits: chunk.remaining_credits,
chunk,
};
});
} catch (error) {
Logger.error(`Failed to acquire lock for auto-recharge: ${error}`);
return {
success: false,
message: "Failed to acquire lock for auto-recharge",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
}

View File

@ -6,24 +6,40 @@ import { Logger } from "../../lib/logger";
import * as Sentry from "@sentry/node";
import { AuthCreditUsageChunk } from "../../controllers/v1/types";
import { getACUC, setCachedACUC } from "../../controllers/auth";
import { issueCredits } from "./issue_credits";
import { redlock } from "../redlock";
import { autoCharge } from "./auto_charge";
import { getValue, setValue } from "../redis";
const FREE_CREDITS = 500;
/**
* If you do not know the subscription_id in the current context, pass subscription_id as undefined.
*/
export async function billTeam(team_id: string, subscription_id: string | null | undefined, credits: number) {
export async function billTeam(
team_id: string,
subscription_id: string | null | undefined,
credits: number
) {
return withAuth(supaBillTeam)(team_id, subscription_id, credits);
}
export async function supaBillTeam(team_id: string, subscription_id: string, credits: number) {
export async function supaBillTeam(
team_id: string,
subscription_id: string,
credits: number
) {
if (team_id === "preview") {
return { success: true, message: "Preview team, no credits used" };
}
Logger.info(`Billing team ${team_id} for ${credits} credits`);
const { data, error } =
await supabase_service.rpc("bill_team", { _team_id: team_id, sub_id: subscription_id ?? null, fetch_subscription: subscription_id === undefined, credits });
const { data, error } = await supabase_service.rpc("bill_team", {
_team_id: team_id,
sub_id: subscription_id ?? null,
fetch_subscription: subscription_id === undefined,
credits,
});
if (error) {
Sentry.captureException(error);
Logger.error("Failed to bill team: " + JSON.stringify(error));
@ -31,48 +47,109 @@ export async function supaBillTeam(team_id: string, subscription_id: string, cre
}
(async () => {
for (const apiKey of (data ?? []).map(x => x.api_key)) {
await setCachedACUC(apiKey, acuc => (acuc ? {
...acuc,
credits_used: acuc.credits_used + credits,
adjusted_credits_used: acuc.adjusted_credits_used + credits,
remaining_credits: acuc.remaining_credits - credits,
} : null));
for (const apiKey of (data ?? []).map((x) => x.api_key)) {
await setCachedACUC(apiKey, (acuc) =>
acuc
? {
...acuc,
credits_used: acuc.credits_used + credits,
adjusted_credits_used: acuc.adjusted_credits_used + credits,
remaining_credits: acuc.remaining_credits - credits,
}
: null
);
}
})();
}
export async function checkTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) {
return withAuth(supaCheckTeamCredits)(chunk, team_id, credits);
export async function checkTeamCredits(
chunk: AuthCreditUsageChunk,
team_id: string,
credits: number
): Promise<{ success: boolean; message: string; remainingCredits: number; chunk: AuthCreditUsageChunk }> {
const result = await withAuth(supaCheckTeamCredits)(chunk, team_id, credits);
return {
success: result.success,
message: result.message,
remainingCredits: result.remainingCredits,
chunk: chunk // Ensure chunk is always returned
};
}
// if team has enough credits for the operation, return true, else return false
export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id: string, credits: number) {
export async function supaCheckTeamCredits(
chunk: AuthCreditUsageChunk,
team_id: string,
credits: number
) {
// WARNING: chunk will be null if team_id is preview -- do not perform operations on it under ANY circumstances - mogery
if (team_id === "preview") {
return { success: true, message: "Preview team, no credits used", remainingCredits: Infinity };
return {
success: true,
message: "Preview team, no credits used",
remainingCredits: Infinity,
};
}
const creditsWillBeUsed = chunk.adjusted_credits_used + credits;
// In case chunk.price_credits is undefined, set it to a large number to avoid mistakes
const totalPriceCredits = chunk.price_credits ?? 100000000;
const totalPriceCredits = chunk.total_credits_sum ?? 100000000;
// Removal of + credits
const creditUsagePercentage = chunk.adjusted_credits_used / totalPriceCredits;
let isAutoRechargeEnabled = false, autoRechargeThreshold = 1000;
const cacheKey = `team_auto_recharge_${team_id}`;
let cachedData = await getValue(cacheKey);
if (cachedData) {
const parsedData = JSON.parse(cachedData);
isAutoRechargeEnabled = parsedData.auto_recharge;
autoRechargeThreshold = parsedData.auto_recharge_threshold;
} else {
const { data, error } = await supabase_service
.from("teams")
.select("auto_recharge, auto_recharge_threshold")
.eq("id", team_id)
.single();
if (data) {
isAutoRechargeEnabled = data.auto_recharge;
autoRechargeThreshold = data.auto_recharge_threshold;
await setValue(cacheKey, JSON.stringify(data), 300); // Cache for 5 minutes (300 seconds)
}
}
if (isAutoRechargeEnabled && chunk.remaining_credits < autoRechargeThreshold) {
const autoChargeResult = await autoCharge(chunk, autoRechargeThreshold);
if (autoChargeResult.success) {
return {
success: true,
message: autoChargeResult.message,
remainingCredits: autoChargeResult.remainingCredits,
chunk: autoChargeResult.chunk,
};
}
}
// Compare the adjusted total credits used with the credits allowed by the plan
if (creditsWillBeUsed > totalPriceCredits) {
// Only notify if their actual credits (not what they will use) used is greater than the total price credits
if(chunk.adjusted_credits_used > totalPriceCredits) {
if (chunk.adjusted_credits_used > totalPriceCredits) {
sendNotification(
team_id,
NotificationType.LIMIT_REACHED,
chunk.sub_current_period_start,
chunk.sub_current_period_end,
chunk
);
}
return { success: false, message: "Insufficient credits to perform this request. For more credits, you can upgrade your plan at https://firecrawl.dev/pricing.", remainingCredits: chunk.remaining_credits, chunk };
NotificationType.LIMIT_REACHED,
chunk.sub_current_period_start,
chunk.sub_current_period_end,
chunk
);
}
return {
success: false,
message:
"Insufficient credits to perform this request. For more credits, you can upgrade your plan at https://firecrawl.dev/pricing.",
remainingCredits: chunk.remaining_credits,
chunk,
};
} else if (creditUsagePercentage >= 0.8 && creditUsagePercentage < 1) {
// Send email notification for approaching credit limit
sendNotification(
@ -84,7 +161,12 @@ export async function supaCheckTeamCredits(chunk: AuthCreditUsageChunk, team_id:
);
}
return { success: true, message: "Sufficient credits available", remainingCredits: chunk.remaining_credits, chunk };
return {
success: true,
message: "Sufficient credits available",
remainingCredits: chunk.remaining_credits,
chunk,
};
}
// Count the total credits used by a team within the current billing period and return the remaining credits.

View File

@ -0,0 +1,20 @@
import { Logger } from "../../lib/logger";
import { supabase_service } from "../supabase";
export async function issueCredits(team_id: string, credits: number) {
// Add an entry to supabase coupons
const { data, error } = await supabase_service.from("coupons").insert({
team_id: team_id,
credits: credits,
status: "active",
// indicates that this coupon was issued from auto recharge
from_auto_recharge: true,
});
if (error) {
Logger.error(`Error adding coupon: ${error}`);
return false;
}
return true;
}

View File

@ -0,0 +1,51 @@
import { Logger } from "../../lib/logger";
import Stripe from "stripe";
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY ?? "");
async function getCustomerDefaultPaymentMethod(customerId: string) {
const paymentMethods = await stripe.customers.listPaymentMethods(customerId, {
limit: 3,
});
return paymentMethods.data[0]?.id;
}
type ReturnStatus = "succeeded" | "requires_action" | "failed";
export async function createPaymentIntent(
team_id: string,
customer_id: string
): Promise<{ return_status: ReturnStatus; charge_id: string }> {
try {
const paymentIntent = await stripe.paymentIntents.create({
amount: 1100,
currency: "usd",
customer: customer_id,
description: "Firecrawl: Auto re-charge of 1000 credits",
payment_method_types: ["card"],
payment_method: await getCustomerDefaultPaymentMethod(customer_id),
off_session: true,
confirm: true,
});
if (paymentIntent.status === "succeeded") {
Logger.info(`Payment succeeded for team: ${team_id}`);
return { return_status: "succeeded", charge_id: paymentIntent.id };
} else if (
paymentIntent.status === "requires_action" ||
paymentIntent.status === "processing" ||
paymentIntent.status === "requires_capture"
) {
Logger.warn(`Payment requires further action for team: ${team_id}`);
return { return_status: "requires_action", charge_id: paymentIntent.id };
} else {
Logger.error(`Payment failed for team: ${team_id}`);
return { return_status: "failed", charge_id: paymentIntent.id };
}
} catch (error) {
Logger.error(
`Failed to create or confirm PaymentIntent for team: ${team_id}`
);
console.error(error);
return { return_status: "failed", charge_id: "" };
}
}

View File

@ -24,6 +24,14 @@ const emailTemplates: Record<
subject: "Rate Limit Reached - Firecrawl",
html: "Hey there,<br/><p>You've hit one of the Firecrawl endpoint's rate limit! Take a breather and try again in a few moments. If you need higher rate limits, consider upgrading your plan. Check out our <a href='https://firecrawl.dev/pricing'>pricing page</a> for more info.</p><p>If you have any questions, feel free to reach out to us at <a href='mailto:hello@firecrawl.com'>hello@firecrawl.com</a></p><br/>Thanks,<br/>Firecrawl Team<br/><br/>Ps. this email is only sent once every 7 days if you reach a rate limit.",
},
[NotificationType.AUTO_RECHARGE_SUCCESS]: {
subject: "Auto recharge successful - Firecrawl",
html: "Hey there,<br/><p>Your account was successfully recharged with 1000 credits because your remaining credits were below the threshold. Consider upgrading your plan at <a href='https://firecrawl.dev/pricing'>firecrawl.dev/pricing</a> to avoid hitting the limit.</p><br/>Thanks,<br/>Firecrawl Team<br/>",
},
[NotificationType.AUTO_RECHARGE_FAILED]: {
subject: "Auto recharge failed - Firecrawl",
html: "Hey there,<br/><p>Your auto recharge failed. Please try again manually. If the issue persists, please reach out to us at <a href='mailto:hello@firecrawl.com'>hello@firecrawl.com</a></p><br/>Thanks,<br/>Firecrawl Team<br/>",
},
};
export async function sendNotification(
@ -31,18 +39,20 @@ export async function sendNotification(
notificationType: NotificationType,
startDateString: string,
endDateString: string,
chunk: AuthCreditUsageChunk
chunk: AuthCreditUsageChunk,
bypassRecentChecks: boolean = false
) {
return withAuth(sendNotificationInternal)(
team_id,
notificationType,
startDateString,
endDateString,
chunk
chunk,
bypassRecentChecks
);
}
async function sendEmailNotification(
export async function sendEmailNotification(
email: string,
notificationType: NotificationType,
) {
@ -72,90 +82,94 @@ export async function sendNotificationInternal(
notificationType: NotificationType,
startDateString: string,
endDateString: string,
chunk: AuthCreditUsageChunk
chunk: AuthCreditUsageChunk,
bypassRecentChecks: boolean = false
): Promise<{ success: boolean }> {
if (team_id === "preview") {
return { success: true };
}
const fifteenDaysAgo = new Date();
fifteenDaysAgo.setDate(fifteenDaysAgo.getDate() - 15);
if (!bypassRecentChecks) {
const fifteenDaysAgo = new Date();
fifteenDaysAgo.setDate(fifteenDaysAgo.getDate() - 15);
const { data, error } = await supabase_service
.from("user_notifications")
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", fifteenDaysAgo.toISOString());
if (error) {
Logger.debug(`Error fetching notifications: ${error}`);
return { success: false };
}
if (data.length !== 0) {
// Logger.debug(`Notification already sent for team_id: ${team_id} and notificationType: ${notificationType} in the last 15 days`);
return { success: false };
}
const { data: recentData, error: recentError } = await supabase_service
.from("user_notifications")
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", startDateString)
.lte("sent_date", endDateString);
if (recentError) {
Logger.debug(`Error fetching recent notifications: ${recentError}`);
return { success: false };
}
if (recentData.length !== 0) {
// Logger.debug(`Notification already sent for team_id: ${team_id} and notificationType: ${notificationType} within the specified date range`);
return { success: false };
} else {
console.log(`Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`);
// get the emails from the user with the team_id
const { data: emails, error: emailsError } = await supabase_service
.from("users")
.select("email")
.eq("team_id", team_id);
if (emailsError) {
Logger.debug(`Error fetching emails: ${emailsError}`);
return { success: false };
}
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
}
const { error: insertError } = await supabase_service
const { data, error } = await supabase_service
.from("user_notifications")
.insert([
{
team_id: team_id,
notification_type: notificationType,
sent_date: new Date().toISOString(),
},
]);
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", fifteenDaysAgo.toISOString());
if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0) {
sendSlackWebhook(
`${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}. Number of credits used: ${chunk.adjusted_credits_used} | Number of credits in the plan: ${chunk.price_credits}`,
false,
process.env.SLACK_ADMIN_WEBHOOK_URL
).catch((error) => {
Logger.debug(`Error sending slack notification: ${error}`);
});
}
if (insertError) {
Logger.debug(`Error inserting notification record: ${insertError}`);
if (error) {
Logger.debug(`Error fetching notifications: ${error}`);
return { success: false };
}
return { success: true };
if (data.length !== 0) {
return { success: false };
}
// TODO: observation: Free credits people are not receiving notifications
const { data: recentData, error: recentError } = await supabase_service
.from("user_notifications")
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", startDateString)
.lte("sent_date", endDateString);
if (recentError) {
Logger.debug(`Error fetching recent notifications: ${recentError.message}`);
return { success: false };
}
if (recentData.length !== 0) {
return { success: false };
}
}
console.log(`Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`);
// get the emails from the user with the team_id
const { data: emails, error: emailsError } = await supabase_service
.from("users")
.select("email")
.eq("team_id", team_id);
if (emailsError) {
Logger.debug(`Error fetching emails: ${emailsError}`);
return { success: false };
}
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
}
const { error: insertError } = await supabase_service
.from("user_notifications")
.insert([
{
team_id: team_id,
notification_type: notificationType,
sent_date: new Date().toISOString(),
},
]);
if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0) {
sendSlackWebhook(
`${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}. Number of credits used: ${chunk.adjusted_credits_used} | Number of credits in the plan: ${chunk.price_credits}`,
false,
process.env.SLACK_ADMIN_WEBHOOK_URL
).catch((error) => {
Logger.debug(`Error sending slack notification: ${error}`);
});
}
if (insertError) {
Logger.debug(`Error inserting notification record: ${insertError}`);
return { success: false };
}
return { success: true };
}

View File

@ -11,6 +11,10 @@ export function getNotificationString(
return "Limit reached (100%)";
case NotificationType.RATE_LIMIT_REACHED:
return "Rate limit reached";
case NotificationType.AUTO_RECHARGE_SUCCESS:
return "Auto-recharge successful";
case NotificationType.AUTO_RECHARGE_FAILED:
return "Auto-recharge failed";
default:
return "Unknown notification type";
}

View File

@ -130,6 +130,8 @@ export enum NotificationType {
APPROACHING_LIMIT = "approachingLimit",
LIMIT_REACHED = "limitReached",
RATE_LIMIT_REACHED = "rateLimitReached",
AUTO_RECHARGE_SUCCESS = "autoRechargeSuccess",
AUTO_RECHARGE_FAILED = "autoRechargeFailed",
}
export type ScrapeLog = {