Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::context::CuContext;
7use crate::cutask::CuMsgMetadata;
8use cu29_clock::CuDuration;
9#[allow(unused_imports)]
10use cu29_log::CuLogLevel;
11#[cfg(all(feature = "std", debug_assertions))]
12use cu29_log_runtime::{
13    format_message_only, register_live_log_listener, unregister_live_log_listener,
14};
15use cu29_traits::{CuError, CuResult};
16use serde_derive::{Deserialize, Serialize};
17
18#[cfg(not(feature = "std"))]
19extern crate alloc;
20
21#[cfg(feature = "std")]
22use std::sync::Arc;
23#[cfg(feature = "std")]
24use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
25
26#[cfg(not(feature = "std"))]
27use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
28#[cfg(not(target_has_atomic = "64"))]
29use spin::Mutex;
30
// no_std flavor of the platform shim: re-exports the allocator traits, the atomics
// and `libm::sqrt` so the rest of this module can use them unqualified through
// `use imp::*`.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    // `AtomicU64` is only re-exported on targets that provide 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    pub use core::sync::atomic::AtomicU64;
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    pub use libm::sqrt;
}
39
// std flavor of the platform shim: re-exports the allocator traits and atomics from
// `std` so the rest of this module can use them unqualified through `use imp::*`.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    // `AtomicU64` is only re-exported on targets that provide 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // With `memory_monitoring` enabled, the process-wide allocator becomes a
    // `CountingAlloc` wrapping the system allocator.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
54
55use imp::*;
56
/// Formats a time value as `HH:mm:ss.xxxx`, where `xxxx` is the sub-second part
/// expressed in units of 1e-4 s. Hours are not wrapped.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos_total = time.as_nanos();

    // Split into whole seconds and the nanoseconds left inside the current second.
    let whole_seconds = nanos_total / 1_000_000_000;
    let sub_second_nanos = nanos_total % 1_000_000_000;

    // Break the whole seconds down into clock fields.
    let whole_minutes = whole_seconds / 60;
    let hours = whole_minutes / 60;
    let minutes = whole_minutes % 60;
    let seconds = whole_seconds % 60;

    // 100_000 ns == 1e-4 s, so this yields exactly four fractional digits.
    let fractional_1e4 = sub_second_nanos / 100_000;

    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
}
68
/// The state of a task.
///
/// Used as the `step` of an [`ExecutionMarker`] and passed to
/// [`CuMonitor::process_error`] to tell which step was running.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CuTaskState {
    /// Stored as `0` in the execution probe.
    Start,
    /// Stored as `1` in the execution probe.
    Preprocess,
    /// Stored as `2` in the execution probe.
    Process,
    /// Stored as `3` in the execution probe.
    Postprocess,
    /// Stored as `4` in the execution probe.
    Stop,
}
78
/// Execution progress marker emitted by the runtime before running a component step.
///
/// This is the value written to and read back from a [`RuntimeExecutionProbe`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ExecutionMarker {
    /// Index into TASKS_IDS (tasks + bridge components/channels).
    pub component_id: usize,
    /// Lifecycle phase currently entered.
    pub step: CuTaskState,
    /// CopperList id when available (runtime loop), None during start/stop.
    pub culistid: Option<u64>,
}
89
90/// Lock-free runtime-side progress probe.
91///
92/// The runtime writes execution markers directly into this probe from the hot path
93/// (without calling monitor fan-out callbacks), and monitors can read a coherent
94/// snapshot from watchdog threads when diagnosing stalls.
95#[derive(Debug)]
96pub struct RuntimeExecutionProbe {
97    component_id: AtomicUsize,
98    step: AtomicUsize,
99    #[cfg(target_has_atomic = "64")]
100    culistid: AtomicU64,
101    #[cfg(target_has_atomic = "64")]
102    culistid_present: AtomicUsize,
103    #[cfg(not(target_has_atomic = "64"))]
104    culistid: Mutex<Option<u64>>,
105    sequence: AtomicUsize,
106}
107
108impl Default for RuntimeExecutionProbe {
109    fn default() -> Self {
110        Self {
111            component_id: AtomicUsize::new(usize::MAX),
112            step: AtomicUsize::new(0),
113            #[cfg(target_has_atomic = "64")]
114            culistid: AtomicU64::new(0),
115            #[cfg(target_has_atomic = "64")]
116            culistid_present: AtomicUsize::new(0),
117            #[cfg(not(target_has_atomic = "64"))]
118            culistid: Mutex::new(None),
119            sequence: AtomicUsize::new(0),
120        }
121    }
122}
123
124impl RuntimeExecutionProbe {
125    #[inline]
126    pub fn record(&self, marker: ExecutionMarker) {
127        self.component_id
128            .store(marker.component_id, Ordering::Relaxed);
129        self.step
130            .store(task_state_to_usize(marker.step), Ordering::Relaxed);
131        #[cfg(target_has_atomic = "64")]
132        match marker.culistid {
133            Some(culistid) => {
134                self.culistid.store(culistid, Ordering::Relaxed);
135                self.culistid_present.store(1, Ordering::Relaxed);
136            }
137            None => {
138                self.culistid_present.store(0, Ordering::Relaxed);
139            }
140        }
141        #[cfg(not(target_has_atomic = "64"))]
142        {
143            *self.culistid.lock() = marker.culistid;
144        }
145        self.sequence.fetch_add(1, Ordering::Release);
146    }
147
148    #[inline]
149    pub fn sequence(&self) -> usize {
150        self.sequence.load(Ordering::Acquire)
151    }
152
153    #[inline]
154    pub fn marker(&self) -> Option<ExecutionMarker> {
155        // Read a coherent snapshot. A concurrent writer may change values between reads;
156        // in that case we retry to keep the marker and sequence aligned.
157        loop {
158            let seq_before = self.sequence.load(Ordering::Acquire);
159            let component_id = self.component_id.load(Ordering::Relaxed);
160            let step = self.step.load(Ordering::Relaxed);
161            #[cfg(target_has_atomic = "64")]
162            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
163            #[cfg(target_has_atomic = "64")]
164            let culistid_value = self.culistid.load(Ordering::Relaxed);
165            #[cfg(not(target_has_atomic = "64"))]
166            let culistid = *self.culistid.lock();
167            let seq_after = self.sequence.load(Ordering::Acquire);
168            if seq_before == seq_after {
169                if component_id == usize::MAX {
170                    return None;
171                }
172                let step = usize_to_task_state(step);
173                #[cfg(target_has_atomic = "64")]
174                let culistid = if culistid_present == 0 {
175                    None
176                } else {
177                    Some(culistid_value)
178                };
179                return Some(ExecutionMarker {
180                    component_id,
181                    step,
182                    culistid,
183                });
184            }
185        }
186    }
187}
188
189#[inline]
190const fn task_state_to_usize(step: CuTaskState) -> usize {
191    match step {
192        CuTaskState::Start => 0,
193        CuTaskState::Preprocess => 1,
194        CuTaskState::Process => 2,
195        CuTaskState::Postprocess => 3,
196        CuTaskState::Stop => 4,
197    }
198}
199
200#[inline]
201const fn usize_to_task_state(step: usize) -> CuTaskState {
202    match step {
203        0 => CuTaskState::Start,
204        1 => CuTaskState::Preprocess,
205        2 => CuTaskState::Process,
206        3 => CuTaskState::Postprocess,
207        _ => CuTaskState::Stop,
208    }
209}
210
/// Reference-counted handle to a [`RuntimeExecutionProbe`] (std builds only), as
/// handed to monitors through [`CuMonitor::set_execution_probe`].
#[cfg(feature = "std")]
pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
213
/// Monitor decision to be taken when a task errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. call the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut down Copper as cleanly as possible.
    Shutdown,
}
221
222fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
223    use Decision::{Abort, Ignore, Shutdown};
224    // Pick the strictest monitor decision when multiple monitors disagree.
225    // Shutdown dominates Abort, which dominates Ignore.
226    match (lhs, rhs) {
227        (Shutdown, _) | (_, Shutdown) => Shutdown,
228        (Abort, _) | (_, Abort) => Abort,
229        _ => Ignore,
230    }
231}
232
/// Kind of component a [`MonitorNode`] represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentKind {
    /// Any graph node whose flavor is not a bridge.
    Task,
    /// A graph node with the bridge flavor.
    Bridge,
}
238
/// A node of the [`MonitorTopology`]: one task or bridge with its ports.
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node identifier, as returned by the graph node's `get_id()`.
    pub id: String,
    /// Type name of the node, when known.
    pub type_name: Option<String>,
    /// Whether the node is a task or a bridge.
    pub kind: ComponentKind,
    /// Ordered list of input port identifiers.
    pub inputs: Vec<String>,
    /// Ordered list of output port identifiers.
    pub outputs: Vec<String>,
}
249
/// A directed edge of the [`MonitorTopology`] between two nodes.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Identifier of the source node.
    pub src: String,
    /// Output port on the source node, if one could be resolved.
    pub src_port: Option<String>,
    /// Identifier of the destination node.
    pub dst: String,
    /// Input port on the destination node, if one could be resolved.
    pub dst_port: Option<String>,
    /// Message type carried by the connection (copied from the graph edge's `msg`).
    pub msg: String,
}
258
/// Monitor-friendly view of the runtime graph, as produced by
/// [`build_monitor_topology`].
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    /// Tasks and bridges of the graph.
    pub nodes: Vec<MonitorNode>,
    /// Connections between those nodes, one per graph edge.
    pub connections: Vec<MonitorConnection>,
}
264
/// Static information about CopperLists handed to monitors through
/// [`CuMonitor::set_copperlist_info`].
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    // NOTE(review): presumably the in-memory size of one CopperList - confirm with the caller.
    pub size_bytes: usize,
    // NOTE(review): presumably the number of CopperLists kept by the runtime - confirm with the caller.
    pub count: usize,
}

impl CopperListInfo {
    /// Builds a `CopperListInfo` from its two fields; usable in const contexts.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        Self { size_bytes, count }
    }
}
276
/// Reported data about CopperList IO for a single iteration.
///
/// Passed by value to [`CuMonitor::observe_copperlist_io`].
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// CopperList struct size in RAM (excluding dynamic payloads/handles)
    pub raw_culist_bytes: u64,
    /// Bytes held by payloads that will be serialized (currently: pooled handles, vecs, slices)
    pub handle_bytes: u64,
    /// Bytes produced by bincode serialization of the CopperList
    pub encoded_culist_bytes: u64,
    /// Bytes produced by bincode serialization of the KeyFrame (0 if none)
    pub keyframe_bytes: u64,
    /// Cumulative bytes written to the structured log stream so far
    pub structured_log_bytes_total: u64,
    /// CopperList identifier for reference in monitors
    pub culistid: u64,
}
293
/// Lightweight trait to estimate the amount of data a payload will contribute when serialized.
/// Default implementations return the stack size; specific types override to report dynamic data.
pub trait CuPayloadSize {
    /// Total bytes represented by the payload in memory (stack + heap backing).
    ///
    /// The default only reports `size_of_val(self)`, i.e. the inline size of the
    /// value; heap-backed data is not included unless an implementation adds it.
    fn raw_bytes(&self) -> usize {
        core::mem::size_of_val(self)
    }

    /// Bytes that correspond to reusable/pooled handles (used for IO budgeting).
    ///
    /// Defaults to `0`.
    fn handle_bytes(&self) -> usize {
        0
    }
}
307
/// Blanket implementation: every message payload type gets [`CuPayloadSize`].
///
/// `raw_bytes` reports `size_of::<T>()`; `handle_bytes` keeps the trait default (`0`).
impl<T> CuPayloadSize for T
where
    T: crate::cutask::CuMsgPayload,
{
    fn raw_bytes(&self) -> usize {
        core::mem::size_of::<T>()
    }
}
316
/// Per-node flags recording whether the graph has edges into and out of the node.
/// Used by `build_monitor_topology` to decide which default ports a task gets.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    /// At least one edge has this node as destination.
    has_incoming: bool,
    /// At least one edge has this node as source.
    has_outgoing: bool,
}
322
323fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
324    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
325    edge_ids.sort();
326
327    let mut outputs = Vec::new();
328    let mut seen = Vec::new();
329    let mut port_idx = 0usize;
330    for edge_id in edge_ids {
331        let Some(edge) = graph.edge(edge_id) else {
332            continue;
333        };
334        if seen.iter().any(|msg| msg == &edge.msg) {
335            continue;
336        }
337        seen.push(edge.msg.clone());
338        let mut port_label = String::from("out");
339        port_label.push_str(&port_idx.to_string());
340        port_label.push_str(": ");
341        port_label.push_str(edge.msg.as_str());
342        outputs.push((edge.msg.clone(), port_label));
343        port_idx += 1;
344    }
345    outputs
346}
347
/// Derive a monitor-friendly topology from the runtime configuration.
///
/// Builds one [`MonitorNode`] per graph node and one [`MonitorConnection`] per
/// graph edge of the graph selected by `mission`.
///
/// # Errors
/// Propagates the error returned by `config.get_graph(mission)`.
pub fn build_monitor_topology(
    config: &CuConfig,
    mission: Option<&str>,
) -> CuResult<MonitorTopology> {
    let graph = config.get_graph(mission)?;
    let mut nodes: Map<String, MonitorNode> = Map::new();
    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
    // node id -> (message type -> output port label), filled for tasks with outgoing edges.
    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();

    // Index bridge configurations by id so bridge nodes can find their channels.
    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
    for bridge in &config.bridges {
        bridge_lookup.insert(bridge.id.as_str(), bridge);
    }

    // First pass over the edges: record which nodes have incoming/outgoing connections.
    for cnx in graph.edges() {
        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
    }

    for (_, node) in graph.get_all_nodes() {
        let kind = match node.get_flavor() {
            Flavor::Bridge => ComponentKind::Bridge,
            _ => ComponentKind::Task,
        };
        let node_id = node.get_id();

        let mut inputs = Vec::new();
        let mut outputs = Vec::new();
        if kind == ComponentKind::Bridge {
            // Bridge ports come from the bridge configuration: Rx channels are
            // outputs of the node, Tx channels are inputs. A bridge without a
            // matching configuration gets no ports.
            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
                for ch in &bridge.channels {
                    match ch {
                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
                            outputs.push(id.clone())
                        }
                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
                    }
                }
            }
        } else {
            let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
            // Every task gets an "in" port except pure sources (outgoing edges only).
            if usage.has_incoming || !usage.has_outgoing {
                inputs.push("in".to_string());
            }
            if usage.has_outgoing {
                // One labeled output port per distinct outgoing message type.
                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
                    let ports = collect_output_ports(graph, node_idx);
                    let mut port_map: Map<String, String> = Map::new();
                    for (msg_type, label) in ports {
                        port_map.insert(msg_type, label.clone());
                        outputs.push(label);
                    }
                    output_port_lookup.insert(node_id.clone(), port_map);
                }
            } else if !usage.has_incoming {
                // Unconnected task: give it a generic "out" port as well.
                outputs.push("out".to_string());
            }
        }

        nodes.insert(
            node_id.clone(),
            MonitorNode {
                id: node_id,
                type_name: Some(node.get_type().to_string()),
                kind,
                inputs,
                outputs,
            },
        );
    }

    // Second pass over the edges: resolve the ports each connection attaches to.
    let mut connections = Vec::new();
    for cnx in graph.edges() {
        let src = cnx.src.clone();
        let dst = cnx.dst.clone();

        // Source port: explicit channel, else the port labeled for this message
        // type, else the first output of the source node.
        let src_port = cnx.src_channel.clone().or_else(|| {
            output_port_lookup
                .get(&src)
                .and_then(|ports| ports.get(&cnx.msg).cloned())
                .or_else(|| {
                    nodes
                        .get(&src)
                        .and_then(|node| node.outputs.first().cloned())
                })
        });
        // Destination port: explicit channel, else the first input of the destination node.
        let dst_port = cnx.dst_channel.clone().or_else(|| {
            nodes
                .get(&dst)
                .and_then(|node| node.inputs.first().cloned())
        });

        connections.push(MonitorConnection {
            src,
            src_port,
            dst,
            dst_port,
            msg: cnx.msg.clone(),
        });
    }

    // NOTE(review): node order follows `Map` iteration order; with `std` the map is
    // a `HashMap`, so the order of `nodes` is unspecified there.
    Ok(MonitorTopology {
        nodes: nodes.into_values().collect(),
        connections,
    })
}
455
/// Trait to implement a monitoring task.
///
/// Only `new`, `process_copperlist` and `process_error` are required; every other
/// method has a no-op default.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the runtime configuration and the list of task ids.
    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
    where
        Self: Sized;

    /// Receives the monitor topology. Does nothing by default.
    fn set_topology(&mut self, _topology: MonitorTopology) {}

    /// Receives the CopperList information. Does nothing by default.
    fn set_copperlist_info(&mut self, _info: CopperListInfo) {}

    /// Receives a handle to the runtime execution probe (std builds only).
    /// Does nothing by default.
    #[cfg(feature = "std")]
    fn set_execution_probe(&mut self, _probe: ExecutionProbeHandle) {}

    /// Starts the monitor. Returns `Ok(())` by default.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Callback that will be triggered at the end of every copperlist (before, on or after the serialization).
    fn process_copperlist(&self, _ctx: &CuContext, msgs: &[&CuMsgMetadata]) -> CuResult<()>;

    /// Called when the runtime finishes serializing a CopperList, giving IO accounting data.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Called back when a task errored out. The runtime requires an immediate decision.
    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;

    /// Callback fired when the runtime catches a panic in a std build.
    fn process_panic(&self, _panic_message: &str) {}

    /// Called back when Copper is stopping. Returns `Ok(())` by default.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
490
/// A do nothing monitor if no monitor is provided.
/// This is basically defining the default behavior of Copper in case of error.
///
/// In std debug builds it additionally prints live log entries to stdout between
/// `start` and `stop`.
pub struct NoMonitor {}
impl CuMonitor for NoMonitor {
    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
        Ok(NoMonitor {})
    }

    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        // std + debug builds only: echo every live log entry as
        // "<timestamp> [<level>] <message>".
        #[cfg(all(feature = "std", debug_assertions))]
        register_live_log_listener(|entry, format_str, param_names| {
            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
            // Pair each parameter name with its stringified value for named placeholders.
            let named: Map<String, String> = param_names
                .iter()
                .zip(params.iter())
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect();

            // Entries whose message cannot be formatted are silently dropped.
            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
                let ts = format_timestamp(entry.time);
                println!("{} [{:?}] {}", ts, entry.level, msg);
            }
        });
        Ok(())
    }

    fn process_copperlist(&self, _ctx: &CuContext, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
        // By default, do nothing.
        Ok(())
    }

    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
        // By default, just try to continue.
        Decision::Ignore
    }

    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        // Mirror of `start`: remove the live log listener registered there.
        #[cfg(all(feature = "std", debug_assertions))]
        unregister_live_log_listener();
        Ok(())
    }
}
533
// Implements `CuMonitor` for a tuple of monitors by fanning every call out to each
// element, in tuple order.
//
// Invocation syntax: `impl_monitor_tuple!(0 => M0, 1 => M1, ...)`, where each entry
// pairs a tuple index with the generic parameter name used for that element.
//
// - `new`, `start` and `process_copperlist` stop at the first element that fails and
//   return its error.
// - `stop` always calls every element, so that one failing monitor cannot leave the
//   others running, and returns the first error encountered.
// - `process_error` asks every element and keeps the strictest `Decision`.
macro_rules! impl_monitor_tuple {
    ($($idx:tt => $name:ident),+) => {
        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
            fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
            where
                Self: Sized,
            {
                Ok(($($name::new(config, taskids)?,)+))
            }

            fn set_topology(&mut self, topology: MonitorTopology) {
                $(self.$idx.set_topology(topology.clone());)+
            }

            fn set_copperlist_info(&mut self, info: CopperListInfo) {
                $(self.$idx.set_copperlist_info(info);)+
            }

            #[cfg(feature = "std")]
            fn set_execution_probe(&mut self, probe: ExecutionProbeHandle) {
                $(self.$idx.set_execution_probe(probe.clone());)+
            }

            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
                $(self.$idx.start(ctx)?;)+
                Ok(())
            }

            fn process_copperlist(&self, ctx: &CuContext, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
                $(self.$idx.process_copperlist(ctx, msgs)?;)+
                Ok(())
            }

            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
                $(self.$idx.observe_copperlist_io(stats);)+
            }

            fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
                // Start from the most lenient decision and let each monitor tighten it.
                let mut decision = Decision::Ignore;
                $(decision = merge_decision(decision, self.$idx.process_error(taskid, step, error));)+
                decision
            }

            fn process_panic(&self, panic_message: &str) {
                $(self.$idx.process_panic(panic_message);)+
            }

            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
                // Stop every monitor even when an earlier one fails; remember only
                // the first failure so it can be reported to the caller.
                let mut outcome: CuResult<()> = Ok(());
                $(
                    let stopped = self.$idx.stop(ctx);
                    if outcome.is_ok() {
                        outcome = stopped;
                    }
                )+
                outcome
            }
        }
    };
}
588
// Provide `CuMonitor` for tuples of 2 up to 6 monitors.
impl_monitor_tuple!(0 => M0, 1 => M1);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
594
/// Extracts a printable message from a panic payload.
///
/// Returns the payload text when it is a `&str` or a `String`, and a fixed
/// placeholder message for any other payload type.
#[cfg(feature = "std")]
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
605
/// Allocator wrapper that keeps running totals of the bytes handed out and the
/// bytes given back through it.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero. Usable in `static` initializers.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes successfully allocated since creation or the last `reset`.
    pub fn allocated(&self) -> usize {
        let total = &self.allocated;
        total.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated since creation or the last `reset`.
    pub fn deallocated(&self) -> usize {
        let total = &self.deallocated;
        total.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    pub fn reset(&self) {
        for counter in [&self.allocated, &self.deallocated] {
            counter.store(0, Ordering::SeqCst);
        }
    }
}

// SAFETY: Every request is forwarded unchanged to the wrapped allocator; the only
// extra work is updating the byte counters.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    // SAFETY: The caller upholds the GlobalAlloc contract, which is passed through as is.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: `layout` comes straight from our caller, who guarantees its validity.
        let block = unsafe { self.inner.alloc(layout) };
        // Failed allocations (null) are not counted.
        if block.is_null() {
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    // SAFETY: The caller upholds the GlobalAlloc contract, which is passed through as is.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr` and `layout` come straight from our caller, who guarantees
        // they describe a block obtained from this allocator.
        unsafe { self.inner.dealloc(ptr, layout) }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }
}
655
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// It snapshots the global allocator counters at creation and reports the
/// difference with their current values. Differences are computed with wrapping
/// arithmetic, so they stay exact if a global counter wraps around; they are not
/// meaningful if the global counters are reset while this value is alive.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// Global allocated-bytes counter at creation time.
    bf_allocated: usize,
    /// Global deallocated-bytes counter at creation time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a new measurement from the current global allocator counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        // Wrapping: never panics, and stays correct across a counter wrap-around.
        GLOBAL.allocated().wrapping_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        // Wrapping: never panics, and stays correct across a counter wrap-around.
        GLOBAL.deallocated().wrapping_sub(self.bf_deallocated)
    }
}

/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // Wrapping subtraction so that dropping the counter can never panic.
        let _allocated = GLOBAL.allocated().wrapping_sub(self.bf_allocated);
        let _deallocated = GLOBAL.deallocated().wrapping_sub(self.bf_deallocated);
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
725
726#[cfg(feature = "std")]
727const BUCKET_COUNT: usize = 1024;
728#[cfg(not(feature = "std"))]
729const BUCKET_COUNT: usize = 256;
730
731/// Accumulative stat object that can give your some real time statistics.
732/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
733#[derive(Debug, Clone)]
734pub struct LiveStatistics {
735    buckets: [u64; BUCKET_COUNT],
736    min_val: u64,
737    max_val: u64,
738    sum: u64,
739    sum_sq: u64,
740    count: u64,
741    max_value: u64,
742}
743
744impl LiveStatistics {
745    /// Creates a new `LiveStatistics` instance with a specified maximum value.
746    ///
747    /// This function initializes a `LiveStatistics` structure with default values
748    /// for tracking statistical data, while setting an upper limit for the data
749    /// points that the structure tracks.
750    ///
751    /// # Parameters
752    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
753    ///
754    /// # Returns
755    /// A new instance of `LiveStatistics` with:
756    /// - `buckets`: An array pre-filled with zeros to categorize data points.
757    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
758    /// - `max_val`: Initialized to zero.
759    /// - `sum`: The sum of all data points, initialized to zero.
760    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
761    /// - `count`: The total number of data points, initialized to zero.
762    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
763    ///
764    pub fn new_with_max(max_value: u64) -> Self {
765        LiveStatistics {
766            buckets: [0; BUCKET_COUNT],
767            min_val: u64::MAX,
768            max_val: 0,
769            sum: 0,
770            sum_sq: 0,
771            count: 0,
772            max_value,
773        }
774    }
775
776    #[inline]
777    fn value_to_bucket(&self, value: u64) -> usize {
778        if value >= self.max_value {
779            BUCKET_COUNT - 1
780        } else {
781            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
782        }
783    }
784
785    #[inline]
786    pub fn min(&self) -> u64 {
787        if self.count == 0 { 0 } else { self.min_val }
788    }
789
790    #[inline]
791    pub fn max(&self) -> u64 {
792        self.max_val
793    }
794
795    #[inline]
796    pub fn mean(&self) -> f64 {
797        if self.count == 0 {
798            0.0
799        } else {
800            self.sum as f64 / self.count as f64
801        }
802    }
803
804    #[inline]
805    pub fn stdev(&self) -> f64 {
806        if self.count == 0 {
807            return 0.0;
808        }
809        let mean = self.mean();
810        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
811        if variance < 0.0 {
812            return 0.0;
813        }
814        #[cfg(feature = "std")]
815        return variance.sqrt();
816        #[cfg(not(feature = "std"))]
817        return sqrt(variance);
818    }
819
820    #[inline]
821    pub fn percentile(&self, percentile: f64) -> u64 {
822        if self.count == 0 {
823            return 0;
824        }
825
826        let target_count = (self.count as f64 * percentile) as u64;
827        let mut accumulated = 0u64;
828
829        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
830            accumulated += bucket_count;
831            if accumulated >= target_count {
832                // Linear interpolation within the bucket
833                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
834                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
835                let bucket_fraction = if bucket_count > 0 {
836                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
837                } else {
838                    0.5
839                };
840                return bucket_start
841                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
842            }
843        }
844
845        self.max_val
846    }
847
848    /// Adds a value to the statistics.
849    #[inline]
850    pub fn record(&mut self, value: u64) {
851        if value < self.min_val {
852            self.min_val = value;
853        }
854        if value > self.max_val {
855            self.max_val = value;
856        }
857        self.sum += value;
858        self.sum_sq += value * value;
859        self.count += 1;
860
861        let bucket = self.value_to_bucket(value);
862        self.buckets[bucket] += 1;
863    }
864
865    #[inline]
866    pub fn len(&self) -> u64 {
867        self.count
868    }
869
870    #[inline]
871    pub fn is_empty(&self) -> bool {
872        self.count == 0
873    }
874
875    #[inline]
876    pub fn reset(&mut self) {
877        self.buckets.fill(0);
878        self.min_val = u64::MAX;
879        self.max_val = 0;
880        self.sum = 0;
881        self.sum_sq = 0;
882        self.count = 0;
883    }
884}
885
/// A specialized statistics object for CuDuration.
/// It will also keep track of the jitter between the values.
#[derive(Debug, Clone)]
pub struct CuDurationStatistics {
    /// Statistics over the recorded durations themselves.
    bare: LiveStatistics,
    /// Statistics over the absolute difference between consecutive recorded durations.
    jitter: LiveStatistics,
    /// Most recently recorded duration, used to compute the next jitter sample.
    last_value: CuDuration,
}
894
895impl CuDurationStatistics {
896    pub fn new(max: CuDuration) -> Self {
897        let CuDuration(max) = max;
898        CuDurationStatistics {
899            bare: LiveStatistics::new_with_max(max),
900            jitter: LiveStatistics::new_with_max(max),
901            last_value: CuDuration::default(),
902        }
903    }
904
905    #[inline]
906    pub fn min(&self) -> CuDuration {
907        CuDuration(self.bare.min())
908    }
909
910    #[inline]
911    pub fn max(&self) -> CuDuration {
912        CuDuration(self.bare.max())
913    }
914
915    #[inline]
916    pub fn mean(&self) -> CuDuration {
917        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
918    }
919
920    #[inline]
921    pub fn percentile(&self, percentile: f64) -> CuDuration {
922        CuDuration(self.bare.percentile(percentile))
923    }
924
925    #[inline]
926    pub fn stddev(&self) -> CuDuration {
927        CuDuration(self.bare.stdev() as u64)
928    }
929
930    #[inline]
931    pub fn len(&self) -> u64 {
932        self.bare.len()
933    }
934
935    #[inline]
936    pub fn is_empty(&self) -> bool {
937        self.bare.len() == 0
938    }
939
940    #[inline]
941    pub fn jitter_min(&self) -> CuDuration {
942        CuDuration(self.jitter.min())
943    }
944
945    #[inline]
946    pub fn jitter_max(&self) -> CuDuration {
947        CuDuration(self.jitter.max())
948    }
949
950    #[inline]
951    pub fn jitter_mean(&self) -> CuDuration {
952        CuDuration(self.jitter.mean() as u64)
953    }
954
955    #[inline]
956    pub fn jitter_stddev(&self) -> CuDuration {
957        CuDuration(self.jitter.stdev() as u64)
958    }
959
960    #[inline]
961    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
962        CuDuration(self.jitter.percentile(percentile))
963    }
964
965    #[inline]
966    pub fn record(&mut self, value: CuDuration) {
967        let CuDuration(nanos) = value;
968        if self.bare.is_empty() {
969            self.bare.record(nanos);
970            self.last_value = value;
971            return;
972        }
973        self.bare.record(nanos);
974        let CuDuration(last_nanos) = self.last_value;
975        self.jitter.record(nanos.abs_diff(last_nanos));
976        self.last_value = value;
977    }
978
979    #[inline]
980    pub fn reset(&mut self) {
981        self.bare.reset();
982        self.jitter.reset();
983    }
984}
985
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};
    #[cfg(feature = "std")]
    use std::sync::Arc;

    /// Decision a [`TestMonitor`] is configured to return from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    /// Monitor double that answers errors with a fixed decision and counts how
    /// often each callback is invoked.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
        #[cfg(feature = "std")]
        probe_calls: AtomicUsize,
    }

    impl TestMonitor {
        /// Creates a monitor with zeroed counters that always answers `decision`.
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
                #[cfg(feature = "std")]
                probe_calls: AtomicUsize::new(0),
            }
        }
    }

    impl CuMonitor for TestMonitor {
        fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
            Ok(Self::new_with(TestDecision::Ignore))
        }

        #[cfg(feature = "std")]
        fn set_execution_probe(&mut self, _probe: ExecutionProbeHandle) {
            self.probe_calls.fetch_add(1, Ordering::SeqCst);
        }

        fn process_copperlist(&self, _ctx: &CuContext, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
            // Map the test-local decision onto the runtime one, variant by variant.
            let configured = self.decision;
            match configured {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Basic aggregates and percentiles over the samples 0..=99.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Feed the 100 values 0, 1, ..., 99.
        (0..100).for_each(|sample| stats.record(sample));

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49); // 49.5 with the fraction dropped

        // The histogram makes percentiles approximate.
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // With samples 0..=99 each percentile should land near its own index.
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    /// Duration and jitter aggregates over four samples, then a reset.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for nanos in [100, 200, 500, 400] {
            stats.record(CuDuration(nanos));
        }

        // Duration series: 100, 200, 500, 400.
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);

        // Jitter series: |200-100|, |500-200|, |400-500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    /// A tuple of monitors that disagree must answer with the stricter decision.
    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let ignore_and_shutdown = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        let merged = ignore_and_shutdown.process_error(0, CuTaskState::Process, &err);
        assert!(matches!(merged, Decision::Shutdown));

        let ignore_and_abort = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        let merged = ignore_and_abort.process_error(0, CuTaskState::Process, &err);
        assert!(matches!(merged, Decision::Abort));
    }

    /// Every callback invoked on a tuple must reach each member exactly once.
    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let mut monitors = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Ignore),
        );
        let (ctx, _clock_control) = CuContext::new_mock_clock();

        #[cfg(feature = "std")]
        monitors.set_execution_probe(Arc::new(RuntimeExecutionProbe::default()));
        monitors
            .process_copperlist(&ctx, &[])
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        let (first, second) = (&monitors.0, &monitors.1);
        assert_eq!(first.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(second.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(first.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(second.panic_calls.load(Ordering::SeqCst), 1);
        #[cfg(feature = "std")]
        {
            assert_eq!(first.probe_calls.load(Ordering::SeqCst), 1);
            assert_eq!(second.probe_calls.load(Ordering::SeqCst), 1);
        }
    }

    /// A recorded marker can be read back and bumps the sequence number.
    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        let recorded = ExecutionMarker {
            component_id: 7,
            step: CuTaskState::Process,
            culistid: Some(42),
        };
        probe.record(recorded);

        let read_back = probe.marker().expect("marker should be available");
        assert_eq!(read_back.component_id, 7);
        assert!(matches!(read_back.step, CuTaskState::Process));
        assert_eq!(read_back.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}