From fe30353c32826a74eda3f6c4ea9726312fbea6a0 Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Tue, 26 May 2026 12:29:17 +0200 Subject: [PATCH 1/6] Preserve LiveAllocation across worker resets via memfd snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workers are restarted by forking a fresh process from the parent, which loses everything in DDProfWorkerContext — including the heap-tracking aggregator in LiveAllocation. Until natural alloc/free traffic refills the map, live-heap is undercounted for the rest of the target's life. Add a serialisation path that survives the fork: - main_loop allocates a memfd that the parent keeps open and every worker child inherits. - On 'restart_worker', the outgoing child resolves its UnwindOutput handles back to portable strings (via libdatadog Function2/Mapping2 read-back) and writes a self-owned snapshot to the memfd. - The new child reads the snapshot in worker_library_init, re-interns mappings/functions into its fresh ProfilesDictionary and rebuilds the LiveAllocation maps before the poll loop starts draining events. - LiveAllocation owns a string deque backing the string_views of restored UnwindOutputs; live entries built from incoming events keep using Process/base-frame views. Budget enforcement, value-preserving: - Default target 4 MB, hard ceiling 20 MB. - When over budget, rank stacks by aggregate value and drop the lowest; their addresses are remapped to a synthetic [live-alloc cleared] common frame so per-PID heap totals remain correct. - If still over after dropping all stacks, drop entire PIDs from the lowest aggregate value upwards. In-flight events between the old child exit and the new child's first poll are still lost; a library-side pause hook is a separate change. --- include/common_symbol_errors.hpp | 10 +- include/ddprof_stats.hpp | 6 +- include/live_allocation.hpp | 19 + include/live_allocation_snapshot.hpp | 129 ++++++ include/persistent_worker_state.hpp | 7 + src/ddprof_worker.cc | 24 + src/live_allocation_snapshot.cc | 652 +++++++++++++++++++++++++++ src/perf_mainloop.cc | 46 ++ test/CMakeLists.txt | 9 + test/live_allocation_snapshot-ut.cc | 140 ++++++ 10 files changed, 1037 insertions(+), 5 deletions(-) create mode 100644 include/live_allocation_snapshot.hpp create mode 100644 src/live_allocation_snapshot.cc create mode 100644 test/live_allocation_snapshot-ut.cc diff --git a/include/common_symbol_errors.hpp b/include/common_symbol_errors.hpp index 0f5c44c78..46abd91ed 100644 --- a/include/common_symbol_errors.hpp +++ b/include/common_symbol_errors.hpp @@ -14,10 +14,11 @@ using namespace std::string_view_literals; namespace ddprof { -inline constexpr std::array k_common_frame_names = { - "[truncated]"sv, "[unknown mapping]"sv, - "[unwind failure]"sv, "[incomplete]"sv, - "[lost]"sv, "[maximum pids]"sv}; +inline constexpr std::array k_common_frame_names = { + "[truncated]"sv, "[unknown mapping]"sv, + "[unwind failure]"sv, "[incomplete]"sv, + "[lost]"sv, "[maximum pids]"sv, + "[live-alloc cleared]"sv}; enum SymbolErrors : std::uint8_t { truncated_stack = 0, @@ -26,6 +27,7 @@ enum SymbolErrors : std::uint8_t { incomplete_stack, lost_event, max_pids, + live_alloc_cleared, }; } // namespace ddprof diff --git a/include/ddprof_stats.hpp b/include/ddprof_stats.hpp index 32b304fdd..7366cf441 100644 --- a/include/ddprof_stats.hpp +++ b/include/ddprof_stats.hpp @@ -46,7 +46,11 @@ namespace ddprof { X(PPROF_SIZE, "pprof.size", STAT_GAUGE) \ X(PROFILE_DURATION, "profile.duration_ms", STAT_GAUGE) \ X(AGGREGATION_AVG_TIME, "aggregation.avg_time_ns", STAT_GAUGE) \ - X(BACKPOPULATE_COUNT, "backpopulate.count", STAT_GAUGE) + X(BACKPOPULATE_COUNT, "backpopulate.count", STAT_GAUGE) \ + X(LIVE_ALLOC_SNAPSHOT_BYTES, "live_alloc.snapshot.bytes", STAT_GAUGE) \ + X(LIVE_ALLOC_CLEARED_STACKS, "live_alloc.snapshot.cleared_stacks", \ + STAT_GAUGE) \ + X(LIVE_ALLOC_DROPPED_PIDS, "live_alloc.snapshot.dropped_pids", STAT_GAUGE) // Expand the enum/index for the individual stats enum DDPROF_STATS : uint8_t { STATS_TABLE(X_ENUM) STATS_LEN }; diff --git a/include/live_allocation.hpp b/include/live_allocation.hpp index d84c03a82..d2e31def3 100644 --- a/include/live_allocation.hpp +++ b/include/live_allocation.hpp @@ -11,6 +11,8 @@ #include "unwind_output_hash.hpp" #include +#include +#include #include #include @@ -55,6 +57,23 @@ class LiveAllocation { // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes) WatcherVector _watcher_vector; + // Owns the string storage backing string_views inside UnwindOutputs that + // were restored from a snapshot. Live entries created from incoming + // allocation events continue to use views into Process / base-frame + // tables; this storage is only used by snapshot-restore. + // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes) + std::deque _restored_strings; + + // Returns a string_view backed by _restored_strings. Empty input maps + // to an empty view without allocation. + std::string_view intern_restored_string(std::string_view sv) { + if (sv.empty()) { + return {}; + } + _restored_strings.emplace_back(sv); + return _restored_strings.back(); + } + void register_library_state(int watcher_pos, pid_t pid, uint32_t address_conflict_count, uint32_t tracked_address_count, diff --git a/include/live_allocation_snapshot.hpp b/include/live_allocation_snapshot.hpp new file mode 100644 index 000000000..d454d19b2 --- /dev/null +++ b/include/live_allocation_snapshot.hpp @@ -0,0 +1,129 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +// Live-allocation snapshot: portable serialisation of LiveAllocation across +// worker resets. +// +// The aggregator state in LiveAllocation refers to per-worker handles +// (libdatadog dictionary pointers, indices into SymbolHdr tables, string_views +// into Process/base-frame caches). None of these survive a worker fork, so +// before a worker restart we capture a fully self-owned snapshot via +// `capture_snapshot`, serialise it through a memfd held by the parent, and +// re-intern everything into the freshly-built tables of the new worker via +// `restore_snapshot`. + +#pragma once + +#include "ddprof_defs.hpp" +#include "live_allocation.hpp" + +#include +#include +#include +#include +#include + +namespace ddprof { + +struct SymbolHdr; + +namespace live_alloc_snapshot { + +// Fully self-owned mirror of one FunLoc. +struct FunLocPortable { + ProcessAddress_t ip{}; + ElfAddress_t elf_addr{}; + uint32_t lineno{}; + std::string fn_name; + std::string fn_system_name; + std::string fn_file; + ElfAddress_t map_low{}; + ElfAddress_t map_high{}; + Offset_t map_offset{}; + std::string map_filename; + std::string map_build_id; +}; + +// Fully self-owned mirror of one UnwindOutput. +struct UnwindOutputPortable { + std::vector locs; + std::string container_id; + std::string exe_name; + std::string thread_name; + int pid{}; + int tid{}; +}; + +struct AddressEntry { + uintptr_t addr{}; + int64_t value{}; + uint32_t stack_idx{}; // index into Snapshot::stacks +}; + +struct PidEntry { + int watcher_pos{}; + pid_t pid{}; + uint32_t address_conflict_count{}; + uint32_t tracked_address_count{}; + std::vector addresses; +}; + +// Special stack index meaning "stack was dropped during budget enforcement". +// On restore this is materialised as a synthetic single-frame stack pointing +// at the [live-alloc cleared] common frame, ensuring per-PID heap totals are +// preserved even when detail is shed. +inline constexpr uint32_t k_cleared_stack_idx = + std::numeric_limits::max(); + +struct Snapshot { + std::vector stacks; + std::vector pids; + // How many distinct allocation addresses had their stack remapped to the + // synthetic cleared stack because of the size budget. + uint32_t cleared_addresses{}; + // How many (watcher_pos, pid) entries we dropped entirely because even + // minimal accounting did not fit in the budget. + uint32_t dropped_pids{}; +}; + +// Default and hard-ceiling sizes for the serialised snapshot. +inline constexpr std::size_t k_default_max_snapshot_bytes = 4UL * 1024 * 1024; +inline constexpr std::size_t k_hard_max_snapshot_bytes = 20UL * 1024 * 1024; + +// Read a Snapshot out of an in-memory LiveAllocation, by resolving symbol / +// mapping IDs through `symbol_hdr`. The result has no references to the +// originating worker. +// +// If `max_bytes` is non-zero and the projected serialised size exceeds it, +// the snapshot is degraded in a value-preserving way: low-value stacks are +// remapped to the synthetic cleared stack first, and finally whole pids are +// dropped from the lowest aggregate value upwards. The `cleared_addresses` / +// `dropped_pids` counters in the result report how aggressive the degradation +// had to be. +Snapshot capture_snapshot(const LiveAllocation &live_alloc, + const SymbolHdr &symbol_hdr, + std::size_t max_bytes = k_default_max_snapshot_bytes); + +// Serialise / deserialise a Snapshot to/from a binary blob. +void serialize(const Snapshot &snapshot, std::vector &out); +bool deserialize(const uint8_t *data, std::size_t size, Snapshot &out); + +// Write the binary blob into `fd` (memfd). Truncates fd to the blob size on +// success. On error, returns false and leaves fd in an unspecified state. +bool write_to_fd(int fd, const Snapshot &snapshot); + +// Read a snapshot blob out of `fd`. Returns false if fd is empty / unreadable +// / malformed. +bool read_from_fd(int fd, Snapshot &out); + +// Re-intern a snapshot into `symbol_hdr` and populate the empty `live_alloc` +// state. Restored UnwindOutputs hold string_views into +// LiveAllocation::_restored_strings. +void restore_snapshot(const Snapshot &snapshot, LiveAllocation &live_alloc, + SymbolHdr &symbol_hdr); + +} // namespace live_alloc_snapshot + +} // namespace ddprof diff --git a/include/persistent_worker_state.hpp b/include/persistent_worker_state.hpp index 87a8c19cc..7f0337608 100644 --- a/include/persistent_worker_state.hpp +++ b/include/persistent_worker_state.hpp @@ -15,6 +15,13 @@ struct PersistentWorkerState { // Why not volatile ? Although several threads can update the number of // cycles, by design Only a single thread reads and writes to this variable. uint32_t profile_seq; + // memfd holding the most recent serialized live-allocation snapshot. + // The fd is opened by the parent in main_loop and inherited by every + // worker child. A child that is restarting writes its serialized + // LiveAllocation state to this fd just before exiting; the next child + // reads it during worker_library_init so live-heap tracking survives + // worker resets. -1 if snapshotting is disabled. + int live_alloc_snapshot_fd; }; } // namespace ddprof diff --git a/src/ddprof_worker.cc b/src/ddprof_worker.cc index d64a6f4a4..039e2411a 100644 --- a/src/ddprof_worker.cc +++ b/src/ddprof_worker.cc @@ -6,10 +6,13 @@ #include "ddprof_worker.hpp" #include "ddprof_context.hpp" +#include "ddprof_context_lib.hpp" #include "ddprof_perf_event.hpp" #include "ddprof_stats.hpp" #include "dso_hdr.hpp" #include "exporter/ddprof_exporter.hpp" +#include "live_allocation_snapshot.hpp" +#include "persistent_worker_state.hpp" #include "logger.hpp" #include "perf.hpp" #include "pevent_lib.hpp" @@ -561,6 +564,27 @@ DDRes worker_library_init(DDProfContext &ctx, ctx.worker_ctx.exp[1] = nullptr; ctx.worker_ctx.pprof[0] = nullptr; ctx.worker_ctx.pprof[1] = nullptr; + + // If the previous worker handed us a live-allocation snapshot, replay + // it before the poll loop starts draining new events. Restored entries + // use string storage owned by LiveAllocation itself. + if (persistent_worker_state && + persistent_worker_state->live_alloc_snapshot_fd >= 0 && + context_allocation_profiling_watcher_idx(ctx) != -1) { + live_alloc_snapshot::Snapshot snap; + if (live_alloc_snapshot::read_from_fd( + persistent_worker_state->live_alloc_snapshot_fd, snap)) { + if (!snap.stacks.empty() || !snap.pids.empty()) { + live_alloc_snapshot::restore_snapshot( + snap, ctx.worker_ctx.live_allocation, + ctx.worker_ctx.us->symbol_hdr); + LG_NTC("[live-alloc] Snapshot restored: stacks=%zu pids=%zu " + "cleared=%u dropped_pids=%u", + snap.stacks.size(), snap.pids.size(), + snap.cleared_addresses, snap.dropped_pids); + } + } + } } CatchExcept2DDRes(); return {}; diff --git a/src/live_allocation_snapshot.cc b/src/live_allocation_snapshot.cc new file mode 100644 index 000000000..aa16694bf --- /dev/null +++ b/src/live_allocation_snapshot.cc @@ -0,0 +1,652 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +#include "live_allocation_snapshot.hpp" + +#include "common_symbol_errors.hpp" +#include "ddog_profiling_utils.hpp" +#include "ddprof_file_info-i.hpp" +#include "logger.hpp" +#include "symbol_hdr.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace ddprof::live_alloc_snapshot { + +namespace { + +constexpr std::array k_magic = {'D', 'D', 'L', 'A'}; +constexpr uint32_t k_version = 1; + +// -- byte-cost estimation ----------------------------------------------------- + +constexpr std::size_t cost_string(std::string_view sv) { + return sizeof(uint32_t) + sv.size(); +} + +std::size_t cost_funloc(const FunLocPortable &fl) { + return sizeof(uint64_t) * 2 + // ip + elf_addr + sizeof(uint32_t) + // lineno + sizeof(uint64_t) * 3 + // map_low / map_high / map_offset + cost_string(fl.fn_name) + cost_string(fl.fn_system_name) + + cost_string(fl.fn_file) + cost_string(fl.map_filename) + + cost_string(fl.map_build_id); +} + +std::size_t cost_stack(const UnwindOutputPortable &uo) { + std::size_t c = sizeof(int32_t) * 2 + // pid + tid + cost_string(uo.container_id) + cost_string(uo.exe_name) + + cost_string(uo.thread_name) + sizeof(uint32_t); // n_locs + for (const auto &fl : uo.locs) { + c += cost_funloc(fl); + } + return c; +} + +constexpr std::size_t k_address_entry_cost = + sizeof(uint64_t) + sizeof(int64_t) + sizeof(uint32_t); + +std::size_t cost_pid_entry(const PidEntry &p) { + return sizeof(int32_t) * 2 + sizeof(uint32_t) * 3 + + p.addresses.size() * k_address_entry_cost; +} + +constexpr std::size_t k_header_cost = sizeof(k_magic) + sizeof(uint32_t) * 5; + +std::size_t estimate_size(const Snapshot &s) { + std::size_t c = k_header_cost; + for (const auto &st : s.stacks) { + c += cost_stack(st); + } + for (const auto &p : s.pids) { + c += cost_pid_entry(p); + } + return c; +} + +// -- read-back helpers (libdatadog dictionary) -------------------------------- + +std::string copy_string(const ddog_prof_ProfilesDictionary *dict, + ddog_prof_StringId2 id) { + if (!id) { + return {}; + } + return std::string(get_string(dict, id)); +} + +FunLocPortable funloc_to_portable(const FunLoc &fl, + const SymbolHdr &symbol_hdr) { + const auto *dict = symbol_hdr.profiles_dictionary(); + FunLocPortable out; + out.ip = fl.ip; + out.elf_addr = fl.elf_addr; + + if (fl.symbol_idx != k_symbol_idx_null && + static_cast(fl.symbol_idx) < symbol_hdr._symbol_table.size()) { + const Symbol &sym = symbol_hdr._symbol_table[fl.symbol_idx]; + out.lineno = sym._lineno; + if (sym._function_id) { + // Read fields directly from the libdatadog Function2 struct. + // This is the same pattern already used by + // get_location2_function_name() / get_location2_mapping_filename(). + out.fn_name = copy_string(dict, sym._function_id->name); + out.fn_system_name = copy_string(dict, sym._function_id->system_name); + out.fn_file = copy_string(dict, sym._function_id->file_name); + } + } + + if (fl.map_info_idx != k_mapinfo_idx_null && + static_cast(fl.map_info_idx) < + symbol_hdr._mapinfo_table.size()) { + ddog_prof_MappingId2 mid = symbol_hdr._mapinfo_table[fl.map_info_idx]; + if (mid) { + out.map_low = mid->memory_start; + out.map_high = mid->memory_limit; + out.map_offset = mid->file_offset; + out.map_filename = copy_string(dict, mid->filename); + out.map_build_id = copy_string(dict, mid->build_id); + } + } + return out; +} + +UnwindOutputPortable uo_to_portable(const UnwindOutput &uo, + const SymbolHdr &symbol_hdr) { + UnwindOutputPortable p; + p.pid = uo.pid; + p.tid = uo.tid; + p.container_id = std::string(uo.container_id); + p.exe_name = std::string(uo.exe_name); + p.thread_name = std::string(uo.thread_name); + p.locs.reserve(uo.locs.size()); + for (const auto &fl : uo.locs) { + p.locs.emplace_back(funloc_to_portable(fl, symbol_hdr)); + } + return p; +} + +// -- binary writer / reader --------------------------------------------------- + +class Writer { +public: + explicit Writer(std::vector &out) : _out(out) {} + + void u32(uint32_t v) { raw(&v, sizeof(v)); } + void i32(int32_t v) { raw(&v, sizeof(v)); } + void u64(uint64_t v) { raw(&v, sizeof(v)); } + void i64(int64_t v) { raw(&v, sizeof(v)); } + void str(std::string_view sv) { + u32(static_cast(sv.size())); + raw(sv.data(), sv.size()); + } + void raw(const void *p, std::size_t n) { + const auto *bytes = static_cast(p); + _out.insert(_out.end(), bytes, bytes + n); + } + +private: + std::vector &_out; +}; + +class Reader { +public: + Reader(const uint8_t *data, std::size_t size) : _p(data), _end(data + size) {} + + bool u32(uint32_t &v) { return raw(&v, sizeof(v)); } + bool i32(int32_t &v) { return raw(&v, sizeof(v)); } + bool u64(uint64_t &v) { return raw(&v, sizeof(v)); } + bool i64(int64_t &v) { return raw(&v, sizeof(v)); } + bool str(std::string &out) { + uint32_t n = 0; + if (!u32(n)) { + return false; + } + if (static_cast(_end - _p) < n) { + return false; + } + out.assign(reinterpret_cast(_p), n); + _p += n; + return true; + } + bool raw(void *dst, std::size_t n) { + if (static_cast(_end - _p) < n) { + return false; + } + std::memcpy(dst, _p, n); + _p += n; + return true; + } + [[nodiscard]] bool eof() const { return _p == _end; } + +private: + const uint8_t *_p; + const uint8_t *_end; +}; + +} // namespace + +// -- capture ------------------------------------------------------------------ + +Snapshot capture_snapshot(const LiveAllocation &live_alloc, + const SymbolHdr &symbol_hdr, std::size_t max_bytes) { + Snapshot snapshot; + + // Stage 1: dedupe stacks. The PprofStacks for different pids may produce + // identical UnwindOutputPortable rows once converted; we keep them + // separate at this stage and dedupe by raw pointer identity (each + // PprofStacks::value_type address is unique within a pid). + // For cross-pid identity we rely on the AddressMap pointer to map back. + // + // The simplest approach: emit one Portable per (pid_stack value_type + // pointer) and look it up by that pointer when filling pid entries. + struct StackKey { + const LiveAllocation::PprofStacks::value_type *ptr; + }; + std::unordered_map + stack_idx_by_ptr; + + for (unsigned watcher_pos = 0; + watcher_pos < live_alloc._watcher_vector.size(); ++watcher_pos) { + const auto &pid_map = live_alloc._watcher_vector[watcher_pos]; + for (const auto &pid_kv : pid_map) { + pid_t const pid = pid_kv.first; + const auto &pid_stacks = pid_kv.second; + + PidEntry pid_entry; + pid_entry.watcher_pos = static_cast(watcher_pos); + pid_entry.pid = pid; + pid_entry.address_conflict_count = pid_stacks._address_conflict_count; + pid_entry.tracked_address_count = pid_stacks._tracked_address_count; + pid_entry.addresses.reserve(pid_stacks._address_map.size()); + + for (const auto &addr_kv : pid_stacks._address_map) { + const auto *stack_ptr = addr_kv.second._unique_stack; + if (!stack_ptr || stack_ptr->first.locs.empty()) { + continue; + } + uint32_t stack_idx; + auto it = stack_idx_by_ptr.find(stack_ptr); + if (it == stack_idx_by_ptr.end()) { + stack_idx = static_cast(snapshot.stacks.size()); + stack_idx_by_ptr.emplace(stack_ptr, stack_idx); + snapshot.stacks.emplace_back( + uo_to_portable(stack_ptr->first, symbol_hdr)); + } else { + stack_idx = it->second; + } + pid_entry.addresses.push_back( + {addr_kv.first, addr_kv.second._value, stack_idx}); + } + if (!pid_entry.addresses.empty()) { + snapshot.pids.emplace_back(std::move(pid_entry)); + } + } + } + + if (max_bytes == 0) { + return snapshot; + } + + // Stage 2: budget enforcement. + std::size_t projected = estimate_size(snapshot); + if (projected <= max_bytes) { + return snapshot; + } + + // 2a) Rank stacks by aggregate value across all pids (low value goes first). + std::vector stack_total_value(snapshot.stacks.size(), 0); + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + stack_total_value[a.stack_idx] += a.value; + } + } + std::vector stack_order(snapshot.stacks.size()); + std::iota(stack_order.begin(), stack_order.end(), 0u); + std::sort(stack_order.begin(), stack_order.end(), + [&](uint32_t a, uint32_t b) { + return stack_total_value[a] < stack_total_value[b]; + }); + + // Cost of the synthetic cleared stack we may need to introduce. It is a + // single FunLocPortable with all strings empty -> small fixed cost. + const UnwindOutputPortable cleared_stack_template{}; // empty + const std::size_t cleared_stack_cost = cost_stack(cleared_stack_template); + + std::vector stack_dropped(snapshot.stacks.size(), false); + bool cleared_stack_needed = false; + for (uint32_t idx : stack_order) { + if (projected <= max_bytes) { + break; + } + std::size_t const saved = cost_stack(snapshot.stacks[idx]); + projected -= saved; + if (!cleared_stack_needed) { + projected += cleared_stack_cost; + cleared_stack_needed = true; + } + stack_dropped[idx] = true; + } + + // 2b) If still over, drop pids from lowest aggregate value upwards. + if (projected > max_bytes) { + std::vector pid_total_value(snapshot.pids.size(), 0); + for (size_t i = 0; i < snapshot.pids.size(); ++i) { + for (const auto &a : snapshot.pids[i].addresses) { + pid_total_value[i] += a.value; + } + } + std::vector pid_order(snapshot.pids.size()); + std::iota(pid_order.begin(), pid_order.end(), 0u); + std::sort(pid_order.begin(), pid_order.end(), + [&](uint32_t a, uint32_t b) { + return pid_total_value[a] < pid_total_value[b]; + }); + std::vector pid_dropped(snapshot.pids.size(), false); + for (uint32_t pidx : pid_order) { + if (projected <= max_bytes) { + break; + } + projected -= cost_pid_entry(snapshot.pids[pidx]); + pid_dropped[pidx] = true; + ++snapshot.dropped_pids; + } + if (snapshot.dropped_pids) { + std::vector kept; + kept.reserve(snapshot.pids.size() - snapshot.dropped_pids); + for (size_t i = 0; i < snapshot.pids.size(); ++i) { + if (!pid_dropped[i]) { + kept.emplace_back(std::move(snapshot.pids[i])); + } + } + snapshot.pids = std::move(kept); + } + } + + // Apply stack drops: remap their address entries to k_cleared_stack_idx, + // then compact the stacks[] vector and rewrite remaining stack_idx values. + if (cleared_stack_needed) { + for (auto &p : snapshot.pids) { + for (auto &a : p.addresses) { + if (stack_dropped[a.stack_idx]) { + a.stack_idx = k_cleared_stack_idx; + ++snapshot.cleared_addresses; + } + } + } + std::vector remap(snapshot.stacks.size(), k_cleared_stack_idx); + std::vector kept_stacks; + kept_stacks.reserve(snapshot.stacks.size()); + for (size_t i = 0; i < snapshot.stacks.size(); ++i) { + if (!stack_dropped[i]) { + remap[i] = static_cast(kept_stacks.size()); + kept_stacks.emplace_back(std::move(snapshot.stacks[i])); + } + } + snapshot.stacks = std::move(kept_stacks); + for (auto &p : snapshot.pids) { + for (auto &a : p.addresses) { + if (a.stack_idx != k_cleared_stack_idx) { + a.stack_idx = remap[a.stack_idx]; + } + } + } + } + + return snapshot; +} + +// -- serialise / deserialise -------------------------------------------------- + +void serialize(const Snapshot &s, std::vector &out) { + out.clear(); + out.reserve(estimate_size(s)); + Writer w(out); + w.raw(k_magic.data(), k_magic.size()); + w.u32(k_version); + w.u32(static_cast(s.stacks.size())); + w.u32(static_cast(s.pids.size())); + w.u32(s.cleared_addresses); + w.u32(s.dropped_pids); + + for (const auto &st : s.stacks) { + w.i32(st.pid); + w.i32(st.tid); + w.str(st.container_id); + w.str(st.exe_name); + w.str(st.thread_name); + w.u32(static_cast(st.locs.size())); + for (const auto &fl : st.locs) { + w.u64(fl.ip); + w.u64(fl.elf_addr); + w.u32(fl.lineno); + w.u64(fl.map_low); + w.u64(fl.map_high); + w.u64(fl.map_offset); + w.str(fl.fn_name); + w.str(fl.fn_system_name); + w.str(fl.fn_file); + w.str(fl.map_filename); + w.str(fl.map_build_id); + } + } + + for (const auto &p : s.pids) { + w.i32(p.watcher_pos); + w.i32(p.pid); + w.u32(p.address_conflict_count); + w.u32(p.tracked_address_count); + w.u32(static_cast(p.addresses.size())); + for (const auto &a : p.addresses) { + w.u64(a.addr); + w.i64(a.value); + w.u32(a.stack_idx); + } + } +} + +bool deserialize(const uint8_t *data, std::size_t size, Snapshot &out) { + out = {}; + Reader r(data, size); + std::array magic{}; + if (!r.raw(magic.data(), magic.size()) || magic != k_magic) { + return false; + } + uint32_t version = 0; + if (!r.u32(version) || version != k_version) { + return false; + } + uint32_t n_stacks = 0; + uint32_t n_pids = 0; + if (!r.u32(n_stacks) || !r.u32(n_pids) || !r.u32(out.cleared_addresses) || + !r.u32(out.dropped_pids)) { + return false; + } + out.stacks.resize(n_stacks); + for (auto &st : out.stacks) { + if (!r.i32(st.pid) || !r.i32(st.tid) || !r.str(st.container_id) || + !r.str(st.exe_name) || !r.str(st.thread_name)) { + return false; + } + uint32_t n_locs = 0; + if (!r.u32(n_locs)) { + return false; + } + st.locs.resize(n_locs); + for (auto &fl : st.locs) { + if (!r.u64(fl.ip) || !r.u64(fl.elf_addr) || !r.u32(fl.lineno) || + !r.u64(fl.map_low) || !r.u64(fl.map_high) || !r.u64(fl.map_offset) || + !r.str(fl.fn_name) || !r.str(fl.fn_system_name) || + !r.str(fl.fn_file) || !r.str(fl.map_filename) || + !r.str(fl.map_build_id)) { + return false; + } + } + } + out.pids.resize(n_pids); + for (auto &p : out.pids) { + if (!r.i32(p.watcher_pos) || !r.i32(p.pid) || + !r.u32(p.address_conflict_count) || + !r.u32(p.tracked_address_count)) { + return false; + } + uint32_t n_addr = 0; + if (!r.u32(n_addr)) { + return false; + } + p.addresses.resize(n_addr); + for (auto &a : p.addresses) { + if (!r.u64(a.addr) || !r.i64(a.value) || !r.u32(a.stack_idx)) { + return false; + } + } + } + return r.eof(); +} + +bool write_to_fd(int fd, const Snapshot &snapshot) { + if (fd < 0) { + return false; + } + std::vector buf; + serialize(snapshot, buf); + if (ftruncate(fd, 0) != 0) { + return false; + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + return false; + } + std::size_t written = 0; + while (written < buf.size()) { + ssize_t const n = + write(fd, buf.data() + written, buf.size() - written); + if (n < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + if (n == 0) { + return false; + } + written += static_cast(n); + } + return true; +} + +bool read_from_fd(int fd, Snapshot &out) { + if (fd < 0) { + return false; + } + off_t const sz = lseek(fd, 0, SEEK_END); + if (sz <= 0) { + return false; + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + return false; + } + std::vector buf(static_cast(sz)); + std::size_t read_total = 0; + while (read_total < buf.size()) { + ssize_t const n = + read(fd, buf.data() + read_total, buf.size() - read_total); + if (n < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + if (n == 0) { + break; + } + read_total += static_cast(n); + } + bool const ok = deserialize(buf.data(), read_total, out); + // Consume-on-read: clear the memfd so a future worker that does not + // produce its own snapshot does not inadvertently replay stale state. + if (ftruncate(fd, 0) != 0) { + LG_DBG("[live-alloc] ftruncate(snapshot_fd) failed: %d", errno); + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + LG_DBG("[live-alloc] lseek(snapshot_fd) failed: %d", errno); + } + return ok; +} + +// -- restore ------------------------------------------------------------------ + +namespace { + +// Build the synthetic [live-alloc cleared] UnwindOutput. Allocates one Symbol +// + one mapping in symbol_hdr. +UnwindOutput build_cleared_stack(SymbolHdr &symbol_hdr) { + const auto *dict = symbol_hdr.profiles_dictionary(); + ddog_prof_FunctionId2 fn = intern_function( + dict, k_common_frame_names[SymbolErrors::live_alloc_cleared], {}); + ddog_prof_MappingId2 mp = intern_mapping(dict, 0, 0, 0, {}, {}); + + symbol_hdr._symbol_table.emplace_back(0u, fn); + auto const symbol_idx = + static_cast(symbol_hdr._symbol_table.size() - 1); + symbol_hdr._mapinfo_table.emplace_back(mp); + auto const map_idx = + static_cast(symbol_hdr._mapinfo_table.size() - 1); + + UnwindOutput uo; + uo.locs.push_back( + {0, 0, /*file_info_id*/ k_file_info_undef, symbol_idx, map_idx}); + // Empty pid/tid/strings: distinct from any natural unwind output. + return uo; +} + +UnwindOutput portable_to_uo(const UnwindOutputPortable &p, + SymbolHdr &symbol_hdr, + LiveAllocation &live_alloc) { + const auto *dict = symbol_hdr.profiles_dictionary(); + UnwindOutput uo; + uo.pid = p.pid; + uo.tid = p.tid; + uo.container_id = live_alloc.intern_restored_string(p.container_id); + uo.exe_name = live_alloc.intern_restored_string(p.exe_name); + uo.thread_name = live_alloc.intern_restored_string(p.thread_name); + uo.locs.reserve(p.locs.size()); + for (const auto &fl : p.locs) { + ddog_prof_MappingId2 mid = intern_mapping( + dict, fl.map_low, fl.map_high, fl.map_offset, fl.map_filename, + fl.map_build_id); + symbol_hdr._mapinfo_table.emplace_back(mid); + auto const map_idx = + static_cast(symbol_hdr._mapinfo_table.size() - 1); + + ddog_prof_FunctionId2 fn = + intern_function(dict, fl.fn_name, fl.fn_file, fl.fn_system_name); + symbol_hdr._symbol_table.emplace_back(fl.lineno, fn); + auto const sym_idx = + static_cast(symbol_hdr._symbol_table.size() - 1); + + uo.locs.push_back( + {fl.ip, fl.elf_addr, k_file_info_undef, sym_idx, map_idx}); + } + return uo; +} + +} // namespace + +void restore_snapshot(const Snapshot &snapshot, LiveAllocation &live_alloc, + SymbolHdr &symbol_hdr) { + // Pre-build restored UnwindOutputs for every snapshot stack. + std::vector rebuilt; + rebuilt.reserve(snapshot.stacks.size()); + for (const auto &p : snapshot.stacks) { + rebuilt.emplace_back(portable_to_uo(p, symbol_hdr, live_alloc)); + } + + // Build the synthetic cleared stack lazily, only if referenced. + bool need_cleared = false; + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + if (a.stack_idx == k_cleared_stack_idx) { + need_cleared = true; + break; + } + } + if (need_cleared) { + break; + } + } + UnwindOutput cleared_uo; + if (need_cleared) { + cleared_uo = build_cleared_stack(symbol_hdr); + } + + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + const UnwindOutput &uo = + (a.stack_idx == k_cleared_stack_idx) ? cleared_uo + : rebuilt[a.stack_idx]; + live_alloc.register_allocation(uo, a.addr, a.value, p.watcher_pos, + p.pid); + } + // Carry over the library/profiler tracked-address counters so the + // post-restore mismatch warning is anchored at the same baseline. + auto &pid_map = access_resize(live_alloc._watcher_vector, p.watcher_pos); + auto &pid_stacks = pid_map[p.pid]; + pid_stacks._address_conflict_count = p.address_conflict_count; + pid_stacks._tracked_address_count = p.tracked_address_count; + } + + if (snapshot.cleared_addresses || snapshot.dropped_pids) { + LG_NTC("[live-alloc] Snapshot degraded: cleared_addresses=%u " + "dropped_pids=%u", + snapshot.cleared_addresses, snapshot.dropped_pids); + } +} + +} // namespace ddprof::live_alloc_snapshot diff --git a/src/perf_mainloop.cc b/src/perf_mainloop.cc index d0c7548e7..42a1561bc 100644 --- a/src/perf_mainloop.cc +++ b/src/perf_mainloop.cc @@ -6,15 +6,18 @@ #include "perf_mainloop.hpp" #include "ddprof_context_lib.hpp" +#include "ddprof_stats.hpp" #include "ddprof_worker.hpp" #include "ddres.hpp" #include "defer.hpp" #include "ipc.hpp" +#include "live_allocation_snapshot.hpp" #include "logger.hpp" #include "perf.hpp" #include "persistent_worker_state.hpp" #include "pevent.hpp" #include "ringbuffer_utils.hpp" +#include "syscalls.hpp" #include "unique_fd.hpp" #include "unwind.h" #include "unwind_state.hpp" @@ -439,6 +442,31 @@ DDRes worker_loop(DDProfContext &ctx, const WorkerAttr *attr, DDRES_CHECK_FWD(ddprof_worker_maybe_export(ctx, now)); if (ctx.worker_ctx.persistent_worker_state->restart_worker) { + // Snapshot the live-allocation aggregator so the next worker can + // resume heap tracking without losing in-flight live addresses. + int const snap_fd = + ctx.worker_ctx.persistent_worker_state->live_alloc_snapshot_fd; + if (snap_fd >= 0 && + context_allocation_profiling_watcher_idx(ctx) != -1) { + auto snap = live_alloc_snapshot::capture_snapshot( + ctx.worker_ctx.live_allocation, ctx.worker_ctx.us->symbol_hdr); + if (!live_alloc_snapshot::write_to_fd(snap_fd, snap)) { + LG_WRN("[live-alloc] Failed to write snapshot before restart"); + } else { + off_t const snap_size = lseek(snap_fd, 0, SEEK_END); + if (snap_size >= 0) { + ddprof_stats_set(STATS_LIVE_ALLOC_SNAPSHOT_BYTES, + static_cast(snap_size)); + } + LG_NTC("[live-alloc] Snapshot written: stacks=%zu pids=%zu " + "cleared=%u dropped_pids=%u", + snap.stacks.size(), snap.pids.size(), + snap.cleared_addresses, snap.dropped_pids); + ddprof_stats_set(STATS_LIVE_ALLOC_CLEARED_STACKS, + snap.cleared_addresses); + ddprof_stats_set(STATS_LIVE_ALLOC_DROPPED_PIDS, snap.dropped_pids); + } + } // return directly no need to do a final export return {}; } @@ -488,6 +516,24 @@ DDRes main_loop(const WorkerAttr *attr, DDProfContext *ctx) { defer { munmap(persistent_worker_state, sizeof(*persistent_worker_state)); }; + // Allocate a memfd that the parent keeps open for the lifetime of the + // run. Each worker child inherits this fd and uses it as a hand-off + // buffer for its LiveAllocation snapshot when a restart is requested. + // Clearing CLOEXEC because we want the fd to survive across the child's + // execve-less life cycle and be reusable by the next forked child. + persistent_worker_state->live_alloc_snapshot_fd = + memfd_create("ddprof_live_alloc", 0U); + if (persistent_worker_state->live_alloc_snapshot_fd < 0) { + LG_WRN("memfd_create for live-alloc snapshot failed (errno=%d); " + "live-allocation state will be lost on worker resets", + errno); + } + defer { + if (persistent_worker_state->live_alloc_snapshot_fd >= 0) { + close(persistent_worker_state->live_alloc_snapshot_fd); + } + }; + // Create worker processes to fulfill poll loop. Only the parent process // can exit with an error code, which signals the termination of profiling. bool is_worker = false; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 328f6694e..1df425811 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -398,6 +398,15 @@ add_unit_test(tracepoint_config-ut tracepoint_config-ut.cc ../src/tracepoint_con add_unit_test(live_allocation-ut live_allocation-ut.cc ../src/live_allocation.cc) +add_unit_test( + live_allocation_snapshot-ut + live_allocation_snapshot-ut.cc + ../src/live_allocation_snapshot.cc + ../src/live_allocation.cc + ../src/ddog_profiling_utils.cc + ../src/demangler/demangler.cc + LIBRARIES Datadog::Profiling llvm-demangle) + add_unit_test(ddprof_process-ut ddprof_process-ut.cc ${PROCESS_SRC} LIBRARIES ${ELFUTILS_LIBRARIES}) add_unit_test(glibc_fixes-ut glibc_fixes-ut.cc ../src/lib/glibc_fixes.c LIBRARIES pthread) diff --git a/test/live_allocation_snapshot-ut.cc b/test/live_allocation_snapshot-ut.cc new file mode 100644 index 000000000..03006cc31 --- /dev/null +++ b/test/live_allocation_snapshot-ut.cc @@ -0,0 +1,140 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +#include "live_allocation_snapshot.hpp" + +#include + +namespace ddprof::live_alloc_snapshot { + +namespace { + +Snapshot make_sample_snapshot() { + Snapshot s; + UnwindOutputPortable uo; + uo.pid = 100; + uo.tid = 101; + uo.container_id = "ctr-abc"; + uo.exe_name = "/usr/bin/myapp"; + uo.thread_name = "worker-0"; + uo.locs.push_back({/*ip*/ 0x1000, /*elf_addr*/ 0x500, /*lineno*/ 42, + "malloc", "__libc_malloc", "malloc.c", + /*map_low*/ 0x10000, /*map_high*/ 0x20000, + /*map_offset*/ 0, "/lib/libc.so", "abcdef0123"}); + uo.locs.push_back({0x2000, 0x600, 0, "main", "", "main.c", 0x30000, 0x40000, + 0, "/usr/bin/myapp", ""}); + s.stacks.push_back(std::move(uo)); + + UnwindOutputPortable uo2; + uo2.pid = 200; + uo2.tid = 200; + uo2.locs.push_back({0x3000, 0x700, 7, "fn", "", "f.c", 0, 0, 0, "", ""}); + s.stacks.push_back(std::move(uo2)); + + PidEntry pe; + pe.watcher_pos = 0; + pe.pid = 100; + pe.address_conflict_count = 3; + pe.tracked_address_count = 17; + pe.addresses.push_back({0xdead0000, 1024, 0}); + pe.addresses.push_back({0xdead1000, 2048, 0}); + pe.addresses.push_back({0xdead2000, 4096, 1}); + // One synthetic-cleared address. + pe.addresses.push_back({0xdead3000, 32, k_cleared_stack_idx}); + s.pids.push_back(std::move(pe)); + + s.cleared_addresses = 1; + s.dropped_pids = 0; + return s; +} + +void expect_eq(const UnwindOutputPortable &a, const UnwindOutputPortable &b) { + EXPECT_EQ(a.pid, b.pid); + EXPECT_EQ(a.tid, b.tid); + EXPECT_EQ(a.container_id, b.container_id); + EXPECT_EQ(a.exe_name, b.exe_name); + EXPECT_EQ(a.thread_name, b.thread_name); + ASSERT_EQ(a.locs.size(), b.locs.size()); + for (size_t i = 0; i < a.locs.size(); ++i) { + const auto &x = a.locs[i]; + const auto &y = b.locs[i]; + EXPECT_EQ(x.ip, y.ip); + EXPECT_EQ(x.elf_addr, y.elf_addr); + EXPECT_EQ(x.lineno, y.lineno); + EXPECT_EQ(x.fn_name, y.fn_name); + EXPECT_EQ(x.fn_system_name, y.fn_system_name); + EXPECT_EQ(x.fn_file, y.fn_file); + EXPECT_EQ(x.map_low, y.map_low); + EXPECT_EQ(x.map_high, y.map_high); + EXPECT_EQ(x.map_offset, y.map_offset); + EXPECT_EQ(x.map_filename, y.map_filename); + EXPECT_EQ(x.map_build_id, y.map_build_id); + } +} + +} // namespace + +TEST(LiveAllocationSnapshotTest, RoundTripBinary) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + ASSERT_FALSE(buf.empty()); + + Snapshot out; + ASSERT_TRUE(deserialize(buf.data(), buf.size(), out)); + + ASSERT_EQ(in.stacks.size(), out.stacks.size()); + for (size_t i = 0; i < in.stacks.size(); ++i) { + expect_eq(in.stacks[i], out.stacks[i]); + } + ASSERT_EQ(in.pids.size(), out.pids.size()); + for (size_t i = 0; i < in.pids.size(); ++i) { + const auto &a = in.pids[i]; + const auto &b = out.pids[i]; + EXPECT_EQ(a.watcher_pos, b.watcher_pos); + EXPECT_EQ(a.pid, b.pid); + EXPECT_EQ(a.address_conflict_count, b.address_conflict_count); + EXPECT_EQ(a.tracked_address_count, b.tracked_address_count); + ASSERT_EQ(a.addresses.size(), b.addresses.size()); + for (size_t j = 0; j < a.addresses.size(); ++j) { + EXPECT_EQ(a.addresses[j].addr, b.addresses[j].addr); + EXPECT_EQ(a.addresses[j].value, b.addresses[j].value); + EXPECT_EQ(a.addresses[j].stack_idx, b.addresses[j].stack_idx); + } + } + EXPECT_EQ(in.cleared_addresses, out.cleared_addresses); + EXPECT_EQ(in.dropped_pids, out.dropped_pids); +} + +TEST(LiveAllocationSnapshotTest, RejectsBadMagic) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + buf[0] = 'X'; + Snapshot out; + EXPECT_FALSE(deserialize(buf.data(), buf.size(), out)); +} + +TEST(LiveAllocationSnapshotTest, RejectsTruncated) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + Snapshot out; + EXPECT_FALSE(deserialize(buf.data(), buf.size() - 4, out)); +} + +TEST(LiveAllocationSnapshotTest, EmptySnapshotRoundTrip) { + Snapshot in; + std::vector buf; + serialize(in, buf); + Snapshot out; + ASSERT_TRUE(deserialize(buf.data(), buf.size(), out)); + EXPECT_TRUE(out.stacks.empty()); + EXPECT_TRUE(out.pids.empty()); + EXPECT_EQ(out.cleared_addresses, 0u); + EXPECT_EQ(out.dropped_pids, 0u); +} + +} // namespace ddprof::live_alloc_snapshot From 4479c8b7c89270eb999b71802af7e737c2228307 Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Tue, 26 May 2026 13:30:18 +0200 Subject: [PATCH 2/6] test(simple_malloc): assert live-heap state survives worker reset Add a third live-heap variant to simple_malloc-ut.sh that drives the worker into at least one reset (upload_period=2s, worker_period=2) with --skip-free 100 keeping ~99% of allocations live, and checks: - at least one '[live-alloc] Snapshot restored' log line - zero 'Tracked address count mismatch' warnings between the profiler and the in-target library after restore Adds ~7s to the simple_malloc suite (target needs to outlive 2 export cycles). Same test runs under DD_PROFILING_REORDER_EVENTS=1 too. --- test/simple_malloc-ut.sh | 50 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/simple_malloc-ut.sh b/test/simple_malloc-ut.sh index d09742186..9d8be13ef 100755 --- a/test/simple_malloc-ut.sh +++ b/test/simple_malloc-ut.sh @@ -113,6 +113,46 @@ check() { check_logs "$@" } +# Variant of check() that additionally asserts the LiveAllocation snapshot +# path survives at least one worker reset: +# - at least one '[live-alloc] Snapshot restored' line, and +# - zero 'Tracked address count mismatch' warnings between profiler and +# library after restore. +check_live_alloc_persistence() { + cmd="$1" + min_restored="${5:-1}" + if [[ "$use_taskset" -eq 1 ]]; then + cmd="taskset ${test_cpu_mask} ${cmd}" + fi + echo "Running: ${cmd}" + # shellcheck disable=SC2086 + eval ${cmd} || ( echo "Command failed: ${cmd}" && cat "${log_file}" && exit 1 ) + sync "${log_file}" + ddprof_pid=$(grep -m1 -oP -a ' ddprof\[\K[0-9]+(?=\]: Starting profiler)' "${log_file}" || true) + if [ -z "${ddprof_pid}" ]; then + echo "Unable to find profiler pid" + cat "${log_file}" + exit 1 + fi + timeout "$timeout_sec" tail --pid="$ddprof_pid" -f /dev/null + sync "${log_file}" + + restored=$(grep -c '\[live-alloc\] Snapshot restored' "${log_file}" || true) + mismatches=$(grep -c 'Tracked address count mismatch' "${log_file}" || true) + if [[ "${restored}" -lt "${min_restored}" ]]; then + echo "Expected at least ${min_restored} 'Snapshot restored', got ${restored}" + cat "${log_file}" + exit 1 + fi + if [[ "${mismatches}" -ne 0 ]]; then + echo "Unexpected 'Tracked address count mismatch' lines: ${mismatches}" + cat "${log_file}" + exit 1 + fi + echo "Live-alloc snapshot persistence OK: restored=${restored}, mismatches=0" + check_logs "$@" +} + # Test disabled static lib mode check "./test/simple_malloc-static ${opts}" -1 @@ -136,6 +176,16 @@ check "./ddprof --show_config --event \"${event}\" ./test/simple_malloc ${opts} event="sALLOC,period=-524288,mode=sl;sCPU" check "./ddprof --show_config --event \"${event}\" ./test/simple_malloc ${opts} --skip-free 100" 1 1 "inuse-space,alloc-space,cpu-time" +# Test that live-heap tracking survives a worker reset. +# Use a short upload_period and a low worker_period so several resets occur +# within the run, and a longer simple_malloc loop so the target outlives at +# least the first reset. --skip-free 100 keeps ~99% of allocations live. +event="sALLOC,period=-524288,mode=sl;sCPU" +opts_persist="--malloc 4096 --skip-free 100 --loop 80000 --spin 80 --nice 19" +check_live_alloc_persistence \ + "./ddprof --event \"${event}\" --upload-period 2 --worker_period 2 ./test/simple_malloc ${opts_persist}" \ + 1 1 "inuse-space,alloc-space,cpu-time" 1 + # Test wrapper mode with forks + threads opts_more_spin="--loop 1000 --spin 400" check "./ddprof ./test/simple_malloc ${opts_more_spin} --fork 2 --threads 2" 2 4 From e6b2dd5306b490cda047202f5d7e2e78da1d1c9d Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Tue, 26 May 2026 15:08:24 +0200 Subject: [PATCH 3/6] style: clang-format the new live-allocation snapshot code --- include/common_symbol_errors.hpp | 9 +++-- include/ddprof_stats.hpp | 4 +-- src/ddprof_worker.cc | 12 +++---- src/live_allocation_snapshot.cc | 52 +++++++++++++---------------- src/perf_mainloop.cc | 7 ++-- test/CMakeLists.txt | 8 ++--- test/live_allocation_snapshot-ut.cc | 4 +-- 7 files changed, 44 insertions(+), 52 deletions(-) diff --git a/include/common_symbol_errors.hpp b/include/common_symbol_errors.hpp index 46abd91ed..054403830 100644 --- a/include/common_symbol_errors.hpp +++ b/include/common_symbol_errors.hpp @@ -15,9 +15,12 @@ using namespace std::string_view_literals; namespace ddprof { inline constexpr std::array k_common_frame_names = { - "[truncated]"sv, "[unknown mapping]"sv, - "[unwind failure]"sv, "[incomplete]"sv, - "[lost]"sv, "[maximum pids]"sv, + "[truncated]"sv, + "[unknown mapping]"sv, + "[unwind failure]"sv, + "[incomplete]"sv, + "[lost]"sv, + "[maximum pids]"sv, "[live-alloc cleared]"sv}; enum SymbolErrors : std::uint8_t { diff --git a/include/ddprof_stats.hpp b/include/ddprof_stats.hpp index 7366cf441..000a4865e 100644 --- a/include/ddprof_stats.hpp +++ b/include/ddprof_stats.hpp @@ -47,8 +47,8 @@ namespace ddprof { X(PROFILE_DURATION, "profile.duration_ms", STAT_GAUGE) \ X(AGGREGATION_AVG_TIME, "aggregation.avg_time_ns", STAT_GAUGE) \ X(BACKPOPULATE_COUNT, "backpopulate.count", STAT_GAUGE) \ - X(LIVE_ALLOC_SNAPSHOT_BYTES, "live_alloc.snapshot.bytes", STAT_GAUGE) \ - X(LIVE_ALLOC_CLEARED_STACKS, "live_alloc.snapshot.cleared_stacks", \ + X(LIVE_ALLOC_SNAPSHOT_BYTES, "live_alloc.snapshot.bytes", STAT_GAUGE) \ + X(LIVE_ALLOC_CLEARED_STACKS, "live_alloc.snapshot.cleared_stacks", \ STAT_GAUGE) \ X(LIVE_ALLOC_DROPPED_PIDS, "live_alloc.snapshot.dropped_pids", STAT_GAUGE) diff --git a/src/ddprof_worker.cc b/src/ddprof_worker.cc index 039e2411a..e25ca8e2d 100644 --- a/src/ddprof_worker.cc +++ b/src/ddprof_worker.cc @@ -12,9 +12,9 @@ #include "dso_hdr.hpp" #include "exporter/ddprof_exporter.hpp" #include "live_allocation_snapshot.hpp" -#include "persistent_worker_state.hpp" #include "logger.hpp" #include "perf.hpp" +#include "persistent_worker_state.hpp" #include "pevent_lib.hpp" #include "pprof/ddprof_pprof.hpp" #include "procutils.hpp" @@ -575,13 +575,13 @@ DDRes worker_library_init(DDProfContext &ctx, if (live_alloc_snapshot::read_from_fd( persistent_worker_state->live_alloc_snapshot_fd, snap)) { if (!snap.stacks.empty() || !snap.pids.empty()) { - live_alloc_snapshot::restore_snapshot( - snap, ctx.worker_ctx.live_allocation, - ctx.worker_ctx.us->symbol_hdr); + live_alloc_snapshot::restore_snapshot(snap, + ctx.worker_ctx.live_allocation, + ctx.worker_ctx.us->symbol_hdr); LG_NTC("[live-alloc] Snapshot restored: stacks=%zu pids=%zu " "cleared=%u dropped_pids=%u", - snap.stacks.size(), snap.pids.size(), - snap.cleared_addresses, snap.dropped_pids); + snap.stacks.size(), snap.pids.size(), snap.cleared_addresses, + snap.dropped_pids); } } } diff --git a/src/live_allocation_snapshot.cc b/src/live_allocation_snapshot.cc index aa16694bf..72fa2d69d 100644 --- a/src/live_allocation_snapshot.cc +++ b/src/live_allocation_snapshot.cc @@ -33,18 +33,18 @@ constexpr std::size_t cost_string(std::string_view sv) { } std::size_t cost_funloc(const FunLocPortable &fl) { - return sizeof(uint64_t) * 2 + // ip + elf_addr - sizeof(uint32_t) + // lineno - sizeof(uint64_t) * 3 + // map_low / map_high / map_offset - cost_string(fl.fn_name) + cost_string(fl.fn_system_name) + - cost_string(fl.fn_file) + cost_string(fl.map_filename) + - cost_string(fl.map_build_id); + return sizeof(uint64_t) * 2 + // ip + elf_addr + sizeof(uint32_t) + // lineno + sizeof(uint64_t) * 3 + // map_low / map_high / map_offset + cost_string(fl.fn_name) + cost_string(fl.fn_system_name) + + cost_string(fl.fn_file) + cost_string(fl.map_filename) + + cost_string(fl.map_build_id); } std::size_t cost_stack(const UnwindOutputPortable &uo) { std::size_t c = sizeof(int32_t) * 2 + // pid + tid - cost_string(uo.container_id) + cost_string(uo.exe_name) + - cost_string(uo.thread_name) + sizeof(uint32_t); // n_locs + cost_string(uo.container_id) + cost_string(uo.exe_name) + + cost_string(uo.thread_name) + sizeof(uint32_t); // n_locs for (const auto &fl : uo.locs) { c += cost_funloc(fl); } @@ -56,7 +56,7 @@ constexpr std::size_t k_address_entry_cost = std::size_t cost_pid_entry(const PidEntry &p) { return sizeof(int32_t) * 2 + sizeof(uint32_t) * 3 + - p.addresses.size() * k_address_entry_cost; + p.addresses.size() * k_address_entry_cost; } constexpr std::size_t k_header_cost = sizeof(k_magic) + sizeof(uint32_t) * 5; @@ -104,8 +104,7 @@ FunLocPortable funloc_to_portable(const FunLoc &fl, } if (fl.map_info_idx != k_mapinfo_idx_null && - static_cast(fl.map_info_idx) < - symbol_hdr._mapinfo_table.size()) { + static_cast(fl.map_info_idx) < symbol_hdr._mapinfo_table.size()) { ddog_prof_MappingId2 mid = symbol_hdr._mapinfo_table[fl.map_info_idx]; if (mid) { out.map_low = mid->memory_start; @@ -305,10 +304,9 @@ Snapshot capture_snapshot(const LiveAllocation &live_alloc, } std::vector pid_order(snapshot.pids.size()); std::iota(pid_order.begin(), pid_order.end(), 0u); - std::sort(pid_order.begin(), pid_order.end(), - [&](uint32_t a, uint32_t b) { - return pid_total_value[a] < pid_total_value[b]; - }); + std::sort(pid_order.begin(), pid_order.end(), [&](uint32_t a, uint32_t b) { + return pid_total_value[a] < pid_total_value[b]; + }); std::vector pid_dropped(snapshot.pids.size(), false); for (uint32_t pidx : pid_order) { if (projected <= max_bytes) { @@ -453,8 +451,7 @@ bool deserialize(const uint8_t *data, std::size_t size, Snapshot &out) { out.pids.resize(n_pids); for (auto &p : out.pids) { if (!r.i32(p.watcher_pos) || !r.i32(p.pid) || - !r.u32(p.address_conflict_count) || - !r.u32(p.tracked_address_count)) { + !r.u32(p.address_conflict_count) || !r.u32(p.tracked_address_count)) { return false; } uint32_t n_addr = 0; @@ -485,8 +482,7 @@ bool write_to_fd(int fd, const Snapshot &snapshot) { } std::size_t written = 0; while (written < buf.size()) { - ssize_t const n = - write(fd, buf.data() + written, buf.size() - written); + ssize_t const n = write(fd, buf.data() + written, buf.size() - written); if (n < 0) { if (errno == EINTR) { continue; @@ -567,8 +563,7 @@ UnwindOutput build_cleared_stack(SymbolHdr &symbol_hdr) { } UnwindOutput portable_to_uo(const UnwindOutputPortable &p, - SymbolHdr &symbol_hdr, - LiveAllocation &live_alloc) { + SymbolHdr &symbol_hdr, LiveAllocation &live_alloc) { const auto *dict = symbol_hdr.profiles_dictionary(); UnwindOutput uo; uo.pid = p.pid; @@ -578,9 +573,9 @@ UnwindOutput portable_to_uo(const UnwindOutputPortable &p, uo.thread_name = live_alloc.intern_restored_string(p.thread_name); uo.locs.reserve(p.locs.size()); for (const auto &fl : p.locs) { - ddog_prof_MappingId2 mid = intern_mapping( - dict, fl.map_low, fl.map_high, fl.map_offset, fl.map_filename, - fl.map_build_id); + ddog_prof_MappingId2 mid = + intern_mapping(dict, fl.map_low, fl.map_high, fl.map_offset, + fl.map_filename, fl.map_build_id); symbol_hdr._mapinfo_table.emplace_back(mid); auto const map_idx = static_cast(symbol_hdr._mapinfo_table.size() - 1); @@ -628,11 +623,10 @@ void restore_snapshot(const Snapshot &snapshot, LiveAllocation &live_alloc, for (const auto &p : snapshot.pids) { for (const auto &a : p.addresses) { - const UnwindOutput &uo = - (a.stack_idx == k_cleared_stack_idx) ? cleared_uo - : rebuilt[a.stack_idx]; - live_alloc.register_allocation(uo, a.addr, a.value, p.watcher_pos, - p.pid); + const UnwindOutput &uo = (a.stack_idx == k_cleared_stack_idx) + ? cleared_uo + : rebuilt[a.stack_idx]; + live_alloc.register_allocation(uo, a.addr, a.value, p.watcher_pos, p.pid); } // Carry over the library/profiler tracked-address counters so the // post-restore mismatch warning is anchored at the same baseline. diff --git a/src/perf_mainloop.cc b/src/perf_mainloop.cc index 42a1561bc..894a5d199 100644 --- a/src/perf_mainloop.cc +++ b/src/perf_mainloop.cc @@ -446,8 +446,7 @@ DDRes worker_loop(DDProfContext &ctx, const WorkerAttr *attr, // resume heap tracking without losing in-flight live addresses. int const snap_fd = ctx.worker_ctx.persistent_worker_state->live_alloc_snapshot_fd; - if (snap_fd >= 0 && - context_allocation_profiling_watcher_idx(ctx) != -1) { + if (snap_fd >= 0 && context_allocation_profiling_watcher_idx(ctx) != -1) { auto snap = live_alloc_snapshot::capture_snapshot( ctx.worker_ctx.live_allocation, ctx.worker_ctx.us->symbol_hdr); if (!live_alloc_snapshot::write_to_fd(snap_fd, snap)) { @@ -460,8 +459,8 @@ DDRes worker_loop(DDProfContext &ctx, const WorkerAttr *attr, } LG_NTC("[live-alloc] Snapshot written: stacks=%zu pids=%zu " "cleared=%u dropped_pids=%u", - snap.stacks.size(), snap.pids.size(), - snap.cleared_addresses, snap.dropped_pids); + snap.stacks.size(), snap.pids.size(), snap.cleared_addresses, + snap.dropped_pids); ddprof_stats_set(STATS_LIVE_ALLOC_CLEARED_STACKS, snap.cleared_addresses); ddprof_stats_set(STATS_LIVE_ALLOC_DROPPED_PIDS, snap.dropped_pids); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1df425811..55dabc786 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -399,12 +399,8 @@ add_unit_test(tracepoint_config-ut tracepoint_config-ut.cc ../src/tracepoint_con add_unit_test(live_allocation-ut live_allocation-ut.cc ../src/live_allocation.cc) add_unit_test( - live_allocation_snapshot-ut - live_allocation_snapshot-ut.cc - ../src/live_allocation_snapshot.cc - ../src/live_allocation.cc - ../src/ddog_profiling_utils.cc - ../src/demangler/demangler.cc + live_allocation_snapshot-ut live_allocation_snapshot-ut.cc ../src/live_allocation_snapshot.cc + ../src/live_allocation.cc ../src/ddog_profiling_utils.cc ../src/demangler/demangler.cc LIBRARIES Datadog::Profiling llvm-demangle) add_unit_test(ddprof_process-ut ddprof_process-ut.cc ${PROCESS_SRC} LIBRARIES ${ELFUTILS_LIBRARIES}) diff --git a/test/live_allocation_snapshot-ut.cc b/test/live_allocation_snapshot-ut.cc index 03006cc31..fe28faf1b 100644 --- a/test/live_allocation_snapshot-ut.cc +++ b/test/live_allocation_snapshot-ut.cc @@ -19,8 +19,8 @@ Snapshot make_sample_snapshot() { uo.container_id = "ctr-abc"; uo.exe_name = "/usr/bin/myapp"; uo.thread_name = "worker-0"; - uo.locs.push_back({/*ip*/ 0x1000, /*elf_addr*/ 0x500, /*lineno*/ 42, - "malloc", "__libc_malloc", "malloc.c", + uo.locs.push_back({/*ip*/ 0x1000, /*elf_addr*/ 0x500, /*lineno*/ 42, "malloc", + "__libc_malloc", "malloc.c", /*map_low*/ 0x10000, /*map_high*/ 0x20000, /*map_offset*/ 0, "/lib/libc.so", "abcdef0123"}); uo.locs.push_back({0x2000, 0x600, 0, "main", "", "main.c", 0x30000, 0x40000, From 541c2bc0a3fe3c811988e6d4c1e69c06a430acb7 Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Tue, 26 May 2026 15:31:43 +0200 Subject: [PATCH 4/6] live-alloc-snapshot: fix clang-tidy violations + document caching layers clang-tidy errors flagged by CI: - readability-math-missing-parentheses on sizeof(T) * N + ... arithmetic - cppcoreguidelines-avoid-const-or-ref-data-members on Writer::_out (switch the reference member to a non-owning pointer) - readability-uppercase-literal-suffix (0u -> 0U) - misc-const-correctness on loop indices (uint32_t idx -> uint32_t const idx) Also adds a TODO block above portable_to_uo() spelling out the four overlapping caches (ProfilesDictionary, SymbolTable/MapInfoTable, RuntimeSymbolLookup et al., _restored_strings), the duplicate-entry cost we accept on the restore path, and how a future PR can unify the model by making FunLoc identity content-based on libdatadog handles. --- src/live_allocation_snapshot.cc | 73 +++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 16 deletions(-) diff --git a/src/live_allocation_snapshot.cc b/src/live_allocation_snapshot.cc index 72fa2d69d..77589534e 100644 --- a/src/live_allocation_snapshot.cc +++ b/src/live_allocation_snapshot.cc @@ -33,16 +33,16 @@ constexpr std::size_t cost_string(std::string_view sv) { } std::size_t cost_funloc(const FunLocPortable &fl) { - return sizeof(uint64_t) * 2 + // ip + elf_addr - sizeof(uint32_t) + // lineno - sizeof(uint64_t) * 3 + // map_low / map_high / map_offset + return (sizeof(uint64_t) * 2) + // ip + elf_addr + sizeof(uint32_t) + // lineno + (sizeof(uint64_t) * 3) + // map_low / map_high / map_offset cost_string(fl.fn_name) + cost_string(fl.fn_system_name) + cost_string(fl.fn_file) + cost_string(fl.map_filename) + cost_string(fl.map_build_id); } std::size_t cost_stack(const UnwindOutputPortable &uo) { - std::size_t c = sizeof(int32_t) * 2 + // pid + tid + std::size_t c = (sizeof(int32_t) * 2) + // pid + tid cost_string(uo.container_id) + cost_string(uo.exe_name) + cost_string(uo.thread_name) + sizeof(uint32_t); // n_locs for (const auto &fl : uo.locs) { @@ -55,11 +55,11 @@ constexpr std::size_t k_address_entry_cost = sizeof(uint64_t) + sizeof(int64_t) + sizeof(uint32_t); std::size_t cost_pid_entry(const PidEntry &p) { - return sizeof(int32_t) * 2 + sizeof(uint32_t) * 3 + - p.addresses.size() * k_address_entry_cost; + return (sizeof(int32_t) * 2) + (sizeof(uint32_t) * 3) + + (p.addresses.size() * k_address_entry_cost); } -constexpr std::size_t k_header_cost = sizeof(k_magic) + sizeof(uint32_t) * 5; +constexpr std::size_t k_header_cost = sizeof(k_magic) + (sizeof(uint32_t) * 5); std::size_t estimate_size(const Snapshot &s) { std::size_t c = k_header_cost; @@ -136,7 +136,7 @@ UnwindOutputPortable uo_to_portable(const UnwindOutput &uo, class Writer { public: - explicit Writer(std::vector &out) : _out(out) {} + explicit Writer(std::vector *out) : _out(out) {} void u32(uint32_t v) { raw(&v, sizeof(v)); } void i32(int32_t v) { raw(&v, sizeof(v)); } @@ -148,11 +148,11 @@ class Writer { } void raw(const void *p, std::size_t n) { const auto *bytes = static_cast(p); - _out.insert(_out.end(), bytes, bytes + n); + _out->insert(_out->end(), bytes, bytes + n); } private: - std::vector &_out; + std::vector *_out; }; class Reader { @@ -268,7 +268,7 @@ Snapshot capture_snapshot(const LiveAllocation &live_alloc, } } std::vector stack_order(snapshot.stacks.size()); - std::iota(stack_order.begin(), stack_order.end(), 0u); + std::iota(stack_order.begin(), stack_order.end(), 0U); std::sort(stack_order.begin(), stack_order.end(), [&](uint32_t a, uint32_t b) { return stack_total_value[a] < stack_total_value[b]; @@ -281,7 +281,7 @@ Snapshot capture_snapshot(const LiveAllocation &live_alloc, std::vector stack_dropped(snapshot.stacks.size(), false); bool cleared_stack_needed = false; - for (uint32_t idx : stack_order) { + for (uint32_t const idx : stack_order) { if (projected <= max_bytes) { break; } @@ -303,12 +303,12 @@ Snapshot capture_snapshot(const LiveAllocation &live_alloc, } } std::vector pid_order(snapshot.pids.size()); - std::iota(pid_order.begin(), pid_order.end(), 0u); + std::iota(pid_order.begin(), pid_order.end(), 0U); std::sort(pid_order.begin(), pid_order.end(), [&](uint32_t a, uint32_t b) { return pid_total_value[a] < pid_total_value[b]; }); std::vector pid_dropped(snapshot.pids.size(), false); - for (uint32_t pidx : pid_order) { + for (uint32_t const pidx : pid_order) { if (projected <= max_bytes) { break; } @@ -366,7 +366,7 @@ Snapshot capture_snapshot(const LiveAllocation &live_alloc, void serialize(const Snapshot &s, std::vector &out) { out.clear(); out.reserve(estimate_size(s)); - Writer w(out); + Writer w(&out); w.raw(k_magic.data(), k_magic.size()); w.u32(k_version); w.u32(static_cast(s.stacks.size())); @@ -548,7 +548,7 @@ UnwindOutput build_cleared_stack(SymbolHdr &symbol_hdr) { dict, k_common_frame_names[SymbolErrors::live_alloc_cleared], {}); ddog_prof_MappingId2 mp = intern_mapping(dict, 0, 0, 0, {}, {}); - symbol_hdr._symbol_table.emplace_back(0u, fn); + symbol_hdr._symbol_table.emplace_back(0U, fn); auto const symbol_idx = static_cast(symbol_hdr._symbol_table.size() - 1); symbol_hdr._mapinfo_table.emplace_back(mp); @@ -562,6 +562,47 @@ UnwindOutput build_cleared_stack(SymbolHdr &symbol_hdr) { return uo; } +// TODO(r1viollet): unify caching strategies across the profiler. +// +// Today there are four overlapping caches that all describe symbols and +// mappings, and they don't talk to each other on the restore path: +// +// L1 libdatadog ProfilesDictionary : interns (Function2, Mapping2, +// string) by content; returns stable +// opaque handles (FunctionId2*, +// MappingId2*). +// L2 SymbolTable / MapInfoTable : append-only vectors of +// {lineno, FunctionId2*} and +// MappingId2*. FunLoc carries L2 +// indices. +// L3 RuntimeSymbolLookup, : (pid, raw addr) -> L2 index, owned +// DsoSymbolLookup, by SymbolHdr, populated by the +// MapInfoLookup, normal unwind path only. +// BaseFrameSymbolLookup, +// CommonSymbolLookup +// L4 LiveAllocation._restored_strings : backing storage for the three +// string_views in restored +// UnwindOutputs (container_id, +// exe_name, thread_name). +// +// On restore we re-intern through L1 (which correctly dedupes) and append +// fresh L2 rows, but we never touch L3. Consequence: when the same +// allocation site fires again after a restart via a natural sample, L3 +// misses and the normal path appends *another* L2 row pointing at the +// same L1 handle. The resulting FunLoc has a different SymbolIdx_t / +// MapInfoIdx_t than the restored one, so UnwindOutput::operator<=> treats +// the two as distinct keys in LiveAllocation._unique_stacks. The exported +// pprof is unaffected (libdatadog re-collapses via L1), but we carry up +// to one duplicate aggregator entry per restored stack until natural +// deallocations evict it. +// +// The clean fix is to make FunLoc identity content-based, keyed on the L1 +// handle pointers (function_id, mapping_id, ip, elf_addr) instead of L2 +// indices. That removes the duplication, lets restored and natural +// UnwindOutputs collapse together, and would also let us drop the L2 +// indirection entirely for cold paths. It is a cross-cutting change +// (unwind_helper, ddog_profiling_utils, pprof emission) and should be its +// own PR. UnwindOutput portable_to_uo(const UnwindOutputPortable &p, SymbolHdr &symbol_hdr, LiveAllocation &live_alloc) { const auto *dict = symbol_hdr.profiles_dictionary(); From deaabdc64f1b2113b958992198edbb65f9b5f1dd Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Tue, 26 May 2026 15:49:02 +0200 Subject: [PATCH 5/6] live-alloc: env-var snapshot budget + simple_malloc --unique-sites - DD_PROFILING_NATIVE_LIVE_ALLOC_SNAPSHOT_MAX_BYTES overrides the per-capture budget. Capped at the hard ceiling. Lets tests force the cleared-stack remap path and the dropped-pid fallback without rebuilding the binary. - simple_malloc --unique-sites N spreads allocations across up to 256 templated alloc_at_site instantiations, each producing a distinct innermost frame to the unwinder. Used to stress-test the snapshot path with many unique stacks per cycle. Verified locally at three budget levels: full preservation, cleared remap (stacks=30 cleared=582 dropped_pids=0 at 240 KB), and pid drop (dropped_pids=1 at 8 KB). All paths keep 'Tracked address count mismatch' warnings at zero in the steady state. --- src/perf_mainloop.cc | 22 ++++++- test/simple_malloc.cc | 131 ++++++++++++++++++++++++++++++++---------- 2 files changed, 123 insertions(+), 30 deletions(-) diff --git a/src/perf_mainloop.cc b/src/perf_mainloop.cc index 894a5d199..f1e281eb2 100644 --- a/src/perf_mainloop.cc +++ b/src/perf_mainloop.cc @@ -447,8 +447,28 @@ DDRes worker_loop(DDProfContext &ctx, const WorkerAttr *attr, int const snap_fd = ctx.worker_ctx.persistent_worker_state->live_alloc_snapshot_fd; if (snap_fd >= 0 && context_allocation_profiling_watcher_idx(ctx) != -1) { + // Allow tests / debugging to override the snapshot budget without + // touching the CLI. Honored at every capture; capped at the hard + // ceiling defined in the snapshot module. + std::size_t max_bytes = + live_alloc_snapshot::k_default_max_snapshot_bytes; + if (const char *env = std::getenv( + "DD_PROFILING_NATIVE_LIVE_ALLOC_SNAPSHOT_MAX_BYTES")) { + char *end = nullptr; + unsigned long long const v = std::strtoull(env, &end, 10); + if (end != env && v > 0) { + max_bytes = std::min( + static_cast(v), + live_alloc_snapshot::k_hard_max_snapshot_bytes); + } else { + LG_WRN("[live-alloc] Invalid " + "DD_PROFILING_NATIVE_LIVE_ALLOC_SNAPSHOT_MAX_BYTES=%s", + env); + } + } auto snap = live_alloc_snapshot::capture_snapshot( - ctx.worker_ctx.live_allocation, ctx.worker_ctx.us->symbol_hdr); + ctx.worker_ctx.live_allocation, ctx.worker_ctx.us->symbol_hdr, + max_bytes); if (!live_alloc_snapshot::write_to_fd(snap_fd, snap)) { LG_WRN("[live-alloc] Failed to write snapshot before restart"); } else { diff --git a/test/simple_malloc.cc b/test/simple_malloc.cc index 2c55165c4..d284a9f27 100644 --- a/test/simple_malloc.cc +++ b/test/simple_malloc.cc @@ -4,6 +4,7 @@ // Datadog, Inc. #include +#include #include #include #include @@ -18,6 +19,7 @@ #include #include #include +#include #include "clocks.hpp" #include "ddprof_base.hpp" @@ -95,9 +97,60 @@ struct Options { bool use_shared_library = false; bool avoid_dlopen_hook = false; bool stop{false}; + uint32_t unique_sites{0}; }; // NOLINTBEGIN(clang-analyzer-unix.Malloc) + +// One templated allocation site per Tag. Each instantiation has a distinct +// mangled name, so the unwinder reports it as a distinct innermost frame — +// useful for stress-testing the LiveAllocation snapshot path with many +// unique stacks without rewriting the loop body. NOINLINE keeps the symbol +// alive and the frame distinguishable. +template +DDPROF_NOINLINE void alloc_at_site(uint64_t size, + std::deque &live_allocations, + uint64_t keep_live, + uint32_t skip_free_target, + unsigned &skip_free_counter, + uint64_t &nb_alloc, + uint64_t &alloc_bytes) { + void *p = nullptr; + if (size) { + p = malloc(size); + ++nb_alloc; + alloc_bytes += size; + } + DoNotOptimize(p); + if (keep_live > 0) { + if (p != nullptr) { + live_allocations.push_back(p); + while (live_allocations.size() > keep_live) { + free(live_allocations.front()); + live_allocations.pop_front(); + } + } + } else { + if (skip_free_counter++ >= skip_free_target) { + free(p); + skip_free_counter = 0; + } + } +} + +using AllocSiteFn = void (*)(uint64_t, std::deque &, uint64_t, + uint32_t, unsigned &, uint64_t &, uint64_t &); + +template +constexpr auto make_alloc_site_table(std::index_sequence) { + return std::array{ + &alloc_at_site(Is)>...}; +} + +constexpr std::size_t k_max_unique_alloc_sites = 256; +constexpr auto k_alloc_site_table = + make_alloc_site_table(std::make_index_sequence{}); + extern "C" DDPROF_NOINLINE void do_lot_of_allocations(const Options &options, Stats &stats) { uint64_t nb_alloc{0}; @@ -108,38 +161,50 @@ extern "C" DDPROF_NOINLINE void do_lot_of_allocations(const Options &options, auto start_cpu = ThreadCpuClock::now(); unsigned skip_free = 0; std::deque live_allocations; + uint32_t const nb_sites = std::min( + options.unique_sites, static_cast(k_alloc_site_table.size())); for (uint64_t i = 0; i < options.loop_count; ++i) { - void *p = nullptr; - if (options.malloc_size) { - p = malloc(options.malloc_size); - ++nb_alloc; - alloc_bytes += options.malloc_size; - } - // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) - DoNotOptimize(p); - void *p2; - if (options.realloc_size) { - p2 = realloc(p, options.realloc_size); - ++nb_alloc; - alloc_bytes += options.realloc_size; + if (nb_sites > 0) { + // Pick a site deterministically by index; avoids the per-iteration + // cost of an RNG and still produces a uniform distribution. + std::size_t const site = + static_cast((i * 2654435761ULL) % nb_sites); + k_alloc_site_table[site](options.malloc_size, live_allocations, + options.keep_live_allocations, options.skip_free, + skip_free, nb_alloc, alloc_bytes); } else { - p2 = p; - } - // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) - DoNotOptimize(p2); - - if (options.keep_live_allocations > 0) { - if (p2 != nullptr) { - live_allocations.push_back(p2); - while (live_allocations.size() > options.keep_live_allocations) { - free(live_allocations.front()); - live_allocations.pop_front(); - } + void *p = nullptr; + if (options.malloc_size) { + p = malloc(options.malloc_size); + ++nb_alloc; + alloc_bytes += options.malloc_size; } - } else { - if (skip_free++ >= options.skip_free) { - free(p2); - skip_free = 0; + // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) + DoNotOptimize(p); + void *p2; + if (options.realloc_size) { + p2 = realloc(p, options.realloc_size); + ++nb_alloc; + alloc_bytes += options.realloc_size; + } else { + p2 = p; + } + // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) + DoNotOptimize(p2); + + if (options.keep_live_allocations > 0) { + if (p2 != nullptr) { + live_allocations.push_back(p2); + while (live_allocations.size() > options.keep_live_allocations) { + free(live_allocations.front()); + live_allocations.pop_front(); + } + } + } else { + if (skip_free++ >= options.skip_free) { + free(p2); + skip_free = 0; + } } } @@ -311,6 +376,14 @@ int main(int argc, char *argv[]) { "live heap size") ->default_val(0) ->check(CLI::NonNegativeNumber); + app.add_option( + "--unique-sites", opts.unique_sites, + "Spread allocations across N distinct templated call sites " + "(0 = single site, default). Used to stress-test profilers " + "with many distinct unwind outputs. Capped at the templated " + "table size (256).") + ->default_val(0) + ->check(CLI::NonNegativeNumber); app.add_option( "--timeout", opts.timeout_duration, "Timeout after N milliseconds") From a74f2b018a04d71db0eb4662aaec89b81e8572e7 Mon Sep 17 00:00:00 2001 From: Erwan Viollet Date: Wed, 27 May 2026 17:21:17 +0200 Subject: [PATCH 6/6] style: clang-format simple_malloc.cc unique-sites additions --- test/simple_malloc.cc | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/test/simple_malloc.cc b/test/simple_malloc.cc index d284a9f27..32def4292 100644 --- a/test/simple_malloc.cc +++ b/test/simple_malloc.cc @@ -108,13 +108,11 @@ struct Options { // unique stacks without rewriting the loop body. NOINLINE keeps the symbol // alive and the frame distinguishable. template -DDPROF_NOINLINE void alloc_at_site(uint64_t size, - std::deque &live_allocations, - uint64_t keep_live, - uint32_t skip_free_target, - unsigned &skip_free_counter, - uint64_t &nb_alloc, - uint64_t &alloc_bytes) { +DDPROF_NOINLINE void +alloc_at_site(uint64_t size, std::deque &live_allocations, + uint64_t keep_live, uint32_t skip_free_target, + unsigned &skip_free_counter, uint64_t &nb_alloc, + uint64_t &alloc_bytes) { void *p = nullptr; if (size) { p = malloc(size); @@ -138,8 +136,8 @@ DDPROF_NOINLINE void alloc_at_site(uint64_t size, } } -using AllocSiteFn = void (*)(uint64_t, std::deque &, uint64_t, - uint32_t, unsigned &, uint64_t &, uint64_t &); +using AllocSiteFn = void (*)(uint64_t, std::deque &, uint64_t, uint32_t, + unsigned &, uint64_t &, uint64_t &); template constexpr auto make_alloc_site_table(std::index_sequence) { @@ -376,12 +374,11 @@ int main(int argc, char *argv[]) { "live heap size") ->default_val(0) ->check(CLI::NonNegativeNumber); - app.add_option( - "--unique-sites", opts.unique_sites, - "Spread allocations across N distinct templated call sites " - "(0 = single site, default). Used to stress-test profilers " - "with many distinct unwind outputs. Capped at the templated " - "table size (256).") + app.add_option("--unique-sites", opts.unique_sites, + "Spread allocations across N distinct templated call sites " + "(0 = single site, default). Used to stress-test profilers " + "with many distinct unwind outputs. Capped at the templated " + "table size (256).") ->default_val(0) ->check(CLI::NonNegativeNumber);