Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the components it runs.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{
6    BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use bincode::Encode;
11use bincode::config::standard;
12use bincode::enc::EncoderImpl;
13use bincode::enc::write::SizeWriter;
14use compact_str::CompactString;
15use cu29_clock::CuDuration;
16#[allow(unused_imports)]
17use cu29_log::CuLogLevel;
18#[cfg(all(feature = "std", debug_assertions))]
19use cu29_log_runtime::{
20    format_message_only, register_live_log_listener, unregister_live_log_listener,
21};
22use cu29_traits::{
23    CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
24    finish_observed_encode,
25};
26use portable_atomic::{
27    AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
28};
29use serde_derive::{Deserialize, Serialize};
30
31#[cfg(not(feature = "std"))]
32extern crate alloc;
33
34#[cfg(feature = "std")]
35use core::cell::Cell;
36#[cfg(feature = "std")]
37use std::backtrace::Backtrace;
38#[cfg(feature = "std")]
39use std::fs::File;
40#[cfg(feature = "std")]
41use std::io::Write;
42#[cfg(feature = "std")]
43use std::panic::PanicHookInfo;
44#[cfg(feature = "std")]
45use std::sync::{Arc, Mutex as StdMutex, OnceLock};
46#[cfg(feature = "std")]
47use std::thread_local;
48#[cfg(feature = "std")]
49use std::time::{SystemTime, UNIX_EPOCH};
50#[cfg(feature = "std")]
51use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
52
53#[cfg(not(feature = "std"))]
54use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
55#[cfg(not(target_has_atomic = "64"))]
56use spin::Mutex;
57#[cfg(not(feature = "std"))]
58use spin::Mutex as SpinMutex;
59
60#[cfg(not(feature = "std"))]
61mod imp {
62    pub use alloc::alloc::{GlobalAlloc, Layout};
63    #[cfg(target_has_atomic = "64")]
64    pub use core::sync::atomic::AtomicU64;
65    pub use core::sync::atomic::{AtomicUsize, Ordering};
66    pub use libm::sqrt;
67}
68
69#[cfg(feature = "std")]
70mod imp {
71    #[cfg(feature = "memory_monitoring")]
72    use super::CountingAlloc;
73    #[cfg(feature = "memory_monitoring")]
74    pub use std::alloc::System;
75    pub use std::alloc::{GlobalAlloc, Layout};
76    #[cfg(target_has_atomic = "64")]
77    pub use std::sync::atomic::AtomicU64;
78    pub use std::sync::atomic::{AtomicUsize, Ordering};
79    #[cfg(feature = "memory_monitoring")]
80    #[global_allocator]
81    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
82}
83
84use imp::*;
85
/// Renders a `CuTime`/`CuDuration` as `HH:mm:ss.ffff`.
///
/// Hours are not wrapped at 24, and the four fractional digits (tenths of a millisecond) are
/// truncated rather than rounded.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let whole_seconds = nanos / 1_000_000_000;
    let (hours, minutes, seconds) = (
        whole_seconds / 3600,
        whole_seconds / 60 % 60,
        whole_seconds % 60,
    );
    // 100_000 ns is 0.1 ms, so this keeps exactly the first four decimal digits of the second.
    let tenths_of_ms = nanos % 1_000_000_000 / 100_000;
    format!("{hours:02}:{minutes:02}:{seconds:02}.{tenths_of_ms:04}")
}
97
98/// Lifecycle state of a monitored component.
99#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
100pub enum CuComponentState {
101    Start,
102    Preprocess,
103    Process,
104    Postprocess,
105    Stop,
106}
107
108/// Strongly-typed index into [`CuMonitoringMetadata::components`].
109#[repr(transparent)]
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
111pub struct ComponentId(usize);
112
113impl ComponentId {
114    pub const INVALID: Self = Self(usize::MAX);
115
116    #[inline]
117    pub const fn new(index: usize) -> Self {
118        Self(index)
119    }
120
121    #[inline]
122    pub const fn index(self) -> usize {
123        self.0
124    }
125}
126
127impl core::fmt::Display for ComponentId {
128    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
129        self.0.fmt(f)
130    }
131}
132
133impl From<ComponentId> for usize {
134    fn from(value: ComponentId) -> Self {
135        value.index()
136    }
137}
138
139/// Strongly-typed CopperList slot index.
140#[repr(transparent)]
141#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
142pub struct CuListSlot(usize);
143
144impl CuListSlot {
145    #[inline]
146    pub const fn new(index: usize) -> Self {
147        Self(index)
148    }
149
150    #[inline]
151    pub const fn index(self) -> usize {
152        self.0
153    }
154}
155
156impl From<CuListSlot> for usize {
157    fn from(value: CuListSlot) -> Self {
158        value.index()
159    }
160}
161
162/// Static monitor-side CopperList indexing layout.
163///
164/// This layout is mission/runtime scoped and remains constant after monitor construction.
165#[derive(Debug, Clone, Copy)]
166pub struct CopperListLayout {
167    components: &'static [MonitorComponentMetadata],
168    slot_to_component: &'static [ComponentId],
169}
170
171impl CopperListLayout {
172    #[inline]
173    pub const fn new(
174        components: &'static [MonitorComponentMetadata],
175        slot_to_component: &'static [ComponentId],
176    ) -> Self {
177        Self {
178            components,
179            slot_to_component,
180        }
181    }
182
183    #[inline]
184    pub const fn components(self) -> &'static [MonitorComponentMetadata] {
185        self.components
186    }
187
188    #[inline]
189    pub const fn component_count(self) -> usize {
190        self.components.len()
191    }
192
193    #[inline]
194    pub const fn culist_slot_count(self) -> usize {
195        self.slot_to_component.len()
196    }
197
198    #[inline]
199    pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
200        &self.components[id.index()]
201    }
202
203    #[inline]
204    pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
205        self.slot_to_component[culist_slot.index()]
206    }
207
208    #[inline]
209    pub const fn slot_to_component(self) -> &'static [ComponentId] {
210        self.slot_to_component
211    }
212
213    #[inline]
214    pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
215        CopperListView::new(self, msgs)
216    }
217}
218
219/// Per-loop monitor view over CopperList metadata paired with static component mapping.
220#[derive(Debug, Clone, Copy)]
221pub struct CopperListView<'a> {
222    layout: CopperListLayout,
223    msgs: &'a [&'a CuMsgMetadata],
224}
225
226impl<'a> CopperListView<'a> {
227    #[inline]
228    pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
229        assert_eq!(
230            msgs.len(),
231            layout.culist_slot_count(),
232            "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
233            msgs.len(),
234            layout.culist_slot_count()
235        );
236        Self { layout, msgs }
237    }
238
239    #[inline]
240    pub const fn layout(self) -> CopperListLayout {
241        self.layout
242    }
243
244    #[inline]
245    pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
246        self.msgs
247    }
248
249    #[inline]
250    pub const fn len(self) -> usize {
251        self.msgs.len()
252    }
253
254    #[inline]
255    pub const fn is_empty(self) -> bool {
256        self.msgs.is_empty()
257    }
258
259    #[inline]
260    pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
261        let index = culist_slot.index();
262        CopperListEntry {
263            culist_slot,
264            component_id: self.layout.component_for_slot(culist_slot),
265            msg: self.msgs[index],
266        }
267    }
268
269    pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
270        self.msgs.iter().enumerate().map(move |(idx, msg)| {
271            let culist_slot = CuListSlot::new(idx);
272            CopperListEntry {
273                culist_slot,
274                component_id: self.layout.component_for_slot(culist_slot),
275                msg,
276            }
277        })
278    }
279}
280
281/// One message entry in CopperList slot order with resolved component identity.
282#[derive(Debug, Clone, Copy)]
283pub struct CopperListEntry<'a> {
284    pub culist_slot: CuListSlot,
285    pub component_id: ComponentId,
286    pub msg: &'a CuMsgMetadata,
287}
288
289impl<'a> CopperListEntry<'a> {
290    #[inline]
291    pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
292        layout.component(self.component_id)
293    }
294
295    #[inline]
296    pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
297        layout.component(self.component_id).kind()
298    }
299}
300
301/// Execution progress marker emitted by the runtime before running a component step.
302#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
303pub struct ExecutionMarker {
304    /// Index into `CuMonitoringMetadata::components()`.
305    pub component_id: ComponentId,
306    /// Lifecycle phase currently entered.
307    pub step: CuComponentState,
308    /// CopperList id when available (runtime loop), None during start/stop.
309    pub culistid: Option<u64>,
310}
311
312/// Lock-free runtime-side progress probe.
313///
314/// The runtime writes execution markers directly into this probe from the hot path
315/// (without calling monitor fan-out callbacks), and monitors can read a coherent
316/// snapshot from watchdog threads when diagnosing stalls.
317#[derive(Debug)]
318pub struct RuntimeExecutionProbe {
319    component_id: AtomicUsize,
320    step: AtomicUsize,
321    #[cfg(target_has_atomic = "64")]
322    culistid: AtomicU64,
323    #[cfg(target_has_atomic = "64")]
324    culistid_present: AtomicUsize,
325    #[cfg(not(target_has_atomic = "64"))]
326    culistid: Mutex<Option<u64>>,
327    sequence: AtomicUsize,
328}
329
330impl Default for RuntimeExecutionProbe {
331    fn default() -> Self {
332        Self {
333            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
334            step: AtomicUsize::new(0),
335            #[cfg(target_has_atomic = "64")]
336            culistid: AtomicU64::new(0),
337            #[cfg(target_has_atomic = "64")]
338            culistid_present: AtomicUsize::new(0),
339            #[cfg(not(target_has_atomic = "64"))]
340            culistid: Mutex::new(None),
341            sequence: AtomicUsize::new(0),
342        }
343    }
344}
345
346impl RuntimeExecutionProbe {
347    #[inline]
348    pub fn record(&self, marker: ExecutionMarker) {
349        self.component_id
350            .store(marker.component_id.index(), Ordering::Relaxed);
351        self.step
352            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
353        #[cfg(target_has_atomic = "64")]
354        match marker.culistid {
355            Some(culistid) => {
356                self.culistid.store(culistid, Ordering::Relaxed);
357                self.culistid_present.store(1, Ordering::Relaxed);
358            }
359            None => {
360                self.culistid_present.store(0, Ordering::Relaxed);
361            }
362        }
363        #[cfg(not(target_has_atomic = "64"))]
364        {
365            *self.culistid.lock() = marker.culistid;
366        }
367        self.sequence.fetch_add(1, Ordering::Release);
368    }
369
370    #[inline]
371    pub fn sequence(&self) -> usize {
372        self.sequence.load(Ordering::Acquire)
373    }
374
375    #[inline]
376    pub fn marker(&self) -> Option<ExecutionMarker> {
377        // Read a coherent snapshot. A concurrent writer may change values between reads;
378        // in that case we retry to keep the marker and sequence aligned.
379        loop {
380            let seq_before = self.sequence.load(Ordering::Acquire);
381            let component_id = self.component_id.load(Ordering::Relaxed);
382            let step = self.step.load(Ordering::Relaxed);
383            #[cfg(target_has_atomic = "64")]
384            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
385            #[cfg(target_has_atomic = "64")]
386            let culistid_value = self.culistid.load(Ordering::Relaxed);
387            #[cfg(not(target_has_atomic = "64"))]
388            let culistid = *self.culistid.lock();
389            let seq_after = self.sequence.load(Ordering::Acquire);
390            if seq_before == seq_after {
391                if component_id == ComponentId::INVALID.index() {
392                    return None;
393                }
394                let step = usize_to_component_state(step);
395                #[cfg(target_has_atomic = "64")]
396                let culistid = if culistid_present == 0 {
397                    None
398                } else {
399                    Some(culistid_value)
400                };
401                return Some(ExecutionMarker {
402                    component_id: ComponentId::new(component_id),
403                    step,
404                    culistid,
405                });
406            }
407        }
408    }
409}
410
411#[inline]
412const fn component_state_to_usize(step: CuComponentState) -> usize {
413    match step {
414        CuComponentState::Start => 0,
415        CuComponentState::Preprocess => 1,
416        CuComponentState::Process => 2,
417        CuComponentState::Postprocess => 3,
418        CuComponentState::Stop => 4,
419    }
420}
421
422#[inline]
423const fn usize_to_component_state(step: usize) -> CuComponentState {
424    match step {
425        0 => CuComponentState::Start,
426        1 => CuComponentState::Preprocess,
427        2 => CuComponentState::Process,
428        3 => CuComponentState::Postprocess,
429        _ => CuComponentState::Stop,
430    }
431}
432
433#[cfg(feature = "std")]
434pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
435
436/// Platform-neutral monitor view of runtime execution progress.
437///
438/// In `std` builds this can wrap a shared runtime probe. In `no_std` builds it is currently
439/// unavailable and helper methods return `None`/`false`.
440#[derive(Debug, Clone)]
441pub struct MonitorExecutionProbe {
442    #[cfg(feature = "std")]
443    inner: Option<ExecutionProbeHandle>,
444}
445
446impl Default for MonitorExecutionProbe {
447    fn default() -> Self {
448        Self::unavailable()
449    }
450}
451
452impl MonitorExecutionProbe {
453    #[cfg(feature = "std")]
454    pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
455        Self {
456            inner: Some(handle),
457        }
458    }
459
460    pub const fn unavailable() -> Self {
461        Self {
462            #[cfg(feature = "std")]
463            inner: None,
464        }
465    }
466
467    pub fn is_available(&self) -> bool {
468        #[cfg(feature = "std")]
469        {
470            self.inner.is_some()
471        }
472        #[cfg(not(feature = "std"))]
473        {
474            false
475        }
476    }
477
478    pub fn marker(&self) -> Option<ExecutionMarker> {
479        #[cfg(feature = "std")]
480        {
481            self.inner.as_ref().and_then(|probe| probe.marker())
482        }
483        #[cfg(not(feature = "std"))]
484        {
485            None
486        }
487    }
488
489    pub fn sequence(&self) -> Option<usize> {
490        #[cfg(feature = "std")]
491        {
492            self.inner.as_ref().map(|probe| probe.sequence())
493        }
494        #[cfg(not(feature = "std"))]
495        {
496            None
497        }
498    }
499}
500
/// Runtime component category used by monitoring metadata and topology.
///
/// A "task" is a regular Copper task (lifecycle callbacks + payload processing). A "bridge"
/// is a monitored bridge-side execution component (bridge nodes and channel endpoints).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    Source,
    Task,
    Sink,
    Bridge,
}

impl ComponentType {
    /// `true` for the task family (`Source`, `Task`, `Sink`), `false` for `Bridge`.
    pub const fn is_task(self) -> bool {
        match self {
            Self::Source | Self::Task | Self::Sink => true,
            Self::Bridge => false,
        }
    }
}

/// Static identity entry for one monitored runtime component.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MonitorComponentMetadata {
    id: &'static str,
    kind: ComponentType,
    type_name: Option<&'static str>,
}

impl MonitorComponentMetadata {
    /// Creates an entry; `const` so it can populate static component tables.
    pub const fn new(
        id: &'static str,
        kind: ComponentType,
        type_name: Option<&'static str>,
    ) -> Self {
        MonitorComponentMetadata {
            id,
            kind,
            type_name,
        }
    }

    /// Stable monitor component id (for logs/debug and joins with runtime markers).
    pub const fn id(&self) -> &'static str {
        self.id
    }

    /// Component category.
    pub const fn kind(&self) -> ComponentType {
        self.kind
    }

    /// Rust type label when available (typically tasks); `None` for synthetic bridge entries.
    pub const fn type_name(&self) -> Option<&'static str> {
        self.type_name
    }
}
555
556/// Immutable runtime-provided metadata passed once to [`CuMonitor::new`].
557///
558/// This bundles identifiers, deterministic component layout, and monitor-specific config so monitor
559/// construction is explicit and does not need ad-hoc late setters.
560#[derive(Debug, Clone)]
561pub struct CuMonitoringMetadata {
562    mission_id: CompactString,
563    subsystem_id: Option<CompactString>,
564    instance_id: u32,
565    layout: CopperListLayout,
566    copperlist_info: CopperListInfo,
567    topology: MonitorTopology,
568    monitor_config: Option<ComponentConfig>,
569}
570
571impl CuMonitoringMetadata {
572    pub fn new(
573        mission_id: CompactString,
574        components: &'static [MonitorComponentMetadata],
575        culist_component_mapping: &'static [ComponentId],
576        copperlist_info: CopperListInfo,
577        topology: MonitorTopology,
578        monitor_config: Option<ComponentConfig>,
579    ) -> CuResult<Self> {
580        Self::validate_components(components)?;
581        Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
582        Ok(Self {
583            mission_id,
584            subsystem_id: None,
585            instance_id: 0,
586            layout: CopperListLayout::new(components, culist_component_mapping),
587            copperlist_info,
588            topology,
589            monitor_config,
590        })
591    }
592
593    fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
594        let mut seen_bridge = false;
595        for component in components {
596            match component.kind() {
597                component_type if component_type.is_task() && seen_bridge => {
598                    return Err(CuError::from(
599                        "invalid monitor metadata: task-family components must appear before bridges",
600                    ));
601                }
602                ComponentType::Bridge => seen_bridge = true,
603                _ => {}
604            }
605        }
606        Ok(())
607    }
608
609    fn validate_culist_mapping(
610        components_len: usize,
611        culist_component_mapping: &'static [ComponentId],
612    ) -> CuResult<()> {
613        for component_idx in culist_component_mapping {
614            if component_idx.index() >= components_len {
615                return Err(CuError::from(
616                    "invalid monitor metadata: culist mapping points past components table",
617                ));
618            }
619        }
620        Ok(())
621    }
622
623    /// Active mission identifier for this runtime instance.
624    pub fn mission_id(&self) -> &str {
625        self.mission_id.as_str()
626    }
627
628    /// Compile-time subsystem identifier for this runtime instance when running in a
629    /// multi-Copper deployment.
630    pub fn subsystem_id(&self) -> Option<&str> {
631        self.subsystem_id.as_deref()
632    }
633
634    /// Deployment/runtime instance identity for this runtime instance.
635    pub fn instance_id(&self) -> u32 {
636        self.instance_id
637    }
638
639    /// Canonical table of monitored runtime components.
640    ///
641    /// Ordering is deterministic and mission-scoped: tasks first, then bridge-side components.
642    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
643        self.layout.components()
644    }
645
646    /// Total number of monitored components.
647    pub const fn component_count(&self) -> usize {
648        self.layout.component_count()
649    }
650
651    /// Static runtime layout used to map CopperList slots to components.
652    pub const fn layout(&self) -> CopperListLayout {
653        self.layout
654    }
655
656    pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
657        self.layout.component(component_id)
658    }
659
660    pub fn component_id(&self, component_id: ComponentId) -> &'static str {
661        self.component(component_id).id()
662    }
663
664    pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
665        self.component(component_id).kind()
666    }
667
668    pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
669        self.layout
670            .components()
671            .iter()
672            .position(|component| component.id() == component_id)
673            .map(ComponentId::new)
674    }
675
676    /// CopperList slot -> monitored component index mapping.
677    ///
678    /// This table maps each CopperList slot index to the producing component index.
679    pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
680        self.layout.slot_to_component()
681    }
682
683    pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
684        self.layout.component_for_slot(culist_slot)
685    }
686
687    pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
688        self.layout.view(msgs)
689    }
690
691    pub const fn copperlist_info(&self) -> CopperListInfo {
692        self.copperlist_info
693    }
694
695    /// Resolved graph topology for the active mission.
696    ///
697    /// This is always available. Nodes represent config graph nodes, not every synthetic bridge
698    /// channel entry in `components()`.
699    pub fn topology(&self) -> &MonitorTopology {
700        &self.topology
701    }
702
703    pub fn monitor_config(&self) -> Option<&ComponentConfig> {
704        self.monitor_config.as_ref()
705    }
706
707    pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
708        self.monitor_config = monitor_config;
709        self
710    }
711
712    pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
713        self.subsystem_id = subsystem_id.map(CompactString::from);
714        self
715    }
716
717    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
718        self.instance_id = instance_id;
719        self
720    }
721}
722
723/// Runtime-provided dynamic monitoring handles passed once to [`CuMonitor::new`].
724///
725/// This context may expose live runtime state (for example execution progress probes).
726#[derive(Debug, Clone, Default)]
727pub struct CuMonitoringRuntime {
728    execution_probe: MonitorExecutionProbe,
729}
730
731impl CuMonitoringRuntime {
732    #[cfg(feature = "std")]
733    pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
734        ensure_runtime_panic_hook_installed();
735        Self { execution_probe }
736    }
737
738    #[cfg(not(feature = "std"))]
739    pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
740        Self { execution_probe }
741    }
742
743    #[cfg(feature = "std")]
744    pub fn unavailable() -> Self {
745        Self::new(MonitorExecutionProbe::unavailable())
746    }
747
748    #[cfg(not(feature = "std"))]
749    pub const fn unavailable() -> Self {
750        Self::new(MonitorExecutionProbe::unavailable())
751    }
752
753    pub fn execution_probe(&self) -> &MonitorExecutionProbe {
754        &self.execution_probe
755    }
756
757    #[cfg(feature = "std")]
758    pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
759    where
760        F: Fn(&PanicReport) + Send + Sync + 'static,
761    {
762        ensure_runtime_panic_hook_installed();
763        register_panic_cleanup(callback)
764    }
765
766    #[cfg(feature = "std")]
767    pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
768    where
769        F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
770    {
771        ensure_runtime_panic_hook_installed();
772        register_panic_action(callback)
773    }
774}
775
/// Callback invoked from the panic hook before the crash report is written.
#[cfg(feature = "std")]
type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
/// Callback invoked after the report is emitted; returning `Some(code)` requests an exit.
#[cfg(feature = "std")]
type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;

/// Snapshot of a panic captured by Copper's runtime panic hook.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct PanicReport {
    message: String,
    location: Option<String>,
    thread_name: Option<String>,
    backtrace: String,
    timestamp_unix_ms: u128,
    crash_report_path: Option<String>,
}

#[cfg(feature = "std")]
impl PanicReport {
    /// Builds a report from hook info; `crash_report_path` is filled in later by the hook.
    fn capture(info: &PanicHookInfo<'_>) -> Self {
        let location = info
            .location()
            .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()));
        let thread_name = std::thread::current().name().map(str::to_owned);
        // A clock set before the Unix epoch is reported as 0 instead of failing inside the hook.
        let timestamp_unix_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_millis());
        let message = panic_hook_payload_to_string(info);
        let backtrace = Backtrace::force_capture().to_string();
        PanicReport {
            message,
            location,
            thread_name,
            backtrace,
            timestamp_unix_ms,
            crash_report_path: None,
        }
    }

    /// Panic payload rendered as text.
    pub fn message(&self) -> &str {
        &self.message
    }

    /// `file:line:column` of the panic, when known.
    pub fn location(&self) -> Option<&str> {
        self.location.as_deref()
    }

    /// Name of the panicking thread, when it has one.
    pub fn thread_name(&self) -> Option<&str> {
        self.thread_name.as_deref()
    }

    /// Force-captured backtrace, rendered as text.
    pub fn backtrace(&self) -> &str {
        &self.backtrace
    }

    /// Wall-clock capture time in milliseconds since the Unix epoch.
    pub fn timestamp_unix_ms(&self) -> u128 {
        self.timestamp_unix_ms
    }

    /// Path of the crash report file written by the hook, if one was written.
    pub fn crash_report_path(&self) -> Option<&str> {
        self.crash_report_path.as_deref()
    }

    /// One-line description: `panic at <location>: <message>` or `panic: <message>`.
    pub fn summary(&self) -> String {
        if let Some(location) = self.location() {
            format!("panic at {location}: {}", self.message())
        } else {
            format!("panic: {}", self.message())
        }
    }
}
845
/// Which registry list a [`PanicHookRegistration`] refers to.
#[cfg(feature = "std")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PanicHookRegistrationKind {
    /// Entry lives in `PanicHookRegistry::cleanup_callbacks`.
    Cleanup,
    /// Entry lives in `PanicHookRegistry::action_callbacks`.
    Action,
}

/// A cleanup callback tagged with its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicCleanup {
    id: usize,
    callback: PanicCleanupCallback,
}

/// An action callback tagged with its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicAction {
    id: usize,
    callback: PanicActionCallback,
}

/// Process-wide lists of callbacks invoked by the runtime panic hook.
#[cfg(feature = "std")]
#[derive(Default)]
struct PanicHookRegistry {
    cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
    action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
}

/// Guard returned when a panic callback is registered.
///
/// Dropping it unregisters the callback, so it must be kept alive for as long as the callback
/// should stay active. Discarding it (e.g. calling the register method as a bare statement)
/// removes the callback immediately, hence `#[must_use]`.
#[cfg(feature = "std")]
#[derive(Debug)]
#[must_use = "dropping a PanicHookRegistration immediately unregisters its panic callback"]
pub struct PanicHookRegistration {
    id: usize,
    kind: PanicHookRegistrationKind,
}

#[cfg(feature = "std")]
impl Drop for PanicHookRegistration {
    fn drop(&mut self) {
        unregister_panic_hook(self.kind, self.id);
    }
}
887
/// Lazily created process-wide panic callback registry.
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
/// Set once the runtime panic hook has been installed.
#[cfg(feature = "std")]
static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
/// Source of registration ids (starts at 1).
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
/// Number of in-flight executions of the runtime panic hook.
#[cfg(feature = "std")]
static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Returns the registry, creating an empty one on first use.
#[cfg(feature = "std")]
fn panic_hook_registry() -> &'static PanicHookRegistry {
    PANIC_HOOK_REGISTRY.get_or_init(Default::default)
}
901
/// Installs Copper's process-wide panic hook the first time it is called; later calls are
/// no-ops. The hook replaces whatever hook was previously set (it does not chain to it).
#[cfg(feature = "std")]
fn ensure_runtime_panic_hook_installed() {
    let _ = PANIC_HOOK_INSTALL_ONCE
        .get_or_init(|| std::panic::set_hook(Box::new(runtime_panic_hook_entry)));
}

/// Body of the runtime panic hook, run in this order:
/// 1. cleanup callbacks;
/// 2. crash report file;
/// 3. report emission;
/// 4. action callbacks — the first `Some(code)` exits the process with that code.
#[cfg(feature = "std")]
fn runtime_panic_hook_entry(info: &PanicHookInfo<'_>) {
    // Held for the whole hook so `runtime_panic_hook_active()` reports true meanwhile.
    let _active = PanicHookActiveGuard::new();
    let mut report = PanicReport::capture(info);
    run_panic_cleanup_callbacks(&report);
    report.crash_report_path = write_panic_report_to_file(&report);
    emit_panic_report(&report);
    if let Some(exit_code) = run_panic_action_callbacks(&report) {
        std::process::exit(exit_code);
    }
}
918
/// RAII marker counting one in-flight execution of the runtime panic hook.
#[cfg(feature = "std")]
struct PanicHookActiveGuard;

#[cfg(feature = "std")]
impl PanicHookActiveGuard {
    /// Increments the active-hook counter; the matching decrement happens on drop.
    fn new() -> Self {
        PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
        PanicHookActiveGuard
    }
}

#[cfg(feature = "std")]
impl Drop for PanicHookActiveGuard {
    fn drop(&mut self) {
        PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Reports whether Copper's runtime panic hook is currently executing.
#[cfg(feature = "std")]
pub fn runtime_panic_hook_active() -> bool {
    PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst) != 0
}

/// `no_std` builds never install the runtime panic hook, so this is always `false`.
#[cfg(not(feature = "std"))]
pub const fn runtime_panic_hook_active() -> bool {
    false
}
946
/// Appends `callback` to the cleanup list and returns the guard that removes it on drop.
#[cfg(feature = "std")]
fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicCleanup {
        id,
        callback: Arc::new(callback),
    };
    // Recover from poisoning rather than propagate it, like every other registry access.
    panic_hook_registry()
        .cleanup_callbacks
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .push(entry);
    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Cleanup,
    }
}

/// Appends `callback` to the action list and returns the guard that removes it on drop.
#[cfg(feature = "std")]
fn register_panic_action<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicAction {
        id,
        callback: Arc::new(callback),
    };
    panic_hook_registry()
        .action_callbacks
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .push(entry);
    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Action,
    }
}
982
/// Removes the callback registered under `id` from the list selected by `kind`.
///
/// Unknown ids are ignored.
#[cfg(feature = "std")]
fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
    let registry = panic_hook_registry();
    match kind {
        PanicHookRegistrationKind::Cleanup => registry
            .cleanup_callbacks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .retain(|entry| entry.id != id),
        PanicHookRegistrationKind::Action => registry
            .action_callbacks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .retain(|entry| entry.id != id),
    }
}
1003
/// Invoke every registered cleanup callback, in registration order, with `report`.
#[cfg(feature = "std")]
fn run_panic_cleanup_callbacks(report: &PanicReport) {
    // Take a snapshot under the lock, then release it so no lock is held while the
    // callbacks execute.
    let snapshot = {
        let registered = panic_hook_registry()
            .cleanup_callbacks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        registered.clone()
    };

    snapshot
        .into_iter()
        .for_each(|entry| (entry.callback)(report));
}
1015
/// Invoke every registered panic action and return the first exit code any of them proposed.
///
/// All actions run, in registration order, even after one has returned `Some`. Later
/// proposals are discarded.
#[cfg(feature = "std")]
fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
    // Snapshot under the lock so callbacks run without the registry locked.
    let snapshot = {
        let registered = panic_hook_registry()
            .action_callbacks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        registered.clone()
    };

    snapshot.into_iter().fold(None, |chosen, entry| {
        let proposed = (entry.callback)(report);
        chosen.or(proposed)
    })
}
1033
/// Extract a human-readable message from a panic hook's payload.
///
/// Handles the two payload types produced by `panic!` (`&'static str` and `String`). Any
/// other payload yields a fixed placeholder.
#[cfg(feature = "std")]
fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
    let payload = info.payload();
    match (
        payload.downcast_ref::<&str>(),
        payload.downcast_ref::<String>(),
    ) {
        (Some(text), _) => (*text).to_string(),
        (None, Some(text)) => text.clone(),
        (None, None) => "panic with non-string payload".to_string(),
    }
}
1044
/// Render a panic report as the multi-line text written to stderr and to crash files.
///
/// Optional fields (`location`, `crash_report`) appear only when present. The backtrace is
/// always last and always ends with a newline.
#[cfg(feature = "std")]
fn render_panic_report(report: &PanicReport) -> String {
    let mut out = format!(
        "Copper panic\ntime_unix_ms: {}\nthread: {}\n",
        report.timestamp_unix_ms(),
        report.thread_name().unwrap_or("<unnamed>")
    );

    if let Some(location) = report.location() {
        out.push_str(&format!("location: {location}\n"));
    }
    out.push_str(&format!("message: {}\n", report.message()));
    if let Some(path) = report.crash_report_path() {
        out.push_str(&format!("crash_report: {path}\n"));
    }

    let backtrace = report.backtrace();
    out.push_str("\nBacktrace:\n");
    out.push_str(backtrace);
    if !backtrace.ends_with('\n') {
        out.push('\n');
    }
    out
}
1067
/// Write the rendered panic report to stderr and flush it.
#[cfg(feature = "std")]
fn emit_panic_report(report: &PanicReport) {
    let text = render_panic_report(report);
    let mut err_out = std::io::stderr().lock();
    // Best effort: I/O failures while reporting a panic are deliberately ignored.
    let _ = err_out.write_all(text.as_bytes());
    let _ = err_out.flush();
}
1074
/// Persist the rendered report as `copper-crash-<unix_ms>-<pid>.txt` in the current directory.
///
/// The written copy includes its own path in the `crash_report` line. Returns that path,
/// or `None` if the current directory cannot be determined or any file operation fails.
#[cfg(feature = "std")]
fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
    let target = std::env::current_dir().ok()?.join(format!(
        "copper-crash-{}-{}.txt",
        report.timestamp_unix_ms(),
        std::process::id()
    ));
    let target_display = target.to_string_lossy().into_owned();
    let mut out = File::create(&target).ok()?;

    let mut annotated = report.clone();
    annotated.crash_report_path = Some(target_display.clone());
    let rendered = render_panic_report(&annotated);

    out.write_all(rendered.as_bytes()).ok()?;
    out.flush().ok()?;
    Some(target_display)
}
1093
/// Monitor decision to be taken when a component step errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist: just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. call the other component steps, set a
    /// `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}

/// Combine two monitor decisions, keeping the stricter one (`Shutdown` > `Abort` > `Ignore`).
fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
    // Severity rank. The variants carry no data, so returning whichever operand ranks
    // higher is the same as rebuilding that variant.
    fn severity(decision: &Decision) -> u8 {
        match decision {
            Decision::Ignore => 0,
            Decision::Abort => 1,
            Decision::Shutdown => 2,
        }
    }

    if severity(&rhs) > severity(&lhs) {
        rhs
    } else {
        lhs
    }
}
1112
/// A node of the monitor-facing topology derived by `build_monitor_topology`.
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node identifier as it appears in the configuration graph.
    pub id: String,
    /// Component type name from the configuration, when known.
    pub type_name: Option<String>,
    /// Classification used for display (bridge, source, sink or task).
    pub kind: ComponentType,
    /// Ordered list of input port identifiers.
    pub inputs: Vec<String>,
    /// Ordered list of output port identifiers.
    pub outputs: Vec<String>,
}

/// A directed edge between two [`MonitorNode`]s.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Id of the producing node.
    pub src: String,
    /// Output port label on `src`, if one could be resolved.
    pub src_port: Option<String>,
    /// Id of the consuming node.
    pub dst: String,
    /// Input port label on `dst`, if one could be resolved.
    pub dst_port: Option<String>,
    /// Message type carried by this edge, as written in the configuration.
    pub msg: String,
}

/// Nodes and connections of a mission graph, shaped for monitor UIs.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    pub nodes: Vec<MonitorNode>,
    pub connections: Vec<MonitorConnection>,
}
1138
/// Sizing information about CopperLists (a byte size and a count) exposed to monitors.
///
/// NOTE(review): presumably the size of one CopperList and how many are preallocated;
/// confirm against the runtime that fills this in.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    pub size_bytes: usize,
    pub count: usize,
}

impl CopperListInfo {
    /// Build from explicit values. Usable in `const` contexts.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        CopperListInfo { count, size_bytes }
    }
}
1150
/// Reported data about CopperList IO for a single iteration.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// CopperList bytes resident in RAM for this iteration.
    ///
    /// This includes the fixed CopperList struct size plus any pooled or
    /// handle-backed payload bytes observed on the real encode path.
    pub raw_culist_bytes: u64,
    /// Bytes attributed to handle-backed storage while measuring payload IO.
    ///
    /// This is surfaced separately so monitors can show how much of the runtime
    /// footprint lives in pooled payload buffers rather than inside the fixed
    /// CopperList struct.
    pub handle_bytes: u64,
    /// Bytes produced by bincode serialization of the CopperList
    pub encoded_culist_bytes: u64,
    /// Bytes produced by bincode serialization of the KeyFrame (0 if none)
    pub keyframe_bytes: u64,
    /// Cumulative bytes written to the structured log stream so far
    pub structured_log_bytes_total: u64,
    /// CopperList identifier for reference in monitors
    pub culistid: u64,
}

/// Result of [`payload_io_stats`] for one payload value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PayloadIoStats {
    /// `size_of::<T>()` plus `handle_bytes`.
    pub resident_bytes: usize,
    /// Exact bincode-encoded size of the payload.
    pub encoded_bytes: usize,
    /// Handle-backed (pooled/dynamic) bytes reported while encoding.
    pub handle_bytes: usize,
}

/// Per-message IO statistics read back from a [`CuMsgIoCache`] slot.
///
/// When `present` is `false` no stats were recorded for the slot and every byte count is 0.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CuMsgIoStats {
    pub present: bool,
    /// Fixed in-memory size plus handle-backed bytes.
    pub resident_bytes: u64,
    /// Encoded size of the message.
    pub encoded_bytes: u64,
    /// Handle-backed bytes attributed to the message.
    pub handle_bytes: u64,
}
1189
/// Lock-free storage for one [`CuMsgIoStats`] slot.
///
/// The byte counters are written with `Relaxed` stores and published by a `Release` store
/// of `present`. Readers check `present` with `Acquire` before reading the counters.
struct CuMsgIoEntry {
    present: PortableAtomicBool,
    resident_bytes: PortableAtomicU64,
    encoded_bytes: PortableAtomicU64,
    handle_bytes: PortableAtomicU64,
}

impl CuMsgIoEntry {
    /// Mark the slot absent first, then zero the counters.
    fn clear(&self) {
        self.present.store(false, PortableOrdering::Release);
        self.resident_bytes.store(0, PortableOrdering::Relaxed);
        self.encoded_bytes.store(0, PortableOrdering::Relaxed);
        self.handle_bytes.store(0, PortableOrdering::Relaxed);
    }

    /// Snapshot the slot. Returns all-zero stats when nothing has been published.
    ///
    /// NOTE(review): if `set` runs concurrently while `present` is already `true`, a reader
    /// may observe a mix of old and new counter values. This is acceptable for monitoring,
    /// but it is not an atomic snapshot.
    fn get(&self) -> CuMsgIoStats {
        if !self.present.load(PortableOrdering::Acquire) {
            return CuMsgIoStats::default();
        }

        CuMsgIoStats {
            present: true,
            resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
            encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
            handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
        }
    }

    /// Store the counters, then publish them by writing `present` last (Release).
    fn set(&self, stats: CuMsgIoStats) {
        self.resident_bytes
            .store(stats.resident_bytes, PortableOrdering::Relaxed);
        self.encoded_bytes
            .store(stats.encoded_bytes, PortableOrdering::Relaxed);
        self.handle_bytes
            .store(stats.handle_bytes, PortableOrdering::Relaxed);
        self.present.store(stats.present, PortableOrdering::Release);
    }
}

// Empty slot: not present, all counters zero.
impl Default for CuMsgIoEntry {
    fn default() -> Self {
        Self {
            present: PortableAtomicBool::new(false),
            resident_bytes: PortableAtomicU64::new(0),
            encoded_bytes: PortableAtomicU64::new(0),
            handle_bytes: PortableAtomicU64::new(0),
        }
    }
}
1239
1240pub struct CuMsgIoCache<const N: usize> {
1241    entries: [CuMsgIoEntry; N],
1242}
1243
1244impl<const N: usize> CuMsgIoCache<N> {
1245    pub fn clear(&self) {
1246        for entry in &self.entries {
1247            entry.clear();
1248        }
1249    }
1250
1251    pub fn get(&self, idx: usize) -> CuMsgIoStats {
1252        self.entries[idx].get()
1253    }
1254
1255    fn raw_parts(&self) -> (usize, usize) {
1256        (self.entries.as_ptr() as usize, N)
1257    }
1258}
1259
1260impl<const N: usize> Default for CuMsgIoCache<N> {
1261    fn default() -> Self {
1262        Self {
1263            entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1264        }
1265    }
1266}
1267
/// State of an in-progress CopperList IO capture.
///
/// `cache_addr`/`cache_len` come from `CuMsgIoCache::raw_parts`. `current_slot` is the slot
/// chosen via `CuMsgIoCaptureGuard::select_slot`, if any.
// NOTE(review): the address is kept as `usize` rather than a raw pointer, presumably so this
// type stays `Send` and can live in the `no_std` `static` spin mutex. Confirm if changing it.
#[derive(Clone, Copy)]
struct ActiveCuMsgIoCapture {
    cache_addr: usize,
    cache_len: usize,
    current_slot: Option<usize>,
}
1274
// Payload-IO measurement state.
//
// With `std` the state is per-thread, so concurrent encodes on different threads do not
// interfere. Without `std` the same state lives in process-wide statics behind spin
// locks. NOTE(review): in `no_std`, concurrent measurements from different execution
// contexts would share it.
#[cfg(feature = "std")]
thread_local! {
    // Running handle-byte total; `None` when no measurement window is open.
    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
    // Active CopperList capture, if any.
    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
    // Handle bytes from the most recently finished CopperList capture.
    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
}

#[cfg(not(feature = "std"))]
static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1288
1289fn begin_payload_io_measurement() {
1290    #[cfg(feature = "std")]
1291    PAYLOAD_HANDLE_BYTES.with(|bytes| {
1292        debug_assert!(
1293            bytes.get().is_none(),
1294            "payload IO byte measurement must not be nested"
1295        );
1296        bytes.set(Some(0));
1297    });
1298
1299    #[cfg(not(feature = "std"))]
1300    {
1301        let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1302        debug_assert!(
1303            bytes.is_none(),
1304            "payload IO byte measurement must not be nested"
1305        );
1306        *bytes = Some(0);
1307    }
1308}
1309
1310fn finish_payload_io_measurement() -> usize {
1311    #[cfg(feature = "std")]
1312    {
1313        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1314    }
1315
1316    #[cfg(not(feature = "std"))]
1317    {
1318        PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1319    }
1320}
1321
1322fn abort_payload_io_measurement() {
1323    #[cfg(feature = "std")]
1324    PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1325
1326    #[cfg(not(feature = "std"))]
1327    {
1328        *PAYLOAD_HANDLE_BYTES.lock() = None;
1329    }
1330}
1331
1332fn current_payload_io_measurement() -> usize {
1333    #[cfg(feature = "std")]
1334    {
1335        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1336    }
1337
1338    #[cfg(not(feature = "std"))]
1339    {
1340        PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1341    }
1342}
1343
1344#[cfg(feature = "std")]
1345pub(crate) fn record_payload_handle_bytes(bytes: usize) {
1346    #[cfg(feature = "std")]
1347    PAYLOAD_HANDLE_BYTES.with(|total| {
1348        if let Some(current) = total.get() {
1349            total.set(Some(current.saturating_add(bytes)));
1350        }
1351    });
1352
1353    #[cfg(not(feature = "std"))]
1354    {
1355        let mut total = PAYLOAD_HANDLE_BYTES.lock();
1356        if let Some(current) = *total {
1357            *total = Some(current.saturating_add(bytes));
1358        }
1359    }
1360}
1361
1362fn set_last_completed_handle_bytes(bytes: u64) {
1363    #[cfg(feature = "std")]
1364    LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1365
1366    #[cfg(not(feature = "std"))]
1367    {
1368        *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1369    }
1370}
1371
1372pub fn take_last_completed_handle_bytes() -> u64 {
1373    #[cfg(feature = "std")]
1374    {
1375        LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1376    }
1377
1378    #[cfg(not(feature = "std"))]
1379    {
1380        let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1381        let value = *total;
1382        *total = 0;
1383        value
1384    }
1385}
1386
1387fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1388    #[cfg(feature = "std")]
1389    {
1390        ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1391            let mut state = capture.get()?;
1392            let result = f(&mut state);
1393            capture.set(Some(state));
1394            Some(result)
1395        })
1396    }
1397
1398    #[cfg(not(feature = "std"))]
1399    {
1400        let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1401        let state = capture.as_mut()?;
1402        Some(f(state))
1403    }
1404}
1405
1406pub struct CuMsgIoCaptureGuard;
1407
1408impl CuMsgIoCaptureGuard {
1409    pub fn select_slot(&self, slot: usize) {
1410        let _ = with_active_capture_mut(|capture| {
1411            debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1412            capture.current_slot = Some(slot);
1413        });
1414    }
1415}
1416
1417impl Drop for CuMsgIoCaptureGuard {
1418    fn drop(&mut self) {
1419        set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1420
1421        #[cfg(feature = "std")]
1422        ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1423
1424        #[cfg(not(feature = "std"))]
1425        {
1426            *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1427        }
1428    }
1429}
1430
1431pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1432    cache.clear();
1433    set_last_completed_handle_bytes(0);
1434    begin_payload_io_measurement();
1435    let (cache_addr, cache_len) = cache.raw_parts();
1436    let capture = ActiveCuMsgIoCapture {
1437        cache_addr,
1438        cache_len,
1439        current_slot: None,
1440    };
1441
1442    #[cfg(feature = "std")]
1443    ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1444        debug_assert!(
1445            state.get().is_none(),
1446            "CopperList payload IO capture must not be nested"
1447        );
1448        state.set(Some(capture));
1449    });
1450
1451    #[cfg(not(feature = "std"))]
1452    {
1453        let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1454        debug_assert!(
1455            state.is_none(),
1456            "CopperList payload IO capture must not be nested"
1457        );
1458        *state = Some(capture);
1459    }
1460
1461    CuMsgIoCaptureGuard
1462}
1463
/// Handle-backed payload bytes accumulated so far by the open payload-IO measurement,
/// or 0 when no measurement window is open.
///
/// Crate-visible wrapper around `current_payload_io_measurement`.
pub(crate) fn current_payload_handle_bytes() -> usize {
    current_payload_io_measurement()
}
1467
/// Store IO stats for the message in the currently selected slot of the active capture.
///
/// `resident_bytes` is recorded as `fixed_bytes + handle_bytes` (saturating). Does nothing
/// when no capture is active, no slot has been selected, or the slot is out of range.
pub(crate) fn record_current_slot_payload_io_stats(
    fixed_bytes: usize,
    encoded_bytes: usize,
    handle_bytes: usize,
) {
    let _ = with_active_capture_mut(|capture| {
        let Some(slot) = capture.current_slot else {
            return;
        };
        if slot >= capture.cache_len {
            return;
        }
        // SAFETY: `cache_addr`/`cache_len` were taken from a live `CuMsgIoCache` by
        // `start_copperlist_io_capture`, and `slot < cache_len` was checked above. The
        // capture guard is a unit struct that does NOT borrow the cache. Soundness therefore
        // relies on the caller keeping that cache alive and un-moved until the guard is
        // dropped; dropping the guard clears the capture state, so this path is not reached
        // afterwards.
        let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
        let entry = unsafe { &*cache_ptr.add(slot) };
        entry.set(CuMsgIoStats {
            present: true,
            resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
            encoded_bytes: encoded_bytes as u64,
            handle_bytes: handle_bytes as u64,
        });
    });
}
1491
1492/// Measures payload bytes using the same encode path Copper uses for
1493/// logging/export.
1494///
1495/// `resident_bytes` is the payload's in-memory fixed footprint plus any
1496/// handle-backed dynamic storage reported during encoding. `encoded_bytes` is
1497/// the exact bincode payload size.
1498pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1499where
1500    T: Encode,
1501{
1502    begin_payload_io_measurement();
1503    begin_observed_encode();
1504
1505    let result = (|| {
1506        let mut encoder =
1507            EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1508        payload.encode(&mut encoder).map_err(|e| {
1509            CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1510        })?;
1511        let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1512        debug_assert_eq!(encoded_bytes, finish_observed_encode());
1513        let handle_bytes = finish_payload_io_measurement();
1514        Ok(PayloadIoStats {
1515            resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1516            encoded_bytes,
1517            handle_bytes,
1518        })
1519    })();
1520
1521    if result.is_err() {
1522        abort_payload_io_measurement();
1523        abort_observed_encode();
1524    }
1525
1526    result
1527}
1528
/// Whether a graph node is the destination (`has_incoming`) and/or the source
/// (`has_outgoing`) of at least one edge. Used to classify nodes as source/sink/task.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    has_incoming: bool,
    has_outgoing: bool,
}
1534
1535fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1536    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1537    edge_ids.sort();
1538
1539    let mut outputs = Vec::new();
1540    let mut seen = Vec::new();
1541    let mut port_idx = 0usize;
1542    for edge_id in edge_ids {
1543        let Some(edge) = graph.edge(edge_id) else {
1544            continue;
1545        };
1546        if seen.iter().any(|msg| msg == &edge.msg) {
1547            continue;
1548        }
1549        seen.push(edge.msg.clone());
1550        let mut port_label = String::from("out");
1551        port_label.push_str(&port_idx.to_string());
1552        port_label.push_str(": ");
1553        port_label.push_str(edge.msg.as_str());
1554        outputs.push((edge.msg.clone(), port_label));
1555        port_idx += 1;
1556    }
1557    outputs
1558}
1559
1560/// Derive a monitor-friendly topology from the runtime configuration.
1561pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1562    let graph = config.get_graph(Some(mission))?;
1563    let mut nodes: Map<String, MonitorNode> = Map::new();
1564    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
1565    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1566
1567    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1568    for bridge in &config.bridges {
1569        bridge_lookup.insert(bridge.id.as_str(), bridge);
1570    }
1571
1572    for cnx in graph.edges() {
1573        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
1574        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
1575    }
1576
1577    for (_, node) in graph.get_all_nodes() {
1578        let node_id = node.get_id();
1579        let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
1580        let kind = match node.get_flavor() {
1581            Flavor::Bridge => ComponentType::Bridge,
1582            _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
1583            _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
1584            _ => ComponentType::Task,
1585        };
1586
1587        let mut inputs = Vec::new();
1588        let mut outputs = Vec::new();
1589        if kind == ComponentType::Bridge {
1590            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
1591                for ch in &bridge.channels {
1592                    match ch {
1593                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
1594                            outputs.push(id.clone())
1595                        }
1596                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1597                    }
1598                }
1599            }
1600        } else {
1601            if usage.has_incoming || !usage.has_outgoing {
1602                inputs.push("in".to_string());
1603            }
1604            if usage.has_outgoing {
1605                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
1606                    let ports = collect_output_ports(graph, node_idx);
1607                    let mut port_map: Map<String, String> = Map::new();
1608                    for (msg_type, label) in ports {
1609                        port_map.insert(msg_type, label.clone());
1610                        outputs.push(label);
1611                    }
1612                    output_port_lookup.insert(node_id.clone(), port_map);
1613                }
1614            } else if !usage.has_incoming {
1615                outputs.push("out".to_string());
1616            }
1617        }
1618
1619        nodes.insert(
1620            node_id.clone(),
1621            MonitorNode {
1622                id: node_id,
1623                type_name: Some(node.get_type().to_string()),
1624                kind,
1625                inputs,
1626                outputs,
1627            },
1628        );
1629    }
1630
1631    let mut connections = Vec::new();
1632    for cnx in graph.edges() {
1633        let src = cnx.src.clone();
1634        let dst = cnx.dst.clone();
1635
1636        let src_port = cnx.src_channel.clone().or_else(|| {
1637            output_port_lookup
1638                .get(&src)
1639                .and_then(|ports| ports.get(&cnx.msg).cloned())
1640                .or_else(|| {
1641                    nodes
1642                        .get(&src)
1643                        .and_then(|node| node.outputs.first().cloned())
1644                })
1645        });
1646        let dst_port = cnx.dst_channel.clone().or_else(|| {
1647            nodes
1648                .get(&dst)
1649                .and_then(|node| node.inputs.first().cloned())
1650        });
1651
1652        connections.push(MonitorConnection {
1653            src,
1654            src_port,
1655            dst,
1656            dst_port,
1657            msg: cnx.msg.clone(),
1658        });
1659    }
1660
1661    Ok(MonitorTopology {
1662        nodes: nodes.into_values().collect(),
1663        connections,
1664    })
1665}
1666
/// Runtime monitoring contract implemented by monitor components.
///
/// Lifecycle:
/// 1. [`CuMonitor::new`] is called once at runtime construction time.
/// 2. [`CuMonitor::start`] is called once before the first runtime iteration.
/// 3. For each iteration, [`CuMonitor::process_copperlist`] is called after component execution,
///    then [`CuMonitor::observe_copperlist_io`] after serialization accounting.
/// 4. [`CuMonitor::process_error`] is called synchronously when a monitored component step fails.
/// 5. [`CuMonitor::process_panic`] is called when the runtime catches a panic (`std` builds).
/// 6. [`CuMonitor::stop`] is called once during runtime shutdown.
///
/// Indexing model:
/// - `process_error(component_id, ..)` uses component indices into `metadata.components()`.
/// - `process_copperlist(..., view)` iterates CopperList slots with resolved component identity.
///
/// Error policy:
/// - [`Decision::Ignore`] continues execution.
/// - [`Decision::Abort`] aborts the current operation (step/copperlist scope).
/// - [`Decision::Shutdown`] triggers runtime shutdown.
///
/// Composition:
/// - Tuples of two to six monitors also implement this trait. Each hook is forwarded to
///   the members in tuple order, and `process_error` results are merged, keeping the
///   strictest decision (`Shutdown` > `Abort` > `Ignore`).
pub trait CuMonitor: Sized {
    /// Construct the monitor once, before component execution starts.
    ///
    /// `metadata` contains mission/config/topology/static mapping information.
    /// `runtime` exposes dynamic runtime handles (for example execution probes).
    /// Use `metadata.monitor_config()` to decode monitor-specific parameters.
    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
    where
        Self: Sized;

    /// Called once before processing the first CopperList. Default: does nothing.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Called once per processed CopperList after component execution.
    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;

    /// Called when runtime finishes CopperList serialization/IO accounting.
    /// Default: ignores the stats.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Called when a monitored component step fails; must return an immediate runtime decision.
    ///
    /// `component_id` is an index into [`CuMonitoringMetadata::components`].
    fn process_error(
        &self,
        component_id: ComponentId,
        step: CuComponentState,
        error: &CuError,
    ) -> Decision;

    /// Called when the runtime catches a panic (`std` builds). Default: does nothing.
    fn process_panic(&self, _panic_message: &str) {}

    /// Called once during runtime shutdown. Default: does nothing.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
1725
1726/// A do nothing monitor if no monitor is provided.
1727/// This is basically defining the default behavior of Copper in case of error.
1728pub struct NoMonitor {}
1729impl CuMonitor for NoMonitor {
1730    fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1731        Ok(NoMonitor {})
1732    }
1733
1734    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1735        #[cfg(all(feature = "std", debug_assertions))]
1736        register_live_log_listener(|entry, format_str, param_names| {
1737            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1738            let named: Map<String, String> = param_names
1739                .iter()
1740                .zip(params.iter())
1741                .map(|(k, v)| (k.to_string(), v.clone()))
1742                .collect();
1743
1744            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1745                let ts = format_timestamp(entry.time);
1746                println!("{} [{:?}] {}", ts, entry.level, msg);
1747            }
1748        });
1749        Ok(())
1750    }
1751
1752    fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1753        // By default, do nothing.
1754        Ok(())
1755    }
1756
1757    fn process_error(
1758        &self,
1759        _component_id: ComponentId,
1760        _step: CuComponentState,
1761        _error: &CuError,
1762    ) -> Decision {
1763        // By default, just try to continue.
1764        Decision::Ignore
1765    }
1766
1767    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1768        #[cfg(all(feature = "std", debug_assertions))]
1769        unregister_live_log_listener();
1770        Ok(())
1771    }
1772}
1773
1774macro_rules! impl_monitor_tuple {
1775    ($($idx:tt => $name:ident),+) => {
1776        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1777            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1778            where
1779                Self: Sized,
1780            {
1781                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1782            }
1783
1784            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1785                $(self.$idx.start(ctx)?;)+
1786                Ok(())
1787            }
1788
1789            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1790                $(self.$idx.process_copperlist(ctx, view)?;)+
1791                Ok(())
1792            }
1793
1794            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1795                $(self.$idx.observe_copperlist_io(stats);)+
1796            }
1797
1798            fn process_error(
1799                &self,
1800                component_id: ComponentId,
1801                step: CuComponentState,
1802                error: &CuError,
1803            ) -> Decision {
1804                let mut decision = Decision::Ignore;
1805                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1806                decision
1807            }
1808
1809            fn process_panic(&self, panic_message: &str) {
1810                $(self.$idx.process_panic(panic_message);)+
1811            }
1812
1813            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1814                $(self.$idx.stop(ctx)?;)+
1815                Ok(())
1816            }
1817        }
1818    };
1819}
1820
1821impl_monitor_tuple!(0 => M0, 1 => M1);
1822impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1823impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1824impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1825impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1826
/// Convert a caught panic payload (e.g. from `catch_unwind`) into a readable message.
///
/// `&'static str` and `String` payloads are returned verbatim. Anything else yields a
/// fixed placeholder.
#[cfg(feature = "std")]
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1837
/// A [`GlobalAlloc`] wrapper that forwards every call to `inner` and keeps running totals
/// of the bytes handed out and returned.
///
/// Totals are cumulative since construction or the last [`CountingAlloc::reset`]. Byte
/// counts come from each call's `Layout::size()`.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wrap `inner` with both counters at zero. `const` so it can back a `#[global_allocator]`.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes successfully allocated so far.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated so far.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Zero both counters.
    pub fn reset(&self) {
        for counter in [&self.allocated, &self.deallocated] {
            counter.store(0, Ordering::SeqCst);
        }
    }
}

// SAFETY: Every allocation and deallocation is delegated unchanged to `inner`, which upholds
// the `GlobalAlloc` contract; this wrapper only updates counters.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: The caller upholds `GlobalAlloc::alloc`'s contract, which is passed through as-is.
        let block = unsafe { self.inner.alloc(layout) };
        // A null return means nothing was handed out, so nothing is counted.
        if block.is_null() {
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: The caller guarantees `ptr`/`layout` came from this allocator, i.e. from `inner`.
        unsafe { self.inner.dealloc(ptr, layout) };
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }
}
1887
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// Construction snapshots `GLOBAL.allocated()` / `GLOBAL.deallocated()` (presumably the
/// [`CountingAlloc`] registered as the global allocator under `memory_monitoring` — confirm);
/// the accessors return how much those totals grew since the snapshot. Because the snapshot is
/// of shared totals rather than per-thread ones, allocations recorded by other threads in the
/// meantime are included too.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    // Value of `GLOBAL.allocated()` when this counter was created.
    bf_allocated: usize,
    // Value of `GLOBAL.deallocated()` when this counter was created.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a measurement by snapshotting the current `GLOBAL` counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// If the global counters were reset while this counter was alive the figure is not
    /// meaningful; it saturates at 0 instead of underflowing (which would panic in debug builds).
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// If the global counters were reset while this counter was alive the figure is not
    /// meaningful; it saturates at 0 instead of underflowing (which would panic in debug builds).
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

/// Computes the bytes allocated and deallocated during the scope at drop time.
/// The figures are currently discarded until they can be logged.
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // Reuse the saturating accessors so `drop` cannot panic on a counter reset (a panic in
        // `drop` while already unwinding aborts the process).
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
1957
/// Number of histogram buckets in [`LiveStatistics`]. `std` builds get the finer 1024-bucket
/// resolution; `no_std` builds keep the footprint down to 256 buckets (one `u64` each).
const BUCKET_COUNT: usize = if cfg!(feature = "std") { 1024 } else { 256 };
1962
1963/// Accumulative stat object that can give your some real time statistics.
1964/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
1965#[derive(Debug, Clone)]
1966pub struct LiveStatistics {
1967    buckets: [u64; BUCKET_COUNT],
1968    min_val: u64,
1969    max_val: u64,
1970    sum: u128,
1971    sum_sq: u128,
1972    count: u64,
1973    max_value: u64,
1974}
1975
1976impl LiveStatistics {
1977    /// Creates a new `LiveStatistics` instance with a specified maximum value.
1978    ///
1979    /// This function initializes a `LiveStatistics` structure with default values
1980    /// for tracking statistical data, while setting an upper limit for the data
1981    /// points that the structure tracks.
1982    ///
1983    /// # Parameters
1984    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
1985    ///
1986    /// # Returns
1987    /// A new instance of `LiveStatistics` with:
1988    /// - `buckets`: An array pre-filled with zeros to categorize data points.
1989    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
1990    /// - `max_val`: Initialized to zero.
1991    /// - `sum`: The sum of all data points, initialized to zero.
1992    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
1993    /// - `count`: The total number of data points, initialized to zero.
1994    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
1995    ///
1996    pub fn new_with_max(max_value: u64) -> Self {
1997        LiveStatistics {
1998            buckets: [0; BUCKET_COUNT],
1999            min_val: u64::MAX,
2000            max_val: 0,
2001            sum: 0,
2002            sum_sq: 0,
2003            count: 0,
2004            max_value,
2005        }
2006    }
2007
2008    #[inline]
2009    fn value_to_bucket(&self, value: u64) -> usize {
2010        if value >= self.max_value {
2011            BUCKET_COUNT - 1
2012        } else {
2013            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2014        }
2015    }
2016
2017    #[inline]
2018    pub fn min(&self) -> u64 {
2019        if self.count == 0 { 0 } else { self.min_val }
2020    }
2021
2022    #[inline]
2023    pub fn max(&self) -> u64 {
2024        self.max_val
2025    }
2026
2027    #[inline]
2028    pub fn mean(&self) -> f64 {
2029        if self.count == 0 {
2030            0.0
2031        } else {
2032            self.sum as f64 / self.count as f64
2033        }
2034    }
2035
2036    #[inline]
2037    pub fn stdev(&self) -> f64 {
2038        if self.count == 0 {
2039            return 0.0;
2040        }
2041        let mean = self.mean();
2042        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2043        if variance < 0.0 {
2044            return 0.0;
2045        }
2046        #[cfg(feature = "std")]
2047        return variance.sqrt();
2048        #[cfg(not(feature = "std"))]
2049        return sqrt(variance);
2050    }
2051
2052    #[inline]
2053    pub fn percentile(&self, percentile: f64) -> u64 {
2054        if self.count == 0 {
2055            return 0;
2056        }
2057
2058        let target_count = (self.count as f64 * percentile) as u64;
2059        let mut accumulated = 0u64;
2060
2061        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2062            accumulated += bucket_count;
2063            if accumulated >= target_count {
2064                // Linear interpolation within the bucket
2065                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2066                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2067                let bucket_fraction = if bucket_count > 0 {
2068                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2069                } else {
2070                    0.5
2071                };
2072                return bucket_start
2073                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2074            }
2075        }
2076
2077        self.max_val
2078    }
2079
2080    /// Adds a value to the statistics.
2081    #[inline]
2082    pub fn record(&mut self, value: u64) {
2083        if value < self.min_val {
2084            self.min_val = value;
2085        }
2086        if value > self.max_val {
2087            self.max_val = value;
2088        }
2089        let value_u128 = value as u128;
2090        self.sum += value_u128;
2091        self.sum_sq += value_u128 * value_u128;
2092        self.count += 1;
2093
2094        let bucket = self.value_to_bucket(value);
2095        self.buckets[bucket] += 1;
2096    }
2097
2098    #[inline]
2099    pub fn len(&self) -> u64 {
2100        self.count
2101    }
2102
2103    #[inline]
2104    pub fn is_empty(&self) -> bool {
2105        self.count == 0
2106    }
2107
2108    #[inline]
2109    pub fn reset(&mut self) {
2110        self.buckets.fill(0);
2111        self.min_val = u64::MAX;
2112        self.max_val = 0;
2113        self.sum = 0;
2114        self.sum_sq = 0;
2115        self.count = 0;
2116    }
2117}
2118
2119/// A Specialized statistics object for CuDuration.
2120/// It will also keep track of the jitter between the values.
2121#[derive(Debug, Clone)]
2122pub struct CuDurationStatistics {
2123    bare: LiveStatistics,
2124    jitter: LiveStatistics,
2125    last_value: CuDuration,
2126}
2127
2128impl CuDurationStatistics {
2129    pub fn new(max: CuDuration) -> Self {
2130        let CuDuration(max) = max;
2131        CuDurationStatistics {
2132            bare: LiveStatistics::new_with_max(max),
2133            jitter: LiveStatistics::new_with_max(max),
2134            last_value: CuDuration::default(),
2135        }
2136    }
2137
2138    #[inline]
2139    pub fn min(&self) -> CuDuration {
2140        CuDuration(self.bare.min())
2141    }
2142
2143    #[inline]
2144    pub fn max(&self) -> CuDuration {
2145        CuDuration(self.bare.max())
2146    }
2147
2148    #[inline]
2149    pub fn mean(&self) -> CuDuration {
2150        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
2151    }
2152
2153    #[inline]
2154    pub fn percentile(&self, percentile: f64) -> CuDuration {
2155        CuDuration(self.bare.percentile(percentile))
2156    }
2157
2158    #[inline]
2159    pub fn stddev(&self) -> CuDuration {
2160        CuDuration(self.bare.stdev() as u64)
2161    }
2162
2163    #[inline]
2164    pub fn len(&self) -> u64 {
2165        self.bare.len()
2166    }
2167
2168    #[inline]
2169    pub fn is_empty(&self) -> bool {
2170        self.bare.len() == 0
2171    }
2172
2173    #[inline]
2174    pub fn jitter_min(&self) -> CuDuration {
2175        CuDuration(self.jitter.min())
2176    }
2177
2178    #[inline]
2179    pub fn jitter_max(&self) -> CuDuration {
2180        CuDuration(self.jitter.max())
2181    }
2182
2183    #[inline]
2184    pub fn jitter_mean(&self) -> CuDuration {
2185        CuDuration(self.jitter.mean() as u64)
2186    }
2187
2188    #[inline]
2189    pub fn jitter_stddev(&self) -> CuDuration {
2190        CuDuration(self.jitter.stdev() as u64)
2191    }
2192
2193    #[inline]
2194    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2195        CuDuration(self.jitter.percentile(percentile))
2196    }
2197
2198    #[inline]
2199    pub fn record(&mut self, value: CuDuration) {
2200        let CuDuration(nanos) = value;
2201        if self.bare.is_empty() {
2202            self.bare.record(nanos);
2203            self.last_value = value;
2204            return;
2205        }
2206        self.bare.record(nanos);
2207        let CuDuration(last_nanos) = self.last_value;
2208        self.jitter.record(nanos.abs_diff(last_nanos));
2209        self.last_value = value;
2210    }
2211
2212    #[inline]
2213    pub fn reset(&mut self) {
2214        self.bare.reset();
2215        self.jitter.reset();
2216    }
2217}
2218
// Unit tests for the statistics helpers, tuple-monitor composition, payload IO measurement and
// the runtime execution probe.
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};

    // Fixed answer a `TestMonitor` gives from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    // Minimal monitor that returns a preset decision and counts how often its
    // copperlist / panic callbacks were invoked.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
    }

    impl TestMonitor {
        // Builds a monitor with the given decision and zeroed call counters.
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
            }
        }
    }

    // Metadata with two task components ("a" and "b"), no further entries for the third
    // argument, a zero-sized CopperListInfo, a default topology and `None` for the last argument.
    fn test_metadata() -> CuMonitoringMetadata {
        const COMPONENTS: &[MonitorComponentMetadata] = &[
            MonitorComponentMetadata::new("a", ComponentType::Task, None),
            MonitorComponentMetadata::new("b", ComponentType::Task, None),
        ];
        CuMonitoringMetadata::new(
            CompactString::from(crate::config::DEFAULT_MISSION_ID),
            COMPONENTS,
            &[],
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("test metadata should be valid")
    }

    impl CuMonitor for TestMonitor {
        // Ignores the metadata and always builds an `Ignore` monitor.
        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
            let monitor = Self::new_with(TestDecision::Ignore);
            // On std builds, also call the execution probe accessor (result discarded).
            #[cfg(feature = "std")]
            let _ = runtime.execution_probe();
            Ok(monitor)
        }

        // Only counts the call.
        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        // Maps the preset `TestDecision` to the runtime `Decision`.
        fn process_error(
            &self,
            _component_id: ComponentId,
            _step: CuComponentState,
            _error: &CuError,
        ) -> Decision {
            match self.decision {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        // Only counts the call.
        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Histogram percentiles over 0..100 with a 0..1000 range should land within 5 of the
    // exact rank value.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Record 100 values from 0 to 99
        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49); // Average of 0..99

        // Test percentiles - should be approximately correct
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // With 100 samples from 0-99, percentiles should be close to their index
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    // Basic duration stats plus jitter: consecutive differences are |200-100| = 100,
    // |500-200| = 300 and |400-500| = 100, so 4 samples yield 3 jitter samples.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    // Multi-second samples (5e9 and 8e9 ns) must not overflow the sum / sum-of-squares
    // aggregates; stddev of the pair is 1.5e9 ns (tolerance of 1 ns for float rounding).
    #[test]
    fn test_duration_stats_large_samples_do_not_overflow() {
        let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
        stats.record(CuDuration(5_000_000_000));
        stats.record(CuDuration(8_000_000_000));

        assert_eq!(stats.min(), CuDuration(5_000_000_000));
        assert_eq!(stats.max(), CuDuration(8_000_000_000));
        assert_eq!(stats.mean(), CuDuration(6_500_000_000));
        assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
        assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
    }

    // A 2-tuple of monitors combining Ignore with Shutdown or Abort reports the non-Ignore
    // decision.
    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Shutdown
        ));

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Abort
        ));
    }

    // Each callback on the tuple monitor must reach both members exactly once.
    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
            test_metadata(),
            CuMonitoringRuntime::unavailable(),
        )
        .expect("tuple new");
        let (ctx, _clock_control) = CuContext::new_mock_clock();
        let empty_view = test_metadata().layout().view(&[]);
        monitors
            .process_copperlist(&ctx, empty_view)
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
    }

    // Reference size: bytes written by bincode's standard config through a SizeWriter.
    fn encoded_size<E: Encode>(value: &E) -> usize {
        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
        value
            .encode(&mut encoder)
            .expect("size measurement encoder should not fail");
        encoder.into_writer().bytes_written
    }

    // For a plain Vec payload: encoded size matches bincode, resident size is just the Vec
    // header, and no handle-backed bytes are reported.
    #[test]
    fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
        let payload = vec![1u8, 2, 3, 4];
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
        assert_eq!(io.handle_bytes, 0);
    }

    // For a detached CuHandle over 32 bytes: resident size is the handle plus the 32 bytes it
    // holds, and those 32 bytes are also reported as handle-backed.
    #[test]
    fn payload_io_stats_tracks_handle_backed_storage() {
        let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(
            io.resident_bytes,
            core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
        );
        assert_eq!(io.handle_bytes, 32);
    }

    // A fresh probe has no marker and sequence 0; after one `record` the marker reads back
    // unchanged and the sequence is 1.
    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        probe.record(ExecutionMarker {
            component_id: ComponentId::new(7),
            step: CuComponentState::Process,
            culistid: Some(42),
        });

        let marker = probe.marker().expect("marker should be available");
        assert_eq!(marker.component_id, ComponentId::new(7));
        assert!(matches!(marker.step, CuComponentState::Process));
        assert_eq!(marker.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}