Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .fernignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tests/unit/event-capture.test.ts
tests/unit/events.test.ts
tests/unit/logger.test.ts
tests/unit/rules-engine.test.ts
tests/unit/wasm-datastream.test.ts
tests/unit/wasm-integration.test.ts
tests/unit/webhooks.test.ts
tests/unit/wrapper.test.ts
Expand Down
56 changes: 49 additions & 7 deletions src/datastream/datastream-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { RulesEngineClient } from '../rules-engine';
import { Logger } from '../logger';
import { LazyEmitter } from './emitter';
import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge';
import * as serializers from '../serialization';

// Import cache providers from the cache module
import type { CacheProvider } from '../cache/types';
Expand Down Expand Up @@ -705,7 +706,18 @@ export class DataStreamClient extends LazyEmitter {
return;
}
} else {
company = message.data as Schematic.RulesengineCompany;
try {
// passthrough (not the Fern default of "fail") so a payload carrying a
// field this SDK's schema doesn't know about yet — e.g. a server that
// ships ahead of the pinned SDK — is canonicalized to camelCase for its
// known fields and kept, rather than dropping the whole entity.
company = serializers.RulesengineCompany.parseOrThrow(message.data, {
unrecognizedObjectKeys: "passthrough",
});
} catch (error) {
this.logger.warn(`Failed to deserialize company payload: ${error}`);
return;
}
}

if (!company) {
Expand Down Expand Up @@ -768,7 +780,14 @@ export class DataStreamClient extends LazyEmitter {
return;
}
} else {
user = message.data as Schematic.RulesengineUser;
try {
user = serializers.RulesengineUser.parseOrThrow(message.data, {
unrecognizedObjectKeys: "passthrough",
});
} catch (error) {
this.logger.warn(`Failed to deserialize user payload: ${error}`);
return;
}
}

if (!user) {
Expand Down Expand Up @@ -808,13 +827,28 @@ export class DataStreamClient extends LazyEmitter {
* handleFlagsMessage processes bulk flags messages
*/
private async handleFlagsMessage(message: DataStreamResp): Promise<void> {
const flags = message.data as Schematic.RulesengineFlag[];
if (!Array.isArray(flags)) {
const rawFlags = message.data as unknown[];

if (!Array.isArray(rawFlags)) {
this.logger.warn('Expected flags array in bulk flags message');
return;
}

const flags: Schematic.RulesengineFlag[] = [];
let parseFailureCount = 0;
let firstFailure: unknown = undefined;
for (const raw of rawFlags) {
try {
flags.push(serializers.RulesengineFlag.parseOrThrow(raw, { unrecognizedObjectKeys: "passthrough" }));
} catch (error) {
parseFailureCount++;
if (firstFailure === undefined) firstFailure = error;
}
}
if (parseFailureCount > 0) {
this.logger.warn(`Failed to deserialize ${parseFailureCount} flag(s) in bulk message: ${String(firstFailure)}`);
}

const results = await Promise.allSettled(
flags
.filter((flag) => flag?.key)
Expand Down Expand Up @@ -854,8 +888,16 @@ export class DataStreamClient extends LazyEmitter {
* handleFlagMessage processes single flag messages
*/
private async handleFlagMessage(message: DataStreamResp): Promise<void> {
const flag = message.data as Schematic.RulesengineFlag;

let flag: Schematic.RulesengineFlag;
try {
flag = serializers.RulesengineFlag.parseOrThrow(message.data, {
unrecognizedObjectKeys: "passthrough",
});
} catch (error) {
this.logger.warn(`Failed to deserialize flag payload: ${error}`);
return;
}

if (!flag?.key) {
return;
}
Expand Down
109 changes: 69 additions & 40 deletions src/datastream/merge.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import type * as Schematic from "../api/types";
import type { Schema } from "../core/schemas";
import * as serializers from "../serialization";

const PARSE_OPTS = { unrecognizedObjectKeys: "passthrough" as const };

/**
* Helper to read a property that may be in camelCase or snake_case form.
* Wire data from WebSocket uses snake_case; Fern-generated types use camelCase.
* Canonicalizes a raw wire object (snake_case) to camelCase via its Fern
* serializer, falling back to the raw object if parsing fails. Used on nested
* objects arriving in partial payloads so the merged entity keeps a single
* shape — the WASM rules engine rejects objects carrying both casings of the
* same field ("duplicate field"), so per-object purity is load-bearing.
*/
function getProp(obj: Record<string, unknown>, camel: string, snake: string): unknown {
return obj[camel] ?? obj[snake];
function canonicalize<Raw, Parsed>(schema: Schema<Raw, Parsed>, raw: unknown): Parsed {
const result = schema.parse(raw, PARSE_OPTS);
return result.ok ? result.value : (raw as Parsed);
}

/**
* Creates a complete deep copy of a Company object.
*/
export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany {
return JSON.parse(JSON.stringify(c));
}

/**
* Creates a complete deep copy of a User object.
*/
export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser {
return JSON.parse(JSON.stringify(u));
}
Expand All @@ -32,13 +34,13 @@ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.Rulesengin
* Partials don't carry refreshed entitlements, so when their derived fields
* change in another part of the company we sync them here to match server
* behavior:
* - credit_remaining ← credit_balances[credit_id]
* - usage ← metric value matching (event_name, metric_period, month_reset)
* - creditRemaining ← credit_balances[credit_id]
* - usage ← metric value matching (eventName, metricPeriod, monthReset)
* Both are skipped when the partial also sends entitlements wholesale.
*
* Wire format uses snake_case keys. The existing company from cache
* may have either camelCase or snake_case keys depending on how it
* was stored.
* Partial updates arrive as raw wire payloads (snake_case keys) and are merged
* into an existing camelCase-canonicalized entity; each case writes the
* corresponding camelCase field so the cached entity stays in a single shape.
*/
export function partialCompany(
existing: Schematic.RulesengineCompany,
Expand All @@ -54,43 +56,61 @@ export function partialCompany(
for (const key of Object.keys(partial)) {
switch (key) {
case "id":
merged.id = partial[key];
break;
case "account_id":
merged.accountId = partial[key];
break;
case "environment_id":
merged[key] = partial[key];
merged.environmentId = partial[key];
break;
case "base_plan_id":
merged[key] = partial[key] ?? null;
merged.basePlanId = partial[key] ?? null;
break;
case "billing_product_ids":
merged.billingProductIds = partial[key];
break;
case "plan_ids":
merged.planIds = partial[key];
break;
case "plan_version_ids":
case "entitlements":
merged.planVersionIds = partial[key];
break;
case "entitlements": {
const incoming = (partial[key] ?? []) as unknown[];
merged.entitlements = incoming.map((e) =>
canonicalize(serializers.RulesengineFeatureEntitlement, e),
);
break;
}
case "rules":
merged.rules = partial[key];
break;
case "traits":
merged.traits = partial[key];
break;
case "subscription":
merged[key] = partial[key];
merged.subscription = partial[key];
break;
case "keys": {
const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record<string, string>;
const existingKeys = (merged.keys ?? {}) as Record<string, string>;
const incomingKeys = partial[key] as Record<string, string>;
merged[key] = { ...existingKeys, ...incomingKeys };
merged.keys = { ...existingKeys, ...incomingKeys };
break;
}
case "credit_balances": {
const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record<
string,
number
>;
const existingCB = (merged.creditBalances ?? {}) as Record<string, number>;
const incomingCB = (partial[key] ?? {}) as Record<string, number>;
merged[key] = { ...existingCB, ...incomingCB };
merged.creditBalances = { ...existingCB, ...incomingCB };
updatedBalances = incomingCB;
break;
}
case "metrics": {
const existingMetrics = ((getProp(merged, "metrics", "metrics") as unknown[]) ??
[]) as Schematic.RulesengineCompanyMetric[];
const incomingMetrics = (partial[key] ?? []) as Schematic.RulesengineCompanyMetric[];
merged[key] = upsertMetrics(existingMetrics, incomingMetrics);
const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[];
const incomingMetrics = ((partial[key] ?? []) as unknown[]).map((m) =>
canonicalize(serializers.RulesengineCompanyMetric, m),
);
merged.metrics = upsertMetrics(existingMetrics, incomingMetrics);
metricsUpdated = true;
break;
}
Expand All @@ -109,16 +129,16 @@ export function partialCompany(
* Re-derives entitlement fields whose source data changed in a partial that
* did not itself carry fresh entitlements. Mutates the entitlement objects on
* the already-deep-copied `merged` company in place:
* - credit_remaining ← the incoming balance for the entitlement's credit_id
* - usage ← the merged metric value matching (event_name, metric_period, month_reset),
* defaulting metric_period to "all_time" and month_reset to "first_of_month"
* - creditRemaining ← the incoming balance for the entitlement's creditId
* - usage ← the merged metric value matching (eventName, metricPeriod, monthReset),
* defaulting metricPeriod to "all_time" and monthReset to "first_of_month"
*/
function syncEntitlementDerivedFields(
merged: Record<string, unknown>,
updatedBalances: Record<string, number> | undefined,
metricsUpdated: boolean,
): void {
const entitlements = (getProp(merged, "entitlements", "entitlements") ?? []) as Record<string, unknown>[];
const entitlements = (merged.entitlements ?? []) as Record<string, unknown>[];
if (entitlements.length === 0) {
return;
}
Expand All @@ -127,7 +147,7 @@ function syncEntitlementDerivedFields(
// upsert so entitlements can find their matching usage.
const metricsLookup = new Map<string, number>();
if (metricsUpdated) {
const mergedMetrics = (getProp(merged, "metrics", "metrics") ?? []) as Record<string, unknown>[];
const mergedMetrics = (merged.metrics ?? []) as Record<string, unknown>[];
for (const m of mergedMetrics) {
if (!m) continue;
metricsLookup.set(metricKeyString(getMetricKey(m)), (m.value as number) ?? 0);
Expand All @@ -137,7 +157,10 @@ function syncEntitlementDerivedFields(
for (const ent of entitlements) {
const creditId = (ent.creditId ?? ent.credit_id) as string | undefined;
if (updatedBalances && creditId && creditId in updatedBalances) {
ent.credit_remaining = updatedBalances[creditId];
// Write the camelCase field: the cached entity is canonicalized, and
// the WASM engine errors on an object carrying both casings.
ent.creditRemaining = updatedBalances[creditId];
delete ent.credit_remaining;
}

// Credit-attached entitlements are intentionally NOT skipped: usage here
Expand Down Expand Up @@ -170,19 +193,25 @@ export function partialUser(
for (const key of Object.keys(partial)) {
switch (key) {
case "id":
merged.id = partial[key];
break;
case "account_id":
merged.accountId = partial[key];
break;
case "environment_id":
merged[key] = partial[key];
merged.environmentId = partial[key];
break;
case "keys": {
const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record<string, string>;
const existingKeys = (merged.keys ?? {}) as Record<string, string>;
const incomingKeys = partial[key] as Record<string, string>;
merged[key] = { ...existingKeys, ...incomingKeys };
merged.keys = { ...existingKeys, ...incomingKeys };
break;
}
case "traits":
merged.traits = partial[key];
break;
case "rules":
merged[key] = partial[key];
merged.rules = partial[key];
break;
// Ignore unknown keys silently
}
Expand Down
Loading