Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ compile_rust.sh @Datadog/libdatadog-apm

# FFE (Feature Flagging & Experimentation) SDK Team
/components-rs/ffe.rs @Datadog/libdatadog-apm @DataDog/feature-flagging-and-experimentation-sdk
/libdatadog @Datadog/libdatadog-apm @DataDog/feature-flagging-and-experimentation-sdk
/src/api/FeatureFlags/ @DataDog/feature-flagging-and-experimentation-sdk
/tests/api/Unit/FeatureFlags/ @DataDog/feature-flagging-and-experimentation-sdk
/tests/ext/ffe/ @DataDog/feature-flagging-and-experimentation-sdk
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ serde = "1.0.196"
simd-json = "0.14.1"
serde_with = "3.6.0"
lazy_static = "1.4"
lru = "0.16.4"
log = "0.4.20"
env_logger = "0.10.1"
zwohash = "0.1.2"
Expand Down
16 changes: 16 additions & 0 deletions components-rs/ddtrace.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ bool ddog_ffe_result_do_log(const struct ddog_FfeResult *result);

void ddog_ffe_free_result(struct ddog_FfeResult *result);

void ddog_ffe_set_service_context(const char *service,
const char *env,
const char *version);

bool ddog_ffe_enqueue_exposure(const char *event_json,
const char *flag_key,
const char *allocation_key,
const char *targeting_key,
const char *variant_key);

ddog_CharSlice ddog_ffe_flush_exposures(void);

void ddog_ffe_free_flush_result(ddog_CharSlice slice);

void ddog_ffe_reset_exposure_state(void);

const char *ddog_normalize_process_tag_value(ddog_CharSlice tag_value);

void ddog_free_normalized_tag_value(const char *ptr);
Expand Down
224 changes: 224 additions & 0 deletions components-rs/ffe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use datadog_ffe::rules_based::{
self as ffe, AssignmentReason, AssignmentValue, Attribute, Configuration, EvaluationContext,
EvaluationError, ExpectedFlagType, Str, UniversalFlagConfig,
};
use libdd_common_ffi::CharSlice;
use lru::LruCache;
use std::collections::HashMap;
use std::ffi::{c_char, CStr, CString};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

struct FfeState {
Expand Down Expand Up @@ -331,3 +334,224 @@ fn assignment_value_to_json(value: &AssignmentValue) -> String {
fn string_to_cstring(value: String) -> CString {
CString::new(value).unwrap_or_default()
}

struct ServiceContext {
service: String,
env: String,
version: String,
}

struct ExposureState {
dedup_cache: LruCache<(String, String), (String, String)>,
batch_buffer: Vec<String>,
service_context: Option<ServiceContext>,
}

const EXPOSURE_DEDUP_LIMIT: usize = 65_536;
const EXPOSURE_BATCH_LIMIT: usize = 1_000;

lazy_static::lazy_static! {
static ref EXPOSURE_STATE: Mutex<ExposureState> = Mutex::new(ExposureState {
dedup_cache: LruCache::new(NonZeroUsize::new(EXPOSURE_DEDUP_LIMIT).unwrap()),
batch_buffer: Vec::new(),
service_context: None,
});
}

#[no_mangle]
pub unsafe extern "C" fn ddog_ffe_set_service_context(
service: *const c_char,
env: *const c_char,
version: *const c_char,
) {
if let Ok(mut state) = EXPOSURE_STATE.lock() {
state.service_context = Some(ServiceContext {
service: optional_cstr_to_string(service),
env: optional_cstr_to_string(env),
version: optional_cstr_to_string(version),
});
}
}

#[no_mangle]
pub unsafe extern "C" fn ddog_ffe_enqueue_exposure(
event_json: *const c_char,
flag_key: *const c_char,
allocation_key: *const c_char,
targeting_key: *const c_char,
variant_key: *const c_char,
) -> bool {
if event_json.is_null() || flag_key.is_null() || variant_key.is_null() {
return false;
}

let event = match required_cstr_to_string(event_json) {
Some(event) => event,
None => return false,
};
let flag = match required_cstr_to_string(flag_key) {
Some(flag) => flag,
None => return false,
};
let variant = match required_cstr_to_string(variant_key) {
Some(variant) => variant,
None => return false,
};
let allocation = optional_cstr_to_string(allocation_key);
let targeting = optional_cstr_to_string(targeting_key);

let dedup_key = (flag, targeting);
let dedup_value = (allocation, variant);

if let Ok(mut state) = EXPOSURE_STATE.lock() {
if let Some(cached) = state.dedup_cache.get(&dedup_key) {
if *cached == dedup_value {
return false;
}
}

state.dedup_cache.put(dedup_key, dedup_value);
if state.batch_buffer.len() >= EXPOSURE_BATCH_LIMIT {
return false;
}

state.batch_buffer.push(event);
return true;
}

false
}

#[no_mangle]
pub extern "C" fn ddog_ffe_flush_exposures() -> CharSlice<'static> {
if let Ok(mut state) = EXPOSURE_STATE.lock() {
if state.batch_buffer.is_empty() {
return CharSlice::default();
}

let events = state.batch_buffer.drain(..).collect::<Vec<_>>();
let context = match &state.service_context {
Some(context) => serde_json::json!({
"service": context.service.as_str(),
"env": context.env.as_str(),
"version": context.version.as_str(),
}),
None => serde_json::json!({
"service": "",
"env": "",
"version": "",
}),
};

let payload = format!(
r#"{{"context":{},"exposures":[{}]}}"#,
context,
events.join(",")
);
let mut bytes = payload.into_bytes().into_boxed_slice();
let ptr = bytes.as_mut_ptr();
let len = bytes.len();
std::mem::forget(bytes);

return unsafe { CharSlice::from_raw_parts(ptr as *const c_char, len) };
}

CharSlice::default()
}

#[no_mangle]
pub unsafe extern "C" fn ddog_ffe_free_flush_result(slice: CharSlice<'static>) {
use libdd_common_ffi::slice::AsBytes;

let bytes = slice.as_bytes();
let len = bytes.len();
let ptr = bytes.as_ptr() as *mut u8;
if !ptr.is_null() && len > 0 {
let _ = Box::from_raw(std::slice::from_raw_parts_mut(ptr, len) as *mut [u8]);
}
}

#[no_mangle]
pub extern "C" fn ddog_ffe_reset_exposure_state() {
if let Ok(mut state) = EXPOSURE_STATE.lock() {
state.dedup_cache.clear();
state.batch_buffer.clear();
state.service_context = None;
}
}

unsafe fn required_cstr_to_string(value: *const c_char) -> Option<String> {
CStr::from_ptr(value)
.to_str()
.ok()
.map(|value| value.to_string())
}

unsafe fn optional_cstr_to_string(value: *const c_char) -> String {
if value.is_null() {
return String::new();
}

required_cstr_to_string(value).unwrap_or_default()
}

#[cfg(test)]
mod tests {
use super::*;
use libdd_common_ffi::slice::AsBytes;

#[test]
fn exposure_flush_drains_buffer_and_keeps_context() {
ddog_ffe_reset_exposure_state();

let service = CString::new("svc-flush").unwrap();
let env = CString::new("test").unwrap();
let version = CString::new("1.2.3").unwrap();
let event = CString::new(
r#"{"timestamp":1,"flag":{"key":"demo"},"allocation":{"key":"alloc-a"},"variant":{"key":"on"},"subject":{"id":"user-1","attributes":{}}}"#,
)
.unwrap();
let flag = CString::new("demo").unwrap();
let allocation = CString::new("alloc-a").unwrap();
let targeting = CString::new("user-1").unwrap();
let on = CString::new("on").unwrap();
let off = CString::new("off").unwrap();

unsafe {
ddog_ffe_set_service_context(service.as_ptr(), env.as_ptr(), version.as_ptr());
assert!(ddog_ffe_enqueue_exposure(
event.as_ptr(),
flag.as_ptr(),
allocation.as_ptr(),
targeting.as_ptr(),
on.as_ptr(),
));
assert!(!ddog_ffe_enqueue_exposure(
event.as_ptr(),
flag.as_ptr(),
allocation.as_ptr(),
targeting.as_ptr(),
on.as_ptr(),
));
assert!(ddog_ffe_enqueue_exposure(
event.as_ptr(),
flag.as_ptr(),
allocation.as_ptr(),
targeting.as_ptr(),
off.as_ptr(),
));
}

let payload = ddog_ffe_flush_exposures();
assert!(!payload.as_bytes().is_empty());
let decoded: serde_json::Value = serde_json::from_slice(payload.as_bytes()).unwrap();
assert_eq!(decoded["context"]["service"], "svc-flush");
assert_eq!(decoded["context"]["env"], "test");
assert_eq!(decoded["context"]["version"], "1.2.3");
assert_eq!(decoded["exposures"].as_array().unwrap().len(), 2);
unsafe { ddog_ffe_free_flush_result(payload) };

let empty = ddog_ffe_flush_exposures();
assert!(empty.as_bytes().is_empty());
}
}
5 changes: 5 additions & 0 deletions components-rs/sidecar.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ ddog_MaybeError ddog_sidecar_send_debugger_datum(struct ddog_SidecarTransport **
ddog_QueueId queue_id,
struct ddog_DebuggerPayload *payload);

ddog_MaybeError ddog_sidecar_send_ffe_exposures(struct ddog_SidecarTransport **transport,
const struct ddog_InstanceId *instance_id,
const ddog_QueueId *queue_id,
ddog_CharSlice payload);

ddog_MaybeError ddog_sidecar_send_debugger_diagnostics(struct ddog_SidecarTransport **transport,
const struct ddog_InstanceId *instance_id,
ddog_QueueId queue_id,
Expand Down
65 changes: 65 additions & 0 deletions ext/ddtrace.c
Original file line number Diff line number Diff line change
Expand Up @@ -3083,6 +3083,71 @@ PHP_FUNCTION(DDTrace_ffe_evaluate) {
ddog_ffe_free_result(result);
}

PHP_FUNCTION(DDTrace_ffe_send_exposure) {
char *event_json, *flag_key, *variant_key;
size_t event_json_len, flag_key_len, variant_key_len;
char *allocation_key = NULL;
size_t allocation_key_len = 0;
char *targeting_key = NULL;
size_t targeting_key_len = 0;

ZEND_PARSE_PARAMETERS_START(5, 5)
Z_PARAM_STRING(event_json, event_json_len)
Z_PARAM_STRING(flag_key, flag_key_len)
Z_PARAM_STRING_OR_NULL(allocation_key, allocation_key_len)
Z_PARAM_STRING_OR_NULL(targeting_key, targeting_key_len)
Z_PARAM_STRING(variant_key, variant_key_len)
ZEND_PARSE_PARAMETERS_END();

UNUSED(event_json_len);
UNUSED(flag_key_len);
UNUSED(variant_key_len);

RETURN_BOOL(ddog_ffe_enqueue_exposure(
event_json,
flag_key,
allocation_key_len > 0 ? allocation_key : NULL,
targeting_key_len > 0 ? targeting_key : NULL,
variant_key));
}

PHP_FUNCTION(DDTrace_ffe_flush_exposures) {
ddog_CharSlice payload;

ZEND_PARSE_PARAMETERS_NONE();

payload = ddog_ffe_flush_exposures();
if (payload.ptr == NULL || payload.len == 0) {
RETURN_NULL();
}

RETVAL_STRINGL(payload.ptr, payload.len);
ddog_ffe_free_flush_result(payload);
}

PHP_FUNCTION(DDTrace_ffe_set_service_context) {
char *service, *env, *version;
size_t service_len, env_len, version_len;

ZEND_PARSE_PARAMETERS_START(3, 3)
Z_PARAM_STRING(service, service_len)
Z_PARAM_STRING(env, env_len)
Z_PARAM_STRING(version, version_len)
ZEND_PARSE_PARAMETERS_END();

UNUSED(service_len);
UNUSED(env_len);
UNUSED(version_len);

ddog_ffe_set_service_context(service, env, version);
}

PHP_FUNCTION(DDTrace_ffe_reset_exposure_state) {
ZEND_PARSE_PARAMETERS_NONE();

ddog_ffe_reset_exposure_state();
}

PHP_FUNCTION(dd_trace_send_traces_via_thread) {
char *payload = NULL;
ddtrace_zpplong_t num_traces = 0;
Expand Down
Loading
Loading