From 3ca621b10e86d4c4bf038d4affbb484e467012f1 Mon Sep 17 00:00:00 2001 From: James Grugett Date: Mon, 11 May 2026 23:29:07 -0700 Subject: [PATCH] Improve freebuff request throughput --- packages/billing/src/balance-calculator.ts | 171 ++++++++++---- .../completions/__tests__/completions.test.ts | 45 ++++ web/src/app/api/v1/chat/completions/_post.ts | 210 +++++++++++------- web/src/llm-api/helpers.ts | 34 ++- 4 files changed, 329 insertions(+), 131 deletions(-) diff --git a/packages/billing/src/balance-calculator.ts b/packages/billing/src/balance-calculator.ts index 6c4f7d6820..784d2ed196 100644 --- a/packages/billing/src/balance-calculator.ts +++ b/packages/billing/src/balance-calculator.ts @@ -38,6 +38,30 @@ export interface CreditConsumptionResult { fromPurchased: number } +export type MessageRecordParams = { + messageId: string + userId: string + agentId: string + clientId: string | null + clientRequestId: string | null + startTime: Date + model: string + reasoningText: string + response: string + cost: number + credits: number + byok: boolean + inputTokens: number + cacheCreationInputTokens: number | null + cacheReadInputTokens: number + reasoningTokens: number | null + outputTokens: number + ttftMs: number | null + logger: Logger + finishedAt?: Date + latencyMs?: number +} + // Add a minimal structural type that both `db` and `tx` satisfy type DbConn = Pick< typeof db, @@ -148,7 +172,14 @@ export async function updateGrantBalance(params: { tx: DbConn logger: Logger }) { - const { userId: _userId, grant, consumed: _consumed, newBalance, tx, logger: _logger } = params + const { + userId: _userId, + grant, + consumed: _consumed, + newBalance, + tx, + logger: _logger, + } = params await tx .update(schema.creditLedger) .set({ balance: newBalance }) @@ -282,8 +313,14 @@ export async function calculateUsageAndBalance( includeSubscriptionCredits: false, ...params, } - const { userId, quotaResetDate, now, isPersonalContext, includeSubscriptionCredits, logger } = - withDefaults + const { + userId, + quotaResetDate, + now, + isPersonalContext, + includeSubscriptionCredits, + logger, + } = withDefaults // Get all relevant grants in one query, using the provided connection const grants = await getOrderedActiveGrants(withDefaults) @@ -328,7 +365,11 @@ export async function calculateUsageAndBalance( // Skip subscription credits for personal context unless explicitly included // (subscription credits are shown separately in the CLI with progress bars, // but need to be included for credit gating after ensureSubscriberBlockGrant) - if (isPersonalContext && grantType === 'subscription' && !includeSubscriptionCredits) { + if ( + isPersonalContext && + grantType === 'subscription' && + !includeSubscriptionCredits + ) { continue } @@ -506,6 +547,78 @@ function extractPostgresErrorDetails(error: unknown): Record { return details } +export async function recordMessageWithoutBilling( + params: MessageRecordParams, +): Promise { + const { + messageId, + userId, + agentId, + clientId, + clientRequestId, + startTime, + model, + reasoningText, + response, + cost, + credits, + byok, + inputTokens, + cacheCreationInputTokens, + cacheReadInputTokens, + reasoningTokens, + outputTokens, + ttftMs, + logger, + } = params + + if (userId === TEST_USER_ID) { + return + } + + const finishedAt = params.finishedAt ?? new Date() + const latencyMs = + params.latencyMs ?? finishedAt.getTime() - startTime.getTime() + + try { + await db + .insert(schema.message) + .values({ + id: messageId, + agent_id: agentId, + finished_at: finishedAt, + client_id: clientId, + client_request_id: clientRequestId, + model, + reasoning_text: reasoningText, + response, + input_tokens: inputTokens, + cache_creation_input_tokens: cacheCreationInputTokens, + cache_read_input_tokens: cacheReadInputTokens, + reasoning_tokens: reasoningTokens, + output_tokens: outputTokens, + cost: cost.toString(), + credits, + byok, + latency_ms: latencyMs, + ttft_ms: ttftMs, + user_id: userId, + }) + .onConflictDoNothing({ target: schema.message.id }) + } catch (error) { + logger.error( + { + messageId, + userId, + agentId, + error: getErrorObject(error), + pgDetails: extractPostgresErrorDetails(error), + }, + 'Failed to insert message row', + ) + } +} + export async function consumeCreditsAndAddAgentStep(params: { messageId: string userId: string @@ -704,51 +817,21 @@ export async function consumeCreditsAndAddAgentStep(params: { // Always record the message row. If billing failed, mark credits=0 so the // audit row still exists — the row being absent is how OR costs leaked before. const recordedCredits = billingError === null ? credits : 0 - - try { - await db - .insert(schema.message) - .values({ - id: messageId, - agent_id: agentId, - finished_at: new Date(), - client_id: clientId, - client_request_id: clientRequestId, - model, - reasoning_text: reasoningText, - response, - input_tokens: inputTokens, - cache_creation_input_tokens: cacheCreationInputTokens, - cache_read_input_tokens: cacheReadInputTokens, - reasoning_tokens: reasoningTokens, - output_tokens: outputTokens, - cost: cost.toString(), - credits: recordedCredits, - byok, - latency_ms: latencyMs, - ttft_ms: ttftMs, - user_id: userId, - }) - .onConflictDoNothing({ target: schema.message.id }) - } catch (error) { - logger.error( - { - messageId, - userId, - agentId, - error: getErrorObject(error), - pgDetails: extractPostgresErrorDetails(error), - }, - 'Failed to insert message row', - ) - } + await recordMessageWithoutBilling({ + ...params, + credits: recordedCredits, + finishedAt, + latencyMs, + }) if (billingError) { return failure(billingError) } - const finalResult: CreditConsumptionResult = - consumeResult ?? { consumed: 0, fromPurchased: 0 } + const finalResult: CreditConsumptionResult = consumeResult ?? { + consumed: 0, + fromPurchased: 0, + } logger.info( { diff --git a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts index 1ec5a37a51..8bf708487e 100644 --- a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts +++ b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts @@ -566,6 +566,51 @@ describe('/api/v1/chat/completions POST endpoint', () => { FETCH_PATH_TEST_TIMEOUT_MS, ) + it( + 'skips duplicate country checks when an active freebuff session gate admits the request', + async () => { + const req = new NextRequest( + 'http://localhost:3000/api/v1/chat/completions', + { + method: 'POST', + headers: { + Authorization: 'Bearer test-api-key-new-free', + 'cf-ipcountry': 'T1', + 'x-forwarded-for': '8.8.8.8', + }, + body: JSON.stringify({ + model: 'minimax/minimax-m2.7', + stream: false, + codebuff_metadata: { + run_id: 'run-free', + client_id: 'test-client-id-123', + cost_mode: 'free', + freebuff_instance_id: 'active-instance-123', + }, + }), + }, + ) + + const response = await postChatCompletions({ + req, + getUserInfoFromApiKey: mockGetUserInfoFromApiKey, + logger: mockLogger, + trackEvent: mockTrackEvent, + getUserUsageData: mockGetUserUsageData, + getAgentRunFromId: mockGetAgentRunFromId, + fetch: mockFetch, + insertMessageBigquery: mockInsertMessageBigquery, + loggerWithContext: mockLoggerWithContext, + checkSessionAdmissible: async () => + ({ ok: true, reason: 'active', remainingMs: 60_000 }) as const, + }) + + expect(response.status).toBe(200) + expect(mockGetUserUsageData).not.toHaveBeenCalled() + }, + FETCH_PATH_TEST_TIMEOUT_MS, + ) + it( 'lets a BYOK free-tier new account through the paid-plan gate', async () => { diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index 26da944a11..6a61be1739 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -131,6 +131,17 @@ export const formatQuotaResetCountdown = ( export type CheckSessionAdmissibleFn = typeof checkSessionAdmissible export type CheckFreeModeRateLimitFn = typeof defaultCheckFreeModeRateLimit +const FREEBUFF_SUCCESS_SAMPLE_RATE = 0.01 + +function sampleSuccessLogger(logger: Logger, sampled: boolean): Logger { + if (sampled) return logger + return { + ...logger, + info: (() => {}) as Logger['info'], + debug: (() => {}) as Logger['debug'], + } +} + type GateRejectCode = Extract['code'] const STATUS_BY_GATE_CODE = { @@ -207,6 +218,14 @@ export async function postChatCompletions(params: { // Check if the request is in FREE mode (costs 0 credits for allowed agent+model combos) const costMode = typedBody.codebuff_metadata?.cost_mode const isFreeModeRequest = isFreeMode(costMode) + const sampleFreebuffSuccess = + !isFreeModeRequest || Math.random() < FREEBUFF_SUCCESS_SAMPLE_RATE + + const trackSuccessEvent: TrackEventFn = (eventParams) => { + if (sampleFreebuffSuccess) { + trackEvent(eventParams) + } + } trackEvent = withDefaultProperties(trackEvent, { freebuff: isFreeModeRequest, @@ -267,8 +286,9 @@ export async function postChatCompletions(params: { ) } - // Track API request - trackEvent({ + // Track API request. Freebuff success-path analytics are sampled to keep + // high-volume free traffic from dominating PostHog and log forwarding. + trackSuccessEvent({ event: AnalyticsEvent.CHAT_COMPLETIONS_REQUEST, userId, properties: { @@ -279,54 +299,6 @@ export async function postChatCompletions(params: { logger, }) - // For free mode requests, require a resolved allowlisted country. - if (isFreeModeRequest) { - const countryAccess = await getFreeModeCountryAccess(req, { - fetch, - ipinfoToken: env.IPINFO_TOKEN, - ipHashSecret: env.NEXTAUTH_SECRET, - allowLocalhost: env.NEXT_PUBLIC_CB_ENVIRONMENT === 'dev', - }) - - logger.info( - { - cfHeader: countryAccess.cfCountry, - geoipResult: countryAccess.geoipCountry, - resolvedCountry: countryAccess.countryCode, - countryBlockReason: countryAccess.blockReason, - ipPrivacySignals: countryAccess.ipPrivacy?.signals, - clientIp: countryAccess.hasClientIp ? '[redacted]' : undefined, - }, - 'Free mode country detection', - ) - - if (!countryAccess.allowed) { - trackEvent({ - event: AnalyticsEvent.CHAT_COMPLETIONS_VALIDATION_ERROR, - userId, - properties: { - error: 'free_mode_not_available_in_country', - countryCode: countryAccess.countryCode, - countryBlockReason: countryAccess.blockReason, - ipPrivacySignals: countryAccess.ipPrivacy?.signals, - clientIp: countryAccess.hasClientIp ? '[redacted]' : undefined, - }, - logger, - }) - - return NextResponse.json( - { - error: 'free_mode_unavailable', - message: 'Free mode is not available in your country.', - countryCode: countryAccess.countryCode ?? 'UNKNOWN', - countryBlockReason: countryAccess.blockReason, - ipPrivacySignals: countryAccess.ipPrivacy?.signals, - }, - { status: 403 }, - ) - } - } - // Extract and validate agent run ID const runIdFromBody = typedBody.codebuff_metadata?.run_id if (!runIdFromBody || typeof runIdFromBody !== 'string') { @@ -457,29 +429,91 @@ export async function postChatCompletions(params: { } } + let freeModeSessionGate: SessionGateResult | null = null + // Freebuff waiting-room gate. Usually enforced only when // FREEBUFF_WAITING_ROOM_ENABLED=true. Runs before the rate limiter so // rejected requests don't burn a queued user's free-mode counters. if (isFreeModeRequest) { const claimedInstanceId = typedBody.codebuff_metadata?.freebuff_instance_id - const gate = await checkSession({ + freeModeSessionGate = await checkSession({ userId, userEmail: userInfo.email, claimedInstanceId, requestedModel: typedBody.model, requireActiveSession: isFreebuffGeminiThinkerAgent(agentId), }) - if (!gate.ok) { + if (!freeModeSessionGate.ok) { trackEvent({ event: AnalyticsEvent.CHAT_COMPLETIONS_VALIDATION_ERROR, userId, - properties: { error: gate.code }, + properties: { error: freeModeSessionGate.code }, logger, }) return NextResponse.json( - { error: gate.code, message: gate.message }, - { status: STATUS_BY_GATE_CODE[gate.code] }, + { + error: freeModeSessionGate.code, + message: freeModeSessionGate.message, + }, + { status: STATUS_BY_GATE_CODE[freeModeSessionGate.code] }, + ) + } + } + + // For free mode requests, require a resolved allowlisted country only + // when the waiting-room gate is disabled/bypassed. Active waiting-room + // sessions already passed the POST /freebuff/session country/privacy gate, + // so repeating IPinfo/GeoIP work on every chat completion just burns hot + // path capacity. + if ( + isFreeModeRequest && + (!freeModeSessionGate || freeModeSessionGate.reason === 'disabled') + ) { + const countryAccess = await getFreeModeCountryAccess(req, { + fetch, + ipinfoToken: env.IPINFO_TOKEN, + ipHashSecret: env.NEXTAUTH_SECRET, + allowLocalhost: env.NEXT_PUBLIC_CB_ENVIRONMENT === 'dev', + }) + + if (!countryAccess.allowed || sampleFreebuffSuccess) { + logger.info( + { + cfHeader: countryAccess.cfCountry, + geoipResult: countryAccess.geoipCountry, + resolvedCountry: countryAccess.countryCode, + countryBlockReason: countryAccess.blockReason, + ipPrivacySignals: countryAccess.ipPrivacy?.signals, + clientIp: countryAccess.hasClientIp ? '[redacted]' : undefined, + }, + 'Free mode country detection', + ) + } + + if (!countryAccess.allowed) { + trackEvent({ + event: AnalyticsEvent.CHAT_COMPLETIONS_VALIDATION_ERROR, + userId, + properties: { + error: 'free_mode_not_available_in_country', + countryCode: countryAccess.countryCode, + countryBlockReason: countryAccess.blockReason, + ipPrivacySignals: countryAccess.ipPrivacy?.signals, + clientIp: countryAccess.hasClientIp ? '[redacted]' : undefined, + }, + logger, + }) + + return NextResponse.json( + { + error: 'free_mode_unavailable', + message: 'Free mode is not available in your country.', + countryCode: countryAccess.countryCode ?? 'UNKNOWN', + countryBlockReason: countryAccess.blockReason, + ipPrivacySignals: countryAccess.ipPrivacy?.signals, + }, + { status: 403 }, ) } } @@ -522,8 +556,9 @@ export async function postChatCompletions(params: { // This is done AFTER validation so malformed requests don't start a new 5-hour block. // When the function is provided, always include subscription credits in the balance: // error/null results mean subscription grants have 0 balance, so including them is harmless. - const includeSubscriptionCredits = !!ensureSubscriberBlockGrant - if (ensureSubscriberBlockGrant) { + const includeSubscriptionCredits = + !isFreeModeRequest && !!ensureSubscriberBlockGrant + if (!isFreeModeRequest && ensureSubscriberBlockGrant) { try { const blockGrantResult = await ensureSubscriberBlockGrant({ userId, @@ -541,7 +576,7 @@ export async function postChatCompletions(params: { ? await getUserPreferences({ userId, logger }) : { fallbackToALaCarte: true } // Default to allowing a-la-carte if no preference function - if (!preferences.fallbackToALaCarte && !isFreeModeRequest) { + if (!preferences.fallbackToALaCarte) { const resetTime = blockGrantResult.resetsAt const resetCountdown = formatQuotaResetCountdown( resetTime.toISOString(), @@ -589,32 +624,37 @@ export async function postChatCompletions(params: { } } - // Fetch user credit data (includes subscription credits when block grant was ensured) - const { - balance: { totalRemaining }, - nextQuotaReset, - } = await getUserUsageData({ userId, logger, includeSubscriptionCredits }) - - // Credit check - if (totalRemaining <= 0 && !isFreeModeRequest) { - trackEvent({ - event: AnalyticsEvent.CHAT_COMPLETIONS_INSUFFICIENT_CREDITS, - userId, - properties: { - totalRemaining, - nextQuotaReset, - }, - logger, - }) - return NextResponse.json( - { - message: `Out of credits. Please add credits at ${env.NEXT_PUBLIC_CODEBUFF_APP_URL}/usage.`, - }, - { status: 402 }, - ) + // Free-mode requests have already passed their model/session/rate gates + // and should not touch paid billing/usage paths. + if (!isFreeModeRequest) { + // Fetch user credit data (includes subscription credits when block grant was ensured) + const { + balance: { totalRemaining }, + nextQuotaReset, + } = await getUserUsageData({ userId, logger, includeSubscriptionCredits }) + + // Credit check + if (totalRemaining <= 0) { + trackEvent({ + event: AnalyticsEvent.CHAT_COMPLETIONS_INSUFFICIENT_CREDITS, + userId, + properties: { + totalRemaining, + nextQuotaReset, + }, + logger, + }) + return NextResponse.json( + { + message: `Out of credits. Please add credits at ${env.NEXT_PUBLIC_CODEBUFF_APP_URL}/usage.`, + }, + { status: 402 }, + ) + } } const openrouterApiKey = req.headers.get(BYOK_OPENROUTER_HEADER) + const providerLogger = sampleSuccessLogger(logger, sampleFreebuffSuccess) // Handle streaming vs non-streaming try { @@ -649,7 +689,7 @@ export async function postChatCompletions(params: { stripeCustomerId, agentId, fetch, - logger, + logger: providerLogger, insertMessageBigquery, } const stream = useSiliconFlow @@ -671,7 +711,7 @@ export async function postChatCompletions(params: { openrouterApiKey, }) - trackEvent({ + trackSuccessEvent({ event: AnalyticsEvent.CHAT_COMPLETIONS_STREAM_STARTED, userId, properties: { @@ -722,7 +762,7 @@ export async function postChatCompletions(params: { stripeCustomerId, agentId, fetch, - logger, + logger: providerLogger, insertMessageBigquery, } const nonStreamRequest = useSiliconFlow @@ -745,7 +785,7 @@ export async function postChatCompletions(params: { }) const result = await nonStreamRequest - trackEvent({ + trackSuccessEvent({ event: AnalyticsEvent.CHAT_COMPLETIONS_GENERATION_STARTED, userId, properties: { diff --git a/web/src/llm-api/helpers.ts b/web/src/llm-api/helpers.ts index 14e578fa9b..dfee0f306b 100644 --- a/web/src/llm-api/helpers.ts +++ b/web/src/llm-api/helpers.ts @@ -1,5 +1,8 @@ import { setupBigQuery } from '@codebuff/bigquery' -import { consumeCreditsAndAddAgentStep } from '@codebuff/billing' +import { + consumeCreditsAndAddAgentStep, + recordMessageWithoutBilling, +} from '@codebuff/billing' import { isFreeAgent, isFreeMode, @@ -151,7 +154,34 @@ export async function consumeCreditsForMessage(params: { // Also validates publisher to prevent spoofing attacks const isFreeAgentSmallRequest = isFreeAgent(agentId) && initialCredits < 5 - const credits = isFreeModeAndAllowed || isFreeAgentSmallRequest ? 0 : initialCredits + const credits = + isFreeModeAndAllowed || isFreeAgentSmallRequest ? 0 : initialCredits + + if (isFreeModeAndAllowed) { + await recordMessageWithoutBilling({ + messageId, + userId, + agentId, + clientId, + clientRequestId, + startTime, + model, + reasoningText, + response: responseText, + cost: usageData.cost, + credits: 0, + inputTokens: usageData.inputTokens, + cacheCreationInputTokens: null, + cacheReadInputTokens: usageData.cacheReadInputTokens, + reasoningTokens: + usageData.reasoningTokens > 0 ? usageData.reasoningTokens : null, + outputTokens: usageData.outputTokens, + byok, + logger, + ttftMs: ttftMs ?? null, + }) + return 0 + } await consumeCreditsAndAddAgentStep({ messageId,