diff --git a/include/common_symbol_errors.hpp b/include/common_symbol_errors.hpp index 0f5c44c7..05440383 100644 --- a/include/common_symbol_errors.hpp +++ b/include/common_symbol_errors.hpp @@ -14,10 +14,14 @@ using namespace std::string_view_literals; namespace ddprof { -inline constexpr std::array k_common_frame_names = { - "[truncated]"sv, "[unknown mapping]"sv, - "[unwind failure]"sv, "[incomplete]"sv, - "[lost]"sv, "[maximum pids]"sv}; +inline constexpr std::array k_common_frame_names = { + "[truncated]"sv, + "[unknown mapping]"sv, + "[unwind failure]"sv, + "[incomplete]"sv, + "[lost]"sv, + "[maximum pids]"sv, + "[live-alloc cleared]"sv}; enum SymbolErrors : std::uint8_t { truncated_stack = 0, @@ -26,6 +30,7 @@ enum SymbolErrors : std::uint8_t { incomplete_stack, lost_event, max_pids, + live_alloc_cleared, }; } // namespace ddprof diff --git a/include/ddprof_stats.hpp b/include/ddprof_stats.hpp index 32b304fd..000a4865 100644 --- a/include/ddprof_stats.hpp +++ b/include/ddprof_stats.hpp @@ -46,7 +46,11 @@ namespace ddprof { X(PPROF_SIZE, "pprof.size", STAT_GAUGE) \ X(PROFILE_DURATION, "profile.duration_ms", STAT_GAUGE) \ X(AGGREGATION_AVG_TIME, "aggregation.avg_time_ns", STAT_GAUGE) \ - X(BACKPOPULATE_COUNT, "backpopulate.count", STAT_GAUGE) + X(BACKPOPULATE_COUNT, "backpopulate.count", STAT_GAUGE) \ + X(LIVE_ALLOC_SNAPSHOT_BYTES, "live_alloc.snapshot.bytes", STAT_GAUGE) \ + X(LIVE_ALLOC_CLEARED_STACKS, "live_alloc.snapshot.cleared_stacks", \ + STAT_GAUGE) \ + X(LIVE_ALLOC_DROPPED_PIDS, "live_alloc.snapshot.dropped_pids", STAT_GAUGE) // Expand the enum/index for the individual stats enum DDPROF_STATS : uint8_t { STATS_TABLE(X_ENUM) STATS_LEN }; diff --git a/include/live_allocation.hpp b/include/live_allocation.hpp index d84c03a8..d2e31def 100644 --- a/include/live_allocation.hpp +++ b/include/live_allocation.hpp @@ -11,6 +11,8 @@ #include "unwind_output_hash.hpp" #include +#include +#include #include #include @@ -55,6 +57,23 @@ class LiveAllocation { // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes) WatcherVector _watcher_vector; + // Owns the string storage backing string_views inside UnwindOutputs that + // were restored from a snapshot. Live entries created from incoming + // allocation events continue to use views into Process / base-frame + // tables; this storage is only used by snapshot-restore. + // NOLINTNEXTLINE(misc-non-private-member-variables-in-classes) + std::deque _restored_strings; + + // Returns a string_view backed by _restored_strings. Empty input maps + // to an empty view without allocation. + std::string_view intern_restored_string(std::string_view sv) { + if (sv.empty()) { + return {}; + } + _restored_strings.emplace_back(sv); + return _restored_strings.back(); + } + void register_library_state(int watcher_pos, pid_t pid, uint32_t address_conflict_count, uint32_t tracked_address_count, diff --git a/include/live_allocation_snapshot.hpp b/include/live_allocation_snapshot.hpp new file mode 100644 index 00000000..d454d19b --- /dev/null +++ b/include/live_allocation_snapshot.hpp @@ -0,0 +1,129 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +// Live-allocation snapshot: portable serialisation of LiveAllocation across +// worker resets. +// +// The aggregator state in LiveAllocation refers to per-worker handles +// (libdatadog dictionary pointers, indices into SymbolHdr tables, string_views +// into Process/base-frame caches). None of these survive a worker fork, so +// before a worker restart we capture a fully self-owned snapshot via +// `capture_snapshot`, serialise it through a memfd held by the parent, and +// re-intern everything into the freshly-built tables of the new worker via +// `restore_snapshot`. + +#pragma once + +#include "ddprof_defs.hpp" +#include "live_allocation.hpp" + +#include +#include +#include +#include +#include + +namespace ddprof { + +struct SymbolHdr; + +namespace live_alloc_snapshot { + +// Fully self-owned mirror of one FunLoc. +struct FunLocPortable { + ProcessAddress_t ip{}; + ElfAddress_t elf_addr{}; + uint32_t lineno{}; + std::string fn_name; + std::string fn_system_name; + std::string fn_file; + ElfAddress_t map_low{}; + ElfAddress_t map_high{}; + Offset_t map_offset{}; + std::string map_filename; + std::string map_build_id; +}; + +// Fully self-owned mirror of one UnwindOutput. +struct UnwindOutputPortable { + std::vector locs; + std::string container_id; + std::string exe_name; + std::string thread_name; + int pid{}; + int tid{}; +}; + +struct AddressEntry { + uintptr_t addr{}; + int64_t value{}; + uint32_t stack_idx{}; // index into Snapshot::stacks +}; + +struct PidEntry { + int watcher_pos{}; + pid_t pid{}; + uint32_t address_conflict_count{}; + uint32_t tracked_address_count{}; + std::vector addresses; +}; + +// Special stack index meaning "stack was dropped during budget enforcement". +// On restore this is materialised as a synthetic single-frame stack pointing +// at the [live-alloc cleared] common frame, ensuring per-PID heap totals are +// preserved even when detail is shed. +inline constexpr uint32_t k_cleared_stack_idx = + std::numeric_limits::max(); + +struct Snapshot { + std::vector stacks; + std::vector pids; + // How many distinct allocation addresses had their stack remapped to the + // synthetic cleared stack because of the size budget. + uint32_t cleared_addresses{}; + // How many (watcher_pos, pid) entries we dropped entirely because even + // minimal accounting did not fit in the budget. + uint32_t dropped_pids{}; +}; + +// Default and hard-ceiling sizes for the serialised snapshot. +inline constexpr std::size_t k_default_max_snapshot_bytes = 4UL * 1024 * 1024; +inline constexpr std::size_t k_hard_max_snapshot_bytes = 20UL * 1024 * 1024; + +// Read a Snapshot out of an in-memory LiveAllocation, by resolving symbol / +// mapping IDs through `symbol_hdr`. The result has no references to the +// originating worker. +// +// If `max_bytes` is non-zero and the projected serialised size exceeds it, +// the snapshot is degraded in a value-preserving way: low-value stacks are +// remapped to the synthetic cleared stack first, and finally whole pids are +// dropped from the lowest aggregate value upwards. The `cleared_addresses` / +// `dropped_pids` counters in the result report how aggressive the degradation +// had to be. +Snapshot capture_snapshot(const LiveAllocation &live_alloc, + const SymbolHdr &symbol_hdr, + std::size_t max_bytes = k_default_max_snapshot_bytes); + +// Serialise / deserialise a Snapshot to/from a binary blob. +void serialize(const Snapshot &snapshot, std::vector &out); +bool deserialize(const uint8_t *data, std::size_t size, Snapshot &out); + +// Write the binary blob into `fd` (memfd). Truncates fd to the blob size on +// success. On error, returns false and leaves fd in an unspecified state. +bool write_to_fd(int fd, const Snapshot &snapshot); + +// Read a snapshot blob out of `fd`. Returns false if fd is empty / unreadable +// / malformed. +bool read_from_fd(int fd, Snapshot &out); + +// Re-intern a snapshot into `symbol_hdr` and populate the empty `live_alloc` +// state. Restored UnwindOutputs hold string_views into +// LiveAllocation::_restored_strings. +void restore_snapshot(const Snapshot &snapshot, LiveAllocation &live_alloc, + SymbolHdr &symbol_hdr); + +} // namespace live_alloc_snapshot + +} // namespace ddprof diff --git a/include/persistent_worker_state.hpp b/include/persistent_worker_state.hpp index 87a8c19c..7f033760 100644 --- a/include/persistent_worker_state.hpp +++ b/include/persistent_worker_state.hpp @@ -15,6 +15,13 @@ struct PersistentWorkerState { // Why not volatile ? Although several threads can update the number of // cycles, by design Only a single thread reads and writes to this variable. uint32_t profile_seq; + // memfd holding the most recent serialized live-allocation snapshot. + // The fd is opened by the parent in main_loop and inherited by every + // worker child. A child that is restarting writes its serialized + // LiveAllocation state to this fd just before exiting; the next child + // reads it during worker_library_init so live-heap tracking survives + // worker resets. -1 if snapshotting is disabled. + int live_alloc_snapshot_fd; }; } // namespace ddprof diff --git a/src/ddprof_worker.cc b/src/ddprof_worker.cc index d64a6f4a..e25ca8e2 100644 --- a/src/ddprof_worker.cc +++ b/src/ddprof_worker.cc @@ -6,12 +6,15 @@ #include "ddprof_worker.hpp" #include "ddprof_context.hpp" +#include "ddprof_context_lib.hpp" #include "ddprof_perf_event.hpp" #include "ddprof_stats.hpp" #include "dso_hdr.hpp" #include "exporter/ddprof_exporter.hpp" +#include "live_allocation_snapshot.hpp" #include "logger.hpp" #include "perf.hpp" +#include "persistent_worker_state.hpp" #include "pevent_lib.hpp" #include "pprof/ddprof_pprof.hpp" #include "procutils.hpp" @@ -561,6 +564,27 @@ DDRes worker_library_init(DDProfContext &ctx, ctx.worker_ctx.exp[1] = nullptr; ctx.worker_ctx.pprof[0] = nullptr; ctx.worker_ctx.pprof[1] = nullptr; + + // If the previous worker handed us a live-allocation snapshot, replay + // it before the poll loop starts draining new events. Restored entries + // use string storage owned by LiveAllocation itself. + if (persistent_worker_state && + persistent_worker_state->live_alloc_snapshot_fd >= 0 && + context_allocation_profiling_watcher_idx(ctx) != -1) { + live_alloc_snapshot::Snapshot snap; + if (live_alloc_snapshot::read_from_fd( + persistent_worker_state->live_alloc_snapshot_fd, snap)) { + if (!snap.stacks.empty() || !snap.pids.empty()) { + live_alloc_snapshot::restore_snapshot(snap, + ctx.worker_ctx.live_allocation, + ctx.worker_ctx.us->symbol_hdr); + LG_NTC("[live-alloc] Snapshot restored: stacks=%zu pids=%zu " + "cleared=%u dropped_pids=%u", + snap.stacks.size(), snap.pids.size(), snap.cleared_addresses, + snap.dropped_pids); + } + } + } } CatchExcept2DDRes(); return {}; diff --git a/src/live_allocation_snapshot.cc b/src/live_allocation_snapshot.cc new file mode 100644 index 00000000..77589534 --- /dev/null +++ b/src/live_allocation_snapshot.cc @@ -0,0 +1,687 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +#include "live_allocation_snapshot.hpp" + +#include "common_symbol_errors.hpp" +#include "ddog_profiling_utils.hpp" +#include "ddprof_file_info-i.hpp" +#include "logger.hpp" +#include "symbol_hdr.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace ddprof::live_alloc_snapshot { + +namespace { + +constexpr std::array k_magic = {'D', 'D', 'L', 'A'}; +constexpr uint32_t k_version = 1; + +// -- byte-cost estimation ----------------------------------------------------- + +constexpr std::size_t cost_string(std::string_view sv) { + return sizeof(uint32_t) + sv.size(); +} + +std::size_t cost_funloc(const FunLocPortable &fl) { + return (sizeof(uint64_t) * 2) + // ip + elf_addr + sizeof(uint32_t) + // lineno + (sizeof(uint64_t) * 3) + // map_low / map_high / map_offset + cost_string(fl.fn_name) + cost_string(fl.fn_system_name) + + cost_string(fl.fn_file) + cost_string(fl.map_filename) + + cost_string(fl.map_build_id); +} + +std::size_t cost_stack(const UnwindOutputPortable &uo) { + std::size_t c = (sizeof(int32_t) * 2) + // pid + tid + cost_string(uo.container_id) + cost_string(uo.exe_name) + + cost_string(uo.thread_name) + sizeof(uint32_t); // n_locs + for (const auto &fl : uo.locs) { + c += cost_funloc(fl); + } + return c; +} + +constexpr std::size_t k_address_entry_cost = + sizeof(uint64_t) + sizeof(int64_t) + sizeof(uint32_t); + +std::size_t cost_pid_entry(const PidEntry &p) { + return (sizeof(int32_t) * 2) + (sizeof(uint32_t) * 3) + + (p.addresses.size() * k_address_entry_cost); +} + +constexpr std::size_t k_header_cost = sizeof(k_magic) + (sizeof(uint32_t) * 5); + +std::size_t estimate_size(const Snapshot &s) { + std::size_t c = k_header_cost; + for (const auto &st : s.stacks) { + c += cost_stack(st); + } + for (const auto &p : s.pids) { + c += cost_pid_entry(p); + } + return c; +} + +// -- read-back helpers (libdatadog dictionary) -------------------------------- + +std::string copy_string(const ddog_prof_ProfilesDictionary *dict, + ddog_prof_StringId2 id) { + if (!id) { + return {}; + } + return std::string(get_string(dict, id)); +} + +FunLocPortable funloc_to_portable(const FunLoc &fl, + const SymbolHdr &symbol_hdr) { + const auto *dict = symbol_hdr.profiles_dictionary(); + FunLocPortable out; + out.ip = fl.ip; + out.elf_addr = fl.elf_addr; + + if (fl.symbol_idx != k_symbol_idx_null && + static_cast(fl.symbol_idx) < symbol_hdr._symbol_table.size()) { + const Symbol &sym = symbol_hdr._symbol_table[fl.symbol_idx]; + out.lineno = sym._lineno; + if (sym._function_id) { + // Read fields directly from the libdatadog Function2 struct. + // This is the same pattern already used by + // get_location2_function_name() / get_location2_mapping_filename(). + out.fn_name = copy_string(dict, sym._function_id->name); + out.fn_system_name = copy_string(dict, sym._function_id->system_name); + out.fn_file = copy_string(dict, sym._function_id->file_name); + } + } + + if (fl.map_info_idx != k_mapinfo_idx_null && + static_cast(fl.map_info_idx) < symbol_hdr._mapinfo_table.size()) { + ddog_prof_MappingId2 mid = symbol_hdr._mapinfo_table[fl.map_info_idx]; + if (mid) { + out.map_low = mid->memory_start; + out.map_high = mid->memory_limit; + out.map_offset = mid->file_offset; + out.map_filename = copy_string(dict, mid->filename); + out.map_build_id = copy_string(dict, mid->build_id); + } + } + return out; +} + +UnwindOutputPortable uo_to_portable(const UnwindOutput &uo, + const SymbolHdr &symbol_hdr) { + UnwindOutputPortable p; + p.pid = uo.pid; + p.tid = uo.tid; + p.container_id = std::string(uo.container_id); + p.exe_name = std::string(uo.exe_name); + p.thread_name = std::string(uo.thread_name); + p.locs.reserve(uo.locs.size()); + for (const auto &fl : uo.locs) { + p.locs.emplace_back(funloc_to_portable(fl, symbol_hdr)); + } + return p; +} + +// -- binary writer / reader --------------------------------------------------- + +class Writer { +public: + explicit Writer(std::vector *out) : _out(out) {} + + void u32(uint32_t v) { raw(&v, sizeof(v)); } + void i32(int32_t v) { raw(&v, sizeof(v)); } + void u64(uint64_t v) { raw(&v, sizeof(v)); } + void i64(int64_t v) { raw(&v, sizeof(v)); } + void str(std::string_view sv) { + u32(static_cast(sv.size())); + raw(sv.data(), sv.size()); + } + void raw(const void *p, std::size_t n) { + const auto *bytes = static_cast(p); + _out->insert(_out->end(), bytes, bytes + n); + } + +private: + std::vector *_out; +}; + +class Reader { +public: + Reader(const uint8_t *data, std::size_t size) : _p(data), _end(data + size) {} + + bool u32(uint32_t &v) { return raw(&v, sizeof(v)); } + bool i32(int32_t &v) { return raw(&v, sizeof(v)); } + bool u64(uint64_t &v) { return raw(&v, sizeof(v)); } + bool i64(int64_t &v) { return raw(&v, sizeof(v)); } + bool str(std::string &out) { + uint32_t n = 0; + if (!u32(n)) { + return false; + } + if (static_cast(_end - _p) < n) { + return false; + } + out.assign(reinterpret_cast(_p), n); + _p += n; + return true; + } + bool raw(void *dst, std::size_t n) { + if (static_cast(_end - _p) < n) { + return false; + } + std::memcpy(dst, _p, n); + _p += n; + return true; + } + [[nodiscard]] bool eof() const { return _p == _end; } + +private: + const uint8_t *_p; + const uint8_t *_end; +}; + +} // namespace + +// -- capture ------------------------------------------------------------------ + +Snapshot capture_snapshot(const LiveAllocation &live_alloc, + const SymbolHdr &symbol_hdr, std::size_t max_bytes) { + Snapshot snapshot; + + // Stage 1: dedupe stacks. The PprofStacks for different pids may produce + // identical UnwindOutputPortable rows once converted; we keep them + // separate at this stage and dedupe by raw pointer identity (each + // PprofStacks::value_type address is unique within a pid). + // For cross-pid identity we rely on the AddressMap pointer to map back. + // + // The simplest approach: emit one Portable per (pid_stack value_type + // pointer) and look it up by that pointer when filling pid entries. + struct StackKey { + const LiveAllocation::PprofStacks::value_type *ptr; + }; + std::unordered_map + stack_idx_by_ptr; + + for (unsigned watcher_pos = 0; + watcher_pos < live_alloc._watcher_vector.size(); ++watcher_pos) { + const auto &pid_map = live_alloc._watcher_vector[watcher_pos]; + for (const auto &pid_kv : pid_map) { + pid_t const pid = pid_kv.first; + const auto &pid_stacks = pid_kv.second; + + PidEntry pid_entry; + pid_entry.watcher_pos = static_cast(watcher_pos); + pid_entry.pid = pid; + pid_entry.address_conflict_count = pid_stacks._address_conflict_count; + pid_entry.tracked_address_count = pid_stacks._tracked_address_count; + pid_entry.addresses.reserve(pid_stacks._address_map.size()); + + for (const auto &addr_kv : pid_stacks._address_map) { + const auto *stack_ptr = addr_kv.second._unique_stack; + if (!stack_ptr || stack_ptr->first.locs.empty()) { + continue; + } + uint32_t stack_idx; + auto it = stack_idx_by_ptr.find(stack_ptr); + if (it == stack_idx_by_ptr.end()) { + stack_idx = static_cast(snapshot.stacks.size()); + stack_idx_by_ptr.emplace(stack_ptr, stack_idx); + snapshot.stacks.emplace_back( + uo_to_portable(stack_ptr->first, symbol_hdr)); + } else { + stack_idx = it->second; + } + pid_entry.addresses.push_back( + {addr_kv.first, addr_kv.second._value, stack_idx}); + } + if (!pid_entry.addresses.empty()) { + snapshot.pids.emplace_back(std::move(pid_entry)); + } + } + } + + if (max_bytes == 0) { + return snapshot; + } + + // Stage 2: budget enforcement. + std::size_t projected = estimate_size(snapshot); + if (projected <= max_bytes) { + return snapshot; + } + + // 2a) Rank stacks by aggregate value across all pids (low value goes first). + std::vector stack_total_value(snapshot.stacks.size(), 0); + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + stack_total_value[a.stack_idx] += a.value; + } + } + std::vector stack_order(snapshot.stacks.size()); + std::iota(stack_order.begin(), stack_order.end(), 0U); + std::sort(stack_order.begin(), stack_order.end(), + [&](uint32_t a, uint32_t b) { + return stack_total_value[a] < stack_total_value[b]; + }); + + // Cost of the synthetic cleared stack we may need to introduce. It is a + // single FunLocPortable with all strings empty -> small fixed cost. + const UnwindOutputPortable cleared_stack_template{}; // empty + const std::size_t cleared_stack_cost = cost_stack(cleared_stack_template); + + std::vector stack_dropped(snapshot.stacks.size(), false); + bool cleared_stack_needed = false; + for (uint32_t const idx : stack_order) { + if (projected <= max_bytes) { + break; + } + std::size_t const saved = cost_stack(snapshot.stacks[idx]); + projected -= saved; + if (!cleared_stack_needed) { + projected += cleared_stack_cost; + cleared_stack_needed = true; + } + stack_dropped[idx] = true; + } + + // 2b) If still over, drop pids from lowest aggregate value upwards. + if (projected > max_bytes) { + std::vector pid_total_value(snapshot.pids.size(), 0); + for (size_t i = 0; i < snapshot.pids.size(); ++i) { + for (const auto &a : snapshot.pids[i].addresses) { + pid_total_value[i] += a.value; + } + } + std::vector pid_order(snapshot.pids.size()); + std::iota(pid_order.begin(), pid_order.end(), 0U); + std::sort(pid_order.begin(), pid_order.end(), [&](uint32_t a, uint32_t b) { + return pid_total_value[a] < pid_total_value[b]; + }); + std::vector pid_dropped(snapshot.pids.size(), false); + for (uint32_t const pidx : pid_order) { + if (projected <= max_bytes) { + break; + } + projected -= cost_pid_entry(snapshot.pids[pidx]); + pid_dropped[pidx] = true; + ++snapshot.dropped_pids; + } + if (snapshot.dropped_pids) { + std::vector kept; + kept.reserve(snapshot.pids.size() - snapshot.dropped_pids); + for (size_t i = 0; i < snapshot.pids.size(); ++i) { + if (!pid_dropped[i]) { + kept.emplace_back(std::move(snapshot.pids[i])); + } + } + snapshot.pids = std::move(kept); + } + } + + // Apply stack drops: remap their address entries to k_cleared_stack_idx, + // then compact the stacks[] vector and rewrite remaining stack_idx values. + if (cleared_stack_needed) { + for (auto &p : snapshot.pids) { + for (auto &a : p.addresses) { + if (stack_dropped[a.stack_idx]) { + a.stack_idx = k_cleared_stack_idx; + ++snapshot.cleared_addresses; + } + } + } + std::vector remap(snapshot.stacks.size(), k_cleared_stack_idx); + std::vector kept_stacks; + kept_stacks.reserve(snapshot.stacks.size()); + for (size_t i = 0; i < snapshot.stacks.size(); ++i) { + if (!stack_dropped[i]) { + remap[i] = static_cast(kept_stacks.size()); + kept_stacks.emplace_back(std::move(snapshot.stacks[i])); + } + } + snapshot.stacks = std::move(kept_stacks); + for (auto &p : snapshot.pids) { + for (auto &a : p.addresses) { + if (a.stack_idx != k_cleared_stack_idx) { + a.stack_idx = remap[a.stack_idx]; + } + } + } + } + + return snapshot; +} + +// -- serialise / deserialise -------------------------------------------------- + +void serialize(const Snapshot &s, std::vector &out) { + out.clear(); + out.reserve(estimate_size(s)); + Writer w(&out); + w.raw(k_magic.data(), k_magic.size()); + w.u32(k_version); + w.u32(static_cast(s.stacks.size())); + w.u32(static_cast(s.pids.size())); + w.u32(s.cleared_addresses); + w.u32(s.dropped_pids); + + for (const auto &st : s.stacks) { + w.i32(st.pid); + w.i32(st.tid); + w.str(st.container_id); + w.str(st.exe_name); + w.str(st.thread_name); + w.u32(static_cast(st.locs.size())); + for (const auto &fl : st.locs) { + w.u64(fl.ip); + w.u64(fl.elf_addr); + w.u32(fl.lineno); + w.u64(fl.map_low); + w.u64(fl.map_high); + w.u64(fl.map_offset); + w.str(fl.fn_name); + w.str(fl.fn_system_name); + w.str(fl.fn_file); + w.str(fl.map_filename); + w.str(fl.map_build_id); + } + } + + for (const auto &p : s.pids) { + w.i32(p.watcher_pos); + w.i32(p.pid); + w.u32(p.address_conflict_count); + w.u32(p.tracked_address_count); + w.u32(static_cast(p.addresses.size())); + for (const auto &a : p.addresses) { + w.u64(a.addr); + w.i64(a.value); + w.u32(a.stack_idx); + } + } +} + +bool deserialize(const uint8_t *data, std::size_t size, Snapshot &out) { + out = {}; + Reader r(data, size); + std::array magic{}; + if (!r.raw(magic.data(), magic.size()) || magic != k_magic) { + return false; + } + uint32_t version = 0; + if (!r.u32(version) || version != k_version) { + return false; + } + uint32_t n_stacks = 0; + uint32_t n_pids = 0; + if (!r.u32(n_stacks) || !r.u32(n_pids) || !r.u32(out.cleared_addresses) || + !r.u32(out.dropped_pids)) { + return false; + } + out.stacks.resize(n_stacks); + for (auto &st : out.stacks) { + if (!r.i32(st.pid) || !r.i32(st.tid) || !r.str(st.container_id) || + !r.str(st.exe_name) || !r.str(st.thread_name)) { + return false; + } + uint32_t n_locs = 0; + if (!r.u32(n_locs)) { + return false; + } + st.locs.resize(n_locs); + for (auto &fl : st.locs) { + if (!r.u64(fl.ip) || !r.u64(fl.elf_addr) || !r.u32(fl.lineno) || + !r.u64(fl.map_low) || !r.u64(fl.map_high) || !r.u64(fl.map_offset) || + !r.str(fl.fn_name) || !r.str(fl.fn_system_name) || + !r.str(fl.fn_file) || !r.str(fl.map_filename) || + !r.str(fl.map_build_id)) { + return false; + } + } + } + out.pids.resize(n_pids); + for (auto &p : out.pids) { + if (!r.i32(p.watcher_pos) || !r.i32(p.pid) || + !r.u32(p.address_conflict_count) || !r.u32(p.tracked_address_count)) { + return false; + } + uint32_t n_addr = 0; + if (!r.u32(n_addr)) { + return false; + } + p.addresses.resize(n_addr); + for (auto &a : p.addresses) { + if (!r.u64(a.addr) || !r.i64(a.value) || !r.u32(a.stack_idx)) { + return false; + } + } + } + return r.eof(); +} + +bool write_to_fd(int fd, const Snapshot &snapshot) { + if (fd < 0) { + return false; + } + std::vector buf; + serialize(snapshot, buf); + if (ftruncate(fd, 0) != 0) { + return false; + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + return false; + } + std::size_t written = 0; + while (written < buf.size()) { + ssize_t const n = write(fd, buf.data() + written, buf.size() - written); + if (n < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + if (n == 0) { + return false; + } + written += static_cast(n); + } + return true; +} + +bool read_from_fd(int fd, Snapshot &out) { + if (fd < 0) { + return false; + } + off_t const sz = lseek(fd, 0, SEEK_END); + if (sz <= 0) { + return false; + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + return false; + } + std::vector buf(static_cast(sz)); + std::size_t read_total = 0; + while (read_total < buf.size()) { + ssize_t const n = + read(fd, buf.data() + read_total, buf.size() - read_total); + if (n < 0) { + if (errno == EINTR) { + continue; + } + return false; + } + if (n == 0) { + break; + } + read_total += static_cast(n); + } + bool const ok = deserialize(buf.data(), read_total, out); + // Consume-on-read: clear the memfd so a future worker that does not + // produce its own snapshot does not inadvertently replay stale state. + if (ftruncate(fd, 0) != 0) { + LG_DBG("[live-alloc] ftruncate(snapshot_fd) failed: %d", errno); + } + if (lseek(fd, 0, SEEK_SET) == static_cast(-1)) { + LG_DBG("[live-alloc] lseek(snapshot_fd) failed: %d", errno); + } + return ok; +} + +// -- restore ------------------------------------------------------------------ + +namespace { + +// Build the synthetic [live-alloc cleared] UnwindOutput. Allocates one Symbol +// + one mapping in symbol_hdr. +UnwindOutput build_cleared_stack(SymbolHdr &symbol_hdr) { + const auto *dict = symbol_hdr.profiles_dictionary(); + ddog_prof_FunctionId2 fn = intern_function( + dict, k_common_frame_names[SymbolErrors::live_alloc_cleared], {}); + ddog_prof_MappingId2 mp = intern_mapping(dict, 0, 0, 0, {}, {}); + + symbol_hdr._symbol_table.emplace_back(0U, fn); + auto const symbol_idx = + static_cast(symbol_hdr._symbol_table.size() - 1); + symbol_hdr._mapinfo_table.emplace_back(mp); + auto const map_idx = + static_cast(symbol_hdr._mapinfo_table.size() - 1); + + UnwindOutput uo; + uo.locs.push_back( + {0, 0, /*file_info_id*/ k_file_info_undef, symbol_idx, map_idx}); + // Empty pid/tid/strings: distinct from any natural unwind output. + return uo; +} + +// TODO(r1viollet): unify caching strategies across the profiler. +// +// Today there are four overlapping caches that all describe symbols and +// mappings, and they don't talk to each other on the restore path: +// +// L1 libdatadog ProfilesDictionary : interns (Function2, Mapping2, +// string) by content; returns stable +// opaque handles (FunctionId2*, +// MappingId2*). +// L2 SymbolTable / MapInfoTable : append-only vectors of +// {lineno, FunctionId2*} and +// MappingId2*. FunLoc carries L2 +// indices. +// L3 RuntimeSymbolLookup, : (pid, raw addr) -> L2 index, owned +// DsoSymbolLookup, by SymbolHdr, populated by the +// MapInfoLookup, normal unwind path only. +// BaseFrameSymbolLookup, +// CommonSymbolLookup +// L4 LiveAllocation._restored_strings : backing storage for the three +// string_views in restored +// UnwindOutputs (container_id, +// exe_name, thread_name). +// +// On restore we re-intern through L1 (which correctly dedupes) and append +// fresh L2 rows, but we never touch L3. Consequence: when the same +// allocation site fires again after a restart via a natural sample, L3 +// misses and the normal path appends *another* L2 row pointing at the +// same L1 handle. The resulting FunLoc has a different SymbolIdx_t / +// MapInfoIdx_t than the restored one, so UnwindOutput::operator<=> treats +// the two as distinct keys in LiveAllocation._unique_stacks. The exported +// pprof is unaffected (libdatadog re-collapses via L1), but we carry up +// to one duplicate aggregator entry per restored stack until natural +// deallocations evict it. +// +// The clean fix is to make FunLoc identity content-based, keyed on the L1 +// handle pointers (function_id, mapping_id, ip, elf_addr) instead of L2 +// indices. That removes the duplication, lets restored and natural +// UnwindOutputs collapse together, and would also let us drop the L2 +// indirection entirely for cold paths. It is a cross-cutting change +// (unwind_helper, ddog_profiling_utils, pprof emission) and should be its +// own PR. +UnwindOutput portable_to_uo(const UnwindOutputPortable &p, + SymbolHdr &symbol_hdr, LiveAllocation &live_alloc) { + const auto *dict = symbol_hdr.profiles_dictionary(); + UnwindOutput uo; + uo.pid = p.pid; + uo.tid = p.tid; + uo.container_id = live_alloc.intern_restored_string(p.container_id); + uo.exe_name = live_alloc.intern_restored_string(p.exe_name); + uo.thread_name = live_alloc.intern_restored_string(p.thread_name); + uo.locs.reserve(p.locs.size()); + for (const auto &fl : p.locs) { + ddog_prof_MappingId2 mid = + intern_mapping(dict, fl.map_low, fl.map_high, fl.map_offset, + fl.map_filename, fl.map_build_id); + symbol_hdr._mapinfo_table.emplace_back(mid); + auto const map_idx = + static_cast(symbol_hdr._mapinfo_table.size() - 1); + + ddog_prof_FunctionId2 fn = + intern_function(dict, fl.fn_name, fl.fn_file, fl.fn_system_name); + symbol_hdr._symbol_table.emplace_back(fl.lineno, fn); + auto const sym_idx = + static_cast(symbol_hdr._symbol_table.size() - 1); + + uo.locs.push_back( + {fl.ip, fl.elf_addr, k_file_info_undef, sym_idx, map_idx}); + } + return uo; +} + +} // namespace + +void restore_snapshot(const Snapshot &snapshot, LiveAllocation &live_alloc, + SymbolHdr &symbol_hdr) { + // Pre-build restored UnwindOutputs for every snapshot stack. + std::vector rebuilt; + rebuilt.reserve(snapshot.stacks.size()); + for (const auto &p : snapshot.stacks) { + rebuilt.emplace_back(portable_to_uo(p, symbol_hdr, live_alloc)); + } + + // Build the synthetic cleared stack lazily, only if referenced. + bool need_cleared = false; + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + if (a.stack_idx == k_cleared_stack_idx) { + need_cleared = true; + break; + } + } + if (need_cleared) { + break; + } + } + UnwindOutput cleared_uo; + if (need_cleared) { + cleared_uo = build_cleared_stack(symbol_hdr); + } + + for (const auto &p : snapshot.pids) { + for (const auto &a : p.addresses) { + const UnwindOutput &uo = (a.stack_idx == k_cleared_stack_idx) + ? cleared_uo + : rebuilt[a.stack_idx]; + live_alloc.register_allocation(uo, a.addr, a.value, p.watcher_pos, p.pid); + } + // Carry over the library/profiler tracked-address counters so the + // post-restore mismatch warning is anchored at the same baseline. + auto &pid_map = access_resize(live_alloc._watcher_vector, p.watcher_pos); + auto &pid_stacks = pid_map[p.pid]; + pid_stacks._address_conflict_count = p.address_conflict_count; + pid_stacks._tracked_address_count = p.tracked_address_count; + } + + if (snapshot.cleared_addresses || snapshot.dropped_pids) { + LG_NTC("[live-alloc] Snapshot degraded: cleared_addresses=%u " + "dropped_pids=%u", + snapshot.cleared_addresses, snapshot.dropped_pids); + } +} + +} // namespace ddprof::live_alloc_snapshot diff --git a/src/perf_mainloop.cc b/src/perf_mainloop.cc index d0c7548e..f1e281eb 100644 --- a/src/perf_mainloop.cc +++ b/src/perf_mainloop.cc @@ -6,15 +6,18 @@ #include "perf_mainloop.hpp" #include "ddprof_context_lib.hpp" +#include "ddprof_stats.hpp" #include "ddprof_worker.hpp" #include "ddres.hpp" #include "defer.hpp" #include "ipc.hpp" +#include "live_allocation_snapshot.hpp" #include "logger.hpp" #include "perf.hpp" #include "persistent_worker_state.hpp" #include "pevent.hpp" #include "ringbuffer_utils.hpp" +#include "syscalls.hpp" #include "unique_fd.hpp" #include "unwind.h" #include "unwind_state.hpp" @@ -439,6 +442,50 @@ DDRes worker_loop(DDProfContext &ctx, const WorkerAttr *attr, DDRES_CHECK_FWD(ddprof_worker_maybe_export(ctx, now)); if (ctx.worker_ctx.persistent_worker_state->restart_worker) { + // Snapshot the live-allocation aggregator so the next worker can + // resume heap tracking without losing in-flight live addresses. + int const snap_fd = + ctx.worker_ctx.persistent_worker_state->live_alloc_snapshot_fd; + if (snap_fd >= 0 && context_allocation_profiling_watcher_idx(ctx) != -1) { + // Allow tests / debugging to override the snapshot budget without + // touching the CLI. Honored at every capture; capped at the hard + // ceiling defined in the snapshot module. + std::size_t max_bytes = + live_alloc_snapshot::k_default_max_snapshot_bytes; + if (const char *env = std::getenv( + "DD_PROFILING_NATIVE_LIVE_ALLOC_SNAPSHOT_MAX_BYTES")) { + char *end = nullptr; + unsigned long long const v = std::strtoull(env, &end, 10); + if (end != env && v > 0) { + max_bytes = std::min( + static_cast(v), + live_alloc_snapshot::k_hard_max_snapshot_bytes); + } else { + LG_WRN("[live-alloc] Invalid " + "DD_PROFILING_NATIVE_LIVE_ALLOC_SNAPSHOT_MAX_BYTES=%s", + env); + } + } + auto snap = live_alloc_snapshot::capture_snapshot( + ctx.worker_ctx.live_allocation, ctx.worker_ctx.us->symbol_hdr, + max_bytes); + if (!live_alloc_snapshot::write_to_fd(snap_fd, snap)) { + LG_WRN("[live-alloc] Failed to write snapshot before restart"); + } else { + off_t const snap_size = lseek(snap_fd, 0, SEEK_END); + if (snap_size >= 0) { + ddprof_stats_set(STATS_LIVE_ALLOC_SNAPSHOT_BYTES, + static_cast(snap_size)); + } + LG_NTC("[live-alloc] Snapshot written: stacks=%zu pids=%zu " + "cleared=%u dropped_pids=%u", + snap.stacks.size(), snap.pids.size(), snap.cleared_addresses, + snap.dropped_pids); + ddprof_stats_set(STATS_LIVE_ALLOC_CLEARED_STACKS, + snap.cleared_addresses); + ddprof_stats_set(STATS_LIVE_ALLOC_DROPPED_PIDS, snap.dropped_pids); + } + } // return directly no need to do a final export return {}; } @@ -488,6 +535,24 @@ DDRes main_loop(const WorkerAttr *attr, DDProfContext *ctx) { defer { munmap(persistent_worker_state, sizeof(*persistent_worker_state)); }; + // Allocate a memfd that the parent keeps open for the lifetime of the + // run. Each worker child inherits this fd and uses it as a hand-off + // buffer for its LiveAllocation snapshot when a restart is requested. + // Clearing CLOEXEC because we want the fd to survive across the child's + // execve-less life cycle and be reusable by the next forked child. + persistent_worker_state->live_alloc_snapshot_fd = + memfd_create("ddprof_live_alloc", 0U); + if (persistent_worker_state->live_alloc_snapshot_fd < 0) { + LG_WRN("memfd_create for live-alloc snapshot failed (errno=%d); " + "live-allocation state will be lost on worker resets", + errno); + } + defer { + if (persistent_worker_state->live_alloc_snapshot_fd >= 0) { + close(persistent_worker_state->live_alloc_snapshot_fd); + } + }; + // Create worker processes to fulfill poll loop. Only the parent process // can exit with an error code, which signals the termination of profiling. bool is_worker = false; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 328f6694..55dabc78 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -398,6 +398,11 @@ add_unit_test(tracepoint_config-ut tracepoint_config-ut.cc ../src/tracepoint_con add_unit_test(live_allocation-ut live_allocation-ut.cc ../src/live_allocation.cc) +add_unit_test( + live_allocation_snapshot-ut live_allocation_snapshot-ut.cc ../src/live_allocation_snapshot.cc + ../src/live_allocation.cc ../src/ddog_profiling_utils.cc ../src/demangler/demangler.cc + LIBRARIES Datadog::Profiling llvm-demangle) + add_unit_test(ddprof_process-ut ddprof_process-ut.cc ${PROCESS_SRC} LIBRARIES ${ELFUTILS_LIBRARIES}) add_unit_test(glibc_fixes-ut glibc_fixes-ut.cc ../src/lib/glibc_fixes.c LIBRARIES pthread) diff --git a/test/live_allocation_snapshot-ut.cc b/test/live_allocation_snapshot-ut.cc new file mode 100644 index 00000000..fe28faf1 --- /dev/null +++ b/test/live_allocation_snapshot-ut.cc @@ -0,0 +1,140 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. This product includes software +// developed at Datadog (https://www.datadoghq.com/). Copyright 2021-Present +// Datadog, Inc. + +#include "live_allocation_snapshot.hpp" + +#include + +namespace ddprof::live_alloc_snapshot { + +namespace { + +Snapshot make_sample_snapshot() { + Snapshot s; + UnwindOutputPortable uo; + uo.pid = 100; + uo.tid = 101; + uo.container_id = "ctr-abc"; + uo.exe_name = "/usr/bin/myapp"; + uo.thread_name = "worker-0"; + uo.locs.push_back({/*ip*/ 0x1000, /*elf_addr*/ 0x500, /*lineno*/ 42, "malloc", + "__libc_malloc", "malloc.c", + /*map_low*/ 0x10000, /*map_high*/ 0x20000, + /*map_offset*/ 0, "/lib/libc.so", "abcdef0123"}); + uo.locs.push_back({0x2000, 0x600, 0, "main", "", "main.c", 0x30000, 0x40000, + 0, "/usr/bin/myapp", ""}); + s.stacks.push_back(std::move(uo)); + + UnwindOutputPortable uo2; + uo2.pid = 200; + uo2.tid = 200; + uo2.locs.push_back({0x3000, 0x700, 7, "fn", "", "f.c", 0, 0, 0, "", ""}); + s.stacks.push_back(std::move(uo2)); + + PidEntry pe; + pe.watcher_pos = 0; + pe.pid = 100; + pe.address_conflict_count = 3; + pe.tracked_address_count = 17; + pe.addresses.push_back({0xdead0000, 1024, 0}); + pe.addresses.push_back({0xdead1000, 2048, 0}); + pe.addresses.push_back({0xdead2000, 4096, 1}); + // One synthetic-cleared address. + pe.addresses.push_back({0xdead3000, 32, k_cleared_stack_idx}); + s.pids.push_back(std::move(pe)); + + s.cleared_addresses = 1; + s.dropped_pids = 0; + return s; +} + +void expect_eq(const UnwindOutputPortable &a, const UnwindOutputPortable &b) { + EXPECT_EQ(a.pid, b.pid); + EXPECT_EQ(a.tid, b.tid); + EXPECT_EQ(a.container_id, b.container_id); + EXPECT_EQ(a.exe_name, b.exe_name); + EXPECT_EQ(a.thread_name, b.thread_name); + ASSERT_EQ(a.locs.size(), b.locs.size()); + for (size_t i = 0; i < a.locs.size(); ++i) { + const auto &x = a.locs[i]; + const auto &y = b.locs[i]; + EXPECT_EQ(x.ip, y.ip); + EXPECT_EQ(x.elf_addr, y.elf_addr); + EXPECT_EQ(x.lineno, y.lineno); + EXPECT_EQ(x.fn_name, y.fn_name); + EXPECT_EQ(x.fn_system_name, y.fn_system_name); + EXPECT_EQ(x.fn_file, y.fn_file); + EXPECT_EQ(x.map_low, y.map_low); + EXPECT_EQ(x.map_high, y.map_high); + EXPECT_EQ(x.map_offset, y.map_offset); + EXPECT_EQ(x.map_filename, y.map_filename); + EXPECT_EQ(x.map_build_id, y.map_build_id); + } +} + +} // namespace + +TEST(LiveAllocationSnapshotTest, RoundTripBinary) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + ASSERT_FALSE(buf.empty()); + + Snapshot out; + ASSERT_TRUE(deserialize(buf.data(), buf.size(), out)); + + ASSERT_EQ(in.stacks.size(), out.stacks.size()); + for (size_t i = 0; i < in.stacks.size(); ++i) { + expect_eq(in.stacks[i], out.stacks[i]); + } + ASSERT_EQ(in.pids.size(), out.pids.size()); + for (size_t i = 0; i < in.pids.size(); ++i) { + const auto &a = in.pids[i]; + const auto &b = out.pids[i]; + EXPECT_EQ(a.watcher_pos, b.watcher_pos); + EXPECT_EQ(a.pid, b.pid); + EXPECT_EQ(a.address_conflict_count, b.address_conflict_count); + EXPECT_EQ(a.tracked_address_count, b.tracked_address_count); + ASSERT_EQ(a.addresses.size(), b.addresses.size()); + for (size_t j = 0; j < a.addresses.size(); ++j) { + EXPECT_EQ(a.addresses[j].addr, b.addresses[j].addr); + EXPECT_EQ(a.addresses[j].value, b.addresses[j].value); + EXPECT_EQ(a.addresses[j].stack_idx, b.addresses[j].stack_idx); + } + } + EXPECT_EQ(in.cleared_addresses, out.cleared_addresses); + EXPECT_EQ(in.dropped_pids, out.dropped_pids); +} + +TEST(LiveAllocationSnapshotTest, RejectsBadMagic) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + buf[0] = 'X'; + Snapshot out; + EXPECT_FALSE(deserialize(buf.data(), buf.size(), out)); +} + +TEST(LiveAllocationSnapshotTest, RejectsTruncated) { + Snapshot in = make_sample_snapshot(); + std::vector buf; + serialize(in, buf); + Snapshot out; + EXPECT_FALSE(deserialize(buf.data(), buf.size() - 4, out)); +} + +TEST(LiveAllocationSnapshotTest, EmptySnapshotRoundTrip) { + Snapshot in; + std::vector buf; + serialize(in, buf); + Snapshot out; + ASSERT_TRUE(deserialize(buf.data(), buf.size(), out)); + EXPECT_TRUE(out.stacks.empty()); + EXPECT_TRUE(out.pids.empty()); + EXPECT_EQ(out.cleared_addresses, 0u); + EXPECT_EQ(out.dropped_pids, 0u); +} + +} // namespace ddprof::live_alloc_snapshot diff --git a/test/simple_malloc-ut.sh b/test/simple_malloc-ut.sh index d0974218..9d8be13e 100755 --- a/test/simple_malloc-ut.sh +++ b/test/simple_malloc-ut.sh @@ -113,6 +113,46 @@ check() { check_logs "$@" } +# Variant of check() that additionally asserts the LiveAllocation snapshot +# path survives at least one worker reset: +# - at least one '[live-alloc] Snapshot restored' line, and +# - zero 'Tracked address count mismatch' warnings between profiler and +# library after restore. +check_live_alloc_persistence() { + cmd="$1" + min_restored="${5:-1}" + if [[ "$use_taskset" -eq 1 ]]; then + cmd="taskset ${test_cpu_mask} ${cmd}" + fi + echo "Running: ${cmd}" + # shellcheck disable=SC2086 + eval ${cmd} || ( echo "Command failed: ${cmd}" && cat "${log_file}" && exit 1 ) + sync "${log_file}" + ddprof_pid=$(grep -m1 -oP -a ' ddprof\[\K[0-9]+(?=\]: Starting profiler)' "${log_file}" || true) + if [ -z "${ddprof_pid}" ]; then + echo "Unable to find profiler pid" + cat "${log_file}" + exit 1 + fi + timeout "$timeout_sec" tail --pid="$ddprof_pid" -f /dev/null + sync "${log_file}" + + restored=$(grep -c '\[live-alloc\] Snapshot restored' "${log_file}" || true) + mismatches=$(grep -c 'Tracked address count mismatch' "${log_file}" || true) + if [[ "${restored}" -lt "${min_restored}" ]]; then + echo "Expected at least ${min_restored} 'Snapshot restored', got ${restored}" + cat "${log_file}" + exit 1 + fi + if [[ "${mismatches}" -ne 0 ]]; then + echo "Unexpected 'Tracked address count mismatch' lines: ${mismatches}" + cat "${log_file}" + exit 1 + fi + echo "Live-alloc snapshot persistence OK: restored=${restored}, mismatches=0" + check_logs "$@" +} + # Test disabled static lib mode check "./test/simple_malloc-static ${opts}" -1 @@ -136,6 +176,16 @@ check "./ddprof --show_config --event \"${event}\" ./test/simple_malloc ${opts} event="sALLOC,period=-524288,mode=sl;sCPU" check "./ddprof --show_config --event \"${event}\" ./test/simple_malloc ${opts} --skip-free 100" 1 1 "inuse-space,alloc-space,cpu-time" +# Test that live-heap tracking survives a worker reset. +# Use a short upload_period and a low worker_period so several resets occur +# within the run, and a longer simple_malloc loop so the target outlives at +# least the first reset. --skip-free 100 keeps ~99% of allocations live. +event="sALLOC,period=-524288,mode=sl;sCPU" +opts_persist="--malloc 4096 --skip-free 100 --loop 80000 --spin 80 --nice 19" +check_live_alloc_persistence \ + "./ddprof --event \"${event}\" --upload-period 2 --worker_period 2 ./test/simple_malloc ${opts_persist}" \ + 1 1 "inuse-space,alloc-space,cpu-time" 1 + # Test wrapper mode with forks + threads opts_more_spin="--loop 1000 --spin 400" check "./ddprof ./test/simple_malloc ${opts_more_spin} --fork 2 --threads 2" 2 4 diff --git a/test/simple_malloc.cc b/test/simple_malloc.cc index 2c55165c..32def429 100644 --- a/test/simple_malloc.cc +++ b/test/simple_malloc.cc @@ -4,6 +4,7 @@ // Datadog, Inc. #include +#include #include #include #include @@ -18,6 +19,7 @@ #include #include #include +#include #include "clocks.hpp" #include "ddprof_base.hpp" @@ -95,9 +97,58 @@ struct Options { bool use_shared_library = false; bool avoid_dlopen_hook = false; bool stop{false}; + uint32_t unique_sites{0}; }; // NOLINTBEGIN(clang-analyzer-unix.Malloc) + +// One templated allocation site per Tag. Each instantiation has a distinct +// mangled name, so the unwinder reports it as a distinct innermost frame — +// useful for stress-testing the LiveAllocation snapshot path with many +// unique stacks without rewriting the loop body. NOINLINE keeps the symbol +// alive and the frame distinguishable. +template +DDPROF_NOINLINE void +alloc_at_site(uint64_t size, std::deque &live_allocations, + uint64_t keep_live, uint32_t skip_free_target, + unsigned &skip_free_counter, uint64_t &nb_alloc, + uint64_t &alloc_bytes) { + void *p = nullptr; + if (size) { + p = malloc(size); + ++nb_alloc; + alloc_bytes += size; + } + DoNotOptimize(p); + if (keep_live > 0) { + if (p != nullptr) { + live_allocations.push_back(p); + while (live_allocations.size() > keep_live) { + free(live_allocations.front()); + live_allocations.pop_front(); + } + } + } else { + if (skip_free_counter++ >= skip_free_target) { + free(p); + skip_free_counter = 0; + } + } +} + +using AllocSiteFn = void (*)(uint64_t, std::deque &, uint64_t, uint32_t, + unsigned &, uint64_t &, uint64_t &); + +template +constexpr auto make_alloc_site_table(std::index_sequence) { + return std::array{ + &alloc_at_site(Is)>...}; +} + +constexpr std::size_t k_max_unique_alloc_sites = 256; +constexpr auto k_alloc_site_table = + make_alloc_site_table(std::make_index_sequence{}); + extern "C" DDPROF_NOINLINE void do_lot_of_allocations(const Options &options, Stats &stats) { uint64_t nb_alloc{0}; @@ -108,38 +159,50 @@ extern "C" DDPROF_NOINLINE void do_lot_of_allocations(const Options &options, auto start_cpu = ThreadCpuClock::now(); unsigned skip_free = 0; std::deque live_allocations; + uint32_t const nb_sites = std::min( + options.unique_sites, static_cast(k_alloc_site_table.size())); for (uint64_t i = 0; i < options.loop_count; ++i) { - void *p = nullptr; - if (options.malloc_size) { - p = malloc(options.malloc_size); - ++nb_alloc; - alloc_bytes += options.malloc_size; - } - // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) - DoNotOptimize(p); - void *p2; - if (options.realloc_size) { - p2 = realloc(p, options.realloc_size); - ++nb_alloc; - alloc_bytes += options.realloc_size; + if (nb_sites > 0) { + // Pick a site deterministically by index; avoids the per-iteration + // cost of an RNG and still produces a uniform distribution. + std::size_t const site = + static_cast((i * 2654435761ULL) % nb_sites); + k_alloc_site_table[site](options.malloc_size, live_allocations, + options.keep_live_allocations, options.skip_free, + skip_free, nb_alloc, alloc_bytes); } else { - p2 = p; - } - // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) - DoNotOptimize(p2); - - if (options.keep_live_allocations > 0) { - if (p2 != nullptr) { - live_allocations.push_back(p2); - while (live_allocations.size() > options.keep_live_allocations) { - free(live_allocations.front()); - live_allocations.pop_front(); - } + void *p = nullptr; + if (options.malloc_size) { + p = malloc(options.malloc_size); + ++nb_alloc; + alloc_bytes += options.malloc_size; } - } else { - if (skip_free++ >= options.skip_free) { - free(p2); - skip_free = 0; + // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) + DoNotOptimize(p); + void *p2; + if (options.realloc_size) { + p2 = realloc(p, options.realloc_size); + ++nb_alloc; + alloc_bytes += options.realloc_size; + } else { + p2 = p; + } + // NOLINTNEXTLINE(clang-analyzer-unix.Malloc) + DoNotOptimize(p2); + + if (options.keep_live_allocations > 0) { + if (p2 != nullptr) { + live_allocations.push_back(p2); + while (live_allocations.size() > options.keep_live_allocations) { + free(live_allocations.front()); + live_allocations.pop_front(); + } + } + } else { + if (skip_free++ >= options.skip_free) { + free(p2); + skip_free = 0; + } } } @@ -311,6 +374,13 @@ int main(int argc, char *argv[]) { "live heap size") ->default_val(0) ->check(CLI::NonNegativeNumber); + app.add_option("--unique-sites", opts.unique_sites, + "Spread allocations across N distinct templated call sites " + "(0 = single site, default). Used to stress-test profilers " + "with many distinct unwind outputs. Capped at the templated " + "table size (256).") + ->default_val(0) + ->check(CLI::NonNegativeNumber); app.add_option( "--timeout", opts.timeout_duration, "Timeout after N milliseconds")