cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, Flavor};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use petgraph::visit::IntoEdgeReferences;
12use serde_derive::{Deserialize, Serialize};
13
14#[cfg(not(feature = "std"))]
15extern crate alloc;
16
17#[cfg(feature = "std")]
18use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
19
20#[cfg(not(feature = "std"))]
21use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
22
#[cfg(not(feature = "std"))]
// no_std flavour of the `imp` shim: re-exports the allocator and atomic types
// from `alloc`/`core`, and `sqrt` from `libm`, so the rest of this file can
// refer to them through `use imp::*`.
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    pub use libm::sqrt;
}
29
#[cfg(feature = "std")]
// std flavour of the `imp` shim: re-exports the allocator and atomic types
// from `std` so the rest of this file can refer to them through `use imp::*`.
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // With `memory_monitoring` enabled, the byte-counting wrapper around the
    // system allocator is registered as the program's global allocator.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
42
43use imp::*;
44
/// The state of a task.
///
/// Passed to [`CuMonitor::process_error`] as the `step` argument so the
/// monitor knows during which step the error was reported.
#[derive(Debug, Serialize, Deserialize)]
pub enum CuTaskState {
    // NOTE(review): the variants presumably mirror the task lifecycle
    // callbacks of the same names — confirm against the task traits.
    Start,
    Preprocess,
    Process,
    Postprocess,
    Stop,
}
54
/// Monitor decision to be taken when a task errored out.
///
/// Returned by [`CuMonitor::process_error`].
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}
62
/// Kind of component a [`MonitorNode`] represents.
///
/// `build_monitor_topology` maps nodes whose flavor is `Flavor::Bridge` to
/// `Bridge` and every other flavor to `Task`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentKind {
    Task,
    Bridge,
}
68
/// A node of the monitor topology, as produced by `build_monitor_topology`.
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Identifier of the node, taken from the configuration graph.
    pub id: String,
    /// Type of the node as reported by the configuration, when available.
    pub type_name: Option<String>,
    /// Whether the node is a task or a bridge.
    pub kind: ComponentKind,
    /// Ordered list of input port identifiers.
    pub inputs: Vec<String>,
    /// Ordered list of output port identifiers.
    pub outputs: Vec<String>,
}
79
/// A directed connection between two nodes of the monitor topology.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Identifier of the source node.
    pub src: String,
    /// Port on the source node, or `None` when no port could be determined.
    pub src_port: Option<String>,
    /// Identifier of the destination node.
    pub dst: String,
    /// Port on the destination node, or `None` when no port could be determined.
    pub dst_port: Option<String>,
    /// The `msg` value of the configuration connection
    /// (presumably the message type name — confirm against the config types).
    pub msg: String,
}
88
/// Nodes and connections of a configuration graph, in a form meant for monitors.
///
/// Built by `build_monitor_topology`; the `Default` value is an empty topology.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    /// Every node of the graph.
    pub nodes: Vec<MonitorNode>,
    /// Every edge of the graph.
    pub connections: Vec<MonitorConnection>,
}
94
/// Per-node bookkeeping used while building the topology: records whether a
/// node appears as the destination and/or the source of at least one edge.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    /// The node is the destination of at least one edge.
    has_incoming: bool,
    /// The node is the source of at least one edge.
    has_outgoing: bool,
}
100
101/// Derive a monitor-friendly topology from the runtime configuration.
102pub fn build_monitor_topology(
103    config: &CuConfig,
104    mission: Option<&str>,
105) -> CuResult<MonitorTopology> {
106    let graph = config.get_graph(mission)?;
107    let mut nodes: Map<String, MonitorNode> = Map::new();
108    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
109
110    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
111    for bridge in &config.bridges {
112        bridge_lookup.insert(bridge.id.as_str(), bridge);
113    }
114
115    for edge in graph.0.edge_references() {
116        let cnx = edge.weight();
117        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
118        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
119    }
120
121    for (_, node) in graph.get_all_nodes() {
122        let kind = match node.get_flavor() {
123            Flavor::Bridge => ComponentKind::Bridge,
124            _ => ComponentKind::Task,
125        };
126        let node_id = node.get_id();
127
128        let mut inputs = Vec::new();
129        let mut outputs = Vec::new();
130        if kind == ComponentKind::Bridge {
131            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
132                for ch in &bridge.channels {
133                    match ch {
134                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
135                            outputs.push(id.clone())
136                        }
137                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
138                    }
139                }
140            }
141        } else {
142            let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
143            if usage.has_incoming || !usage.has_outgoing {
144                inputs.push("in".to_string());
145            }
146            if usage.has_outgoing || !usage.has_incoming {
147                outputs.push("out".to_string());
148            }
149        }
150
151        nodes.insert(
152            node_id.clone(),
153            MonitorNode {
154                id: node_id,
155                type_name: Some(node.get_type().to_string()),
156                kind,
157                inputs,
158                outputs,
159            },
160        );
161    }
162
163    let mut connections = Vec::new();
164    for edge in graph.0.edge_references() {
165        let cnx = edge.weight();
166        let src = cnx.src.clone();
167        let dst = cnx.dst.clone();
168
169        let src_port = cnx.src_channel.clone().or_else(|| {
170            nodes
171                .get(&src)
172                .and_then(|node| node.outputs.first().cloned())
173        });
174        let dst_port = cnx.dst_channel.clone().or_else(|| {
175            nodes
176                .get(&dst)
177                .and_then(|node| node.inputs.first().cloned())
178        });
179
180        connections.push(MonitorConnection {
181            src,
182            src_port,
183            dst,
184            dst_port,
185            msg: cnx.msg.clone(),
186        });
187    }
188
189    Ok(MonitorTopology {
190        nodes: nodes.into_values().collect(),
191        connections,
192    })
193}
194
/// Trait to implement a monitoring task.
///
/// `new`, `process_copperlist` and `process_error` must be provided by the
/// implementor; `set_topology`, `start` and `stop` default to doing nothing.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the runtime configuration and the task ids.
    // NOTE(review): `taskids` is presumably indexed by the `taskid` passed to
    // `process_error` — confirm against the runtime.
    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
    where
        Self: Sized;

    /// Receives the topology of the running graph. Ignored by default.
    fn set_topology(&mut self, _topology: MonitorTopology) {}

    /// Called when Copper is starting. Does nothing by default.
    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }

    /// Callback that will be triggered at the end of every copperlist (before, on or after the serialization).
    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;

    /// Called when a task errored out. The runtime requires an immediate decision.
    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;

    /// Called when Copper is stopping. Does nothing by default.
    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }
}
218
219/// A do nothing monitor if no monitor is provided.
220/// This is basically defining the default behavior of Copper in case of error.
221pub struct NoMonitor {}
222impl CuMonitor for NoMonitor {
223    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
224        Ok(NoMonitor {})
225    }
226
227    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
228        // By default, do nothing.
229        Ok(())
230    }
231
232    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
233        // By default, just try to continue.
234        Decision::Ignore
235    }
236}
237
/// A simple allocator that counts the number of bytes allocated and deallocated.
///
/// Every request is forwarded to the wrapped allocator; on success the byte
/// counts are added to two monotonically increasing atomic counters.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters set to zero.
    ///
    /// `const` so the wrapper can be used to initialise a `static`.
    pub const fn new(inner: A) -> Self {
        CountingAlloc {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total number of bytes successfully allocated since creation or the last [`reset`](Self::reset).
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total number of bytes deallocated since creation or the last [`reset`](Self::reset).
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    ///
    /// The two stores are independent, so a concurrent reader may observe one
    /// counter already reset and the other not yet. Differences computed
    /// against values read before the reset are no longer meaningful.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

// SAFETY: every method delegates the actual memory management to `inner`,
// which upholds the `GlobalAlloc` contract; the wrapper only updates atomic
// counters, which neither allocates nor unwinds.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's obligations are forwarded unchanged to `inner`.
        let p = unsafe { self.inner.alloc(layout) };
        // A failed allocation (null) must not be counted.
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator
        // (hence by `inner`) with `layout`.
        unsafe {
            self.inner.dealloc(ptr, layout);
        }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    // Forwarded explicitly so the inner allocator's zeroing fast path is kept;
    // the trait default would call `alloc` and then zero the memory by hand.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller's obligations are forwarded unchanged to `inner`.
        let p = unsafe { self.inner.alloc_zeroed(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    // Forwarded explicitly so the inner allocator can grow or shrink in place;
    // the trait default always allocates a new block, copies and frees the old
    // one. The accounting matches that default: on success the new size counts
    // as allocated and the old size as deallocated; on failure the old block
    // is untouched and nothing is counted.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator
        // (hence by `inner`) with `layout`, and that `new_size` is valid for it.
        let p = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !p.is_null() {
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }
}
284
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// It stores the values of the global allocator's counters at creation time;
/// the accessors report how much the counters have moved since then.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// Value of `GLOBAL.allocated()` when the counter was created.
    bf_allocated: usize,
    /// Value of `GLOBAL.deallocated()` when the counter was created.
    bf_deallocated: usize,
}

/// `default()` is the same as `new()`: it snapshots the counters immediately.
#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}
298
#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a measurement by snapshotting the global allocator's counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated through the global
    /// allocator since the creation of this `ScopedAllocCounter`.
    ///
    /// The counters are global to the program, so allocations made by other
    /// threads during the scope are included. If the global counters were
    /// reset after this counter was created, the result saturates at zero
    /// instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated through the global
    /// allocator since the creation of this `ScopedAllocCounter`.
    ///
    /// The counters are global to the program, so deallocations made by other
    /// threads during the scope are included. If the global counters were
    /// reset after this counter was created, the result saturates at zero
    /// instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}
339
/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
///
/// The values are currently computed but not reported anywhere (see the TODO below).
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // `saturating_sub` keeps `drop` panic-free even if the global counters
        // were reset after this counter took its snapshot.
        let _allocated = GLOBAL.allocated().saturating_sub(self.bf_allocated);
        let _deallocated = GLOBAL.deallocated().saturating_sub(self.bf_deallocated);
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
354
355#[cfg(feature = "std")]
356const BUCKET_COUNT: usize = 1024;
357#[cfg(not(feature = "std"))]
358const BUCKET_COUNT: usize = 256;
359
360/// Accumulative stat object that can give your some real time statistics.
361/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
362#[derive(Debug, Clone)]
363pub struct LiveStatistics {
364    buckets: [u64; BUCKET_COUNT],
365    min_val: u64,
366    max_val: u64,
367    sum: u64,
368    sum_sq: u64,
369    count: u64,
370    max_value: u64,
371}
372
373impl LiveStatistics {
374    /// Creates a new `LiveStatistics` instance with a specified maximum value.
375    ///
376    /// This function initializes a `LiveStatistics` structure with default values
377    /// for tracking statistical data, while setting an upper limit for the data
378    /// points that the structure tracks.
379    ///
380    /// # Parameters
381    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
382    ///
383    /// # Returns
384    /// A new instance of `LiveStatistics` with:
385    /// - `buckets`: An array pre-filled with zeros to categorize data points.
386    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
387    /// - `max_val`: Initialized to zero.
388    /// - `sum`: The sum of all data points, initialized to zero.
389    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
390    /// - `count`: The total number of data points, initialized to zero.
391    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
392    ///
393    pub fn new_with_max(max_value: u64) -> Self {
394        LiveStatistics {
395            buckets: [0; BUCKET_COUNT],
396            min_val: u64::MAX,
397            max_val: 0,
398            sum: 0,
399            sum_sq: 0,
400            count: 0,
401            max_value,
402        }
403    }
404
405    #[inline]
406    fn value_to_bucket(&self, value: u64) -> usize {
407        if value >= self.max_value {
408            BUCKET_COUNT - 1
409        } else {
410            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
411        }
412    }
413
414    #[inline]
415    pub fn min(&self) -> u64 {
416        if self.count == 0 { 0 } else { self.min_val }
417    }
418
419    #[inline]
420    pub fn max(&self) -> u64 {
421        self.max_val
422    }
423
424    #[inline]
425    pub fn mean(&self) -> f64 {
426        if self.count == 0 {
427            0.0
428        } else {
429            self.sum as f64 / self.count as f64
430        }
431    }
432
433    #[inline]
434    pub fn stdev(&self) -> f64 {
435        if self.count == 0 {
436            return 0.0;
437        }
438        let mean = self.mean();
439        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
440        if variance < 0.0 {
441            return 0.0;
442        }
443        #[cfg(feature = "std")]
444        return variance.sqrt();
445        #[cfg(not(feature = "std"))]
446        return sqrt(variance);
447    }
448
449    #[inline]
450    pub fn percentile(&self, percentile: f64) -> u64 {
451        if self.count == 0 {
452            return 0;
453        }
454
455        let target_count = (self.count as f64 * percentile) as u64;
456        let mut accumulated = 0u64;
457
458        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
459            accumulated += bucket_count;
460            if accumulated >= target_count {
461                // Linear interpolation within the bucket
462                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
463                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
464                let bucket_fraction = if bucket_count > 0 {
465                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
466                } else {
467                    0.5
468                };
469                return bucket_start
470                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
471            }
472        }
473
474        self.max_val
475    }
476
477    /// Adds a value to the statistics.
478    #[inline]
479    pub fn record(&mut self, value: u64) {
480        if value < self.min_val {
481            self.min_val = value;
482        }
483        if value > self.max_val {
484            self.max_val = value;
485        }
486        self.sum += value;
487        self.sum_sq += value * value;
488        self.count += 1;
489
490        let bucket = self.value_to_bucket(value);
491        self.buckets[bucket] += 1;
492    }
493
494    #[inline]
495    pub fn len(&self) -> u64 {
496        self.count
497    }
498
499    #[inline]
500    pub fn is_empty(&self) -> bool {
501        self.count == 0
502    }
503
504    #[inline]
505    pub fn reset(&mut self) {
506        self.buckets.fill(0);
507        self.min_val = u64::MAX;
508        self.max_val = 0;
509        self.sum = 0;
510        self.sum_sq = 0;
511        self.count = 0;
512    }
513}
514
515/// A Specialized statistics object for CuDuration.
516/// It will also keep track of the jitter between the values.
517#[derive(Debug, Clone)]
518pub struct CuDurationStatistics {
519    bare: LiveStatistics,
520    jitter: LiveStatistics,
521    last_value: CuDuration,
522}
523
524impl CuDurationStatistics {
525    pub fn new(max: CuDuration) -> Self {
526        let CuDuration(max) = max;
527        CuDurationStatistics {
528            bare: LiveStatistics::new_with_max(max),
529            jitter: LiveStatistics::new_with_max(max),
530            last_value: CuDuration::default(),
531        }
532    }
533
534    #[inline]
535    pub fn min(&self) -> CuDuration {
536        CuDuration(self.bare.min())
537    }
538
539    #[inline]
540    pub fn max(&self) -> CuDuration {
541        CuDuration(self.bare.max())
542    }
543
544    #[inline]
545    pub fn mean(&self) -> CuDuration {
546        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
547    }
548
549    #[inline]
550    pub fn percentile(&self, percentile: f64) -> CuDuration {
551        CuDuration(self.bare.percentile(percentile))
552    }
553
554    #[inline]
555    pub fn stddev(&self) -> CuDuration {
556        CuDuration(self.bare.stdev() as u64)
557    }
558
559    #[inline]
560    pub fn len(&self) -> u64 {
561        self.bare.len()
562    }
563
564    #[inline]
565    pub fn is_empty(&self) -> bool {
566        self.bare.len() == 0
567    }
568
569    #[inline]
570    pub fn jitter_min(&self) -> CuDuration {
571        CuDuration(self.jitter.min())
572    }
573
574    #[inline]
575    pub fn jitter_max(&self) -> CuDuration {
576        CuDuration(self.jitter.max())
577    }
578
579    #[inline]
580    pub fn jitter_mean(&self) -> CuDuration {
581        CuDuration(self.jitter.mean() as u64)
582    }
583
584    #[inline]
585    pub fn jitter_stddev(&self) -> CuDuration {
586        CuDuration(self.jitter.stdev() as u64)
587    }
588
589    #[inline]
590    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
591        CuDuration(self.jitter.percentile(percentile))
592    }
593
594    #[inline]
595    pub fn record(&mut self, value: CuDuration) {
596        let CuDuration(nanos) = value;
597        if self.bare.is_empty() {
598            self.bare.record(nanos);
599            self.last_value = value;
600            return;
601        }
602        self.bare.record(nanos);
603        let CuDuration(last_nanos) = self.last_value;
604        self.jitter.record(nanos.abs_diff(last_nanos));
605        self.last_value = value;
606    }
607
608    #[inline]
609    pub fn reset(&mut self) {
610        self.bare.reset();
611        self.jitter.reset();
612    }
613}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// Percentile estimates must land within a few units of the exact rank values.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Feed the samples 0, 1, ..., 99.
        (0..100).for_each(|sample| stats.record(sample));

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        // The exact mean is 49.5; the cast truncates it.
        assert_eq!(stats.mean() as u64, 49);

        // The histogram only approximates percentiles, hence the tolerance below.
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // With the samples 0..=99, each percentile should sit close to its rank.
        assert!(p50.abs_diff(49) < 5, "p50={} expected ~49", p50);
        assert!(p90.abs_diff(89) < 5, "p90={} expected ~89", p90);
        assert!(p95.abs_diff(94) < 5, "p95={} expected ~94", p95);
        assert!(p99.abs_diff(98) < 5, "p99={} expected ~98", p99);
    }

    /// Durations and the jitter between consecutive durations are both tracked.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for nanos in [100, 200, 500, 400] {
            stats.record(CuDuration(nanos));
        }

        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);

        // Four samples give three consecutive differences: 100, 300 and 100.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}