cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::cutask::CuMsgMetadata;
6use cu29_clock::{CuDuration, RobotClock};
7#[allow(unused_imports)]
8use cu29_log::CuLogLevel;
9use cu29_traits::{CuError, CuResult};
10use serde_derive::{Deserialize, Serialize};
11
// `no_std` flavor of the platform shim. Everything listed here is glob-imported
// into the parent module through `use imp::*`.
#[cfg(not(feature = "std"))]
mod imp {
    // Allocator trait and layout type, taken from the `alloc` crate.
    pub use alloc::alloc::{GlobalAlloc, Layout};
    // Atomic counters, taken from `core`.
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    // Free-function square root, called by `LiveStatistics::stdev` in this configuration.
    pub use libm::sqrt;
}
18
// `std` flavor of the platform shim. Everything listed here is glob-imported
// into the parent module through `use imp::*`.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // Only with the `memory_monitoring` feature: the system allocator is wrapped in a
    // `CountingAlloc` and registered as the global allocator, so that
    // `ScopedAllocCounter` can read the byte counters from `GLOBAL`.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
31
32use imp::*;
33
/// The state of a task.
///
/// A value of this type is handed to [`CuMonitor::process_error`] as the `step`
/// in which the task errored out.
// NOTE(review): the variants presumably mirror the lifecycle callbacks of a task
// (start, preprocess, process, postprocess, stop) -- confirm against the task trait.
#[derive(Debug, Serialize, Deserialize)]
pub enum CuTaskState {
    Start,
    Preprocess,
    Process,
    Postprocess,
    Stop,
}
43
/// Monitor decision to be taken when a task errored out.
///
/// This is the value returned by [`CuMonitor::process_error`].
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut down Copper as cleanly as possible.
    Shutdown,
}
51
/// Trait to implement a monitoring task.
///
/// Implementors must provide [`CuMonitor::new`], [`CuMonitor::process_copperlist`] and
/// [`CuMonitor::process_error`]; `start` and `stop` default to doing nothing.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the Copper configuration and the list of task ids.
    // NOTE(review): `taskids` is presumably indexed by the `taskid` passed to
    // `process_error` -- confirm against the runtime.
    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
    where
        Self: Sized;

    /// Start hook receiving the robot clock. The default implementation does nothing
    /// and returns `Ok(())`.
    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }

    /// Callback that will be triggered at the end of every copperlist (before, on or
    /// after the serialization), with the metadata of its messages.
    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;

    /// Called back when a task errored out. The runtime requires an immediate decision.
    ///
    /// `taskid` identifies the failing task, `step` is the state it was in and `error`
    /// is the error it reported.
    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;

    /// Called back when Copper is stopping. The default implementation does nothing
    /// and returns `Ok(())`.
    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }
}
73
74/// A do nothing monitor if no monitor is provided.
75/// This is basically defining the default behavior of Copper in case of error.
76pub struct NoMonitor {}
77impl CuMonitor for NoMonitor {
78    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
79        Ok(NoMonitor {})
80    }
81
82    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
83        // By default, do nothing.
84        Ok(())
85    }
86
87    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
88        // By default, just try to continue.
89        Decision::Ignore
90    }
91}
92
/// A simple allocator wrapper that counts the number of bytes allocated and deallocated.
///
/// Every request is forwarded to the wrapped allocator `inner`; the two counters only
/// ever grow (until [`CountingAlloc::reset`] is called) and hold the sum of the layout
/// sizes of all successful allocations and of all deallocations respectively.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero. `const` so that it can initialize a `static`.
    pub const fn new(inner: A) -> Self {
        CountingAlloc {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total number of bytes successfully allocated since creation or the last reset.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total number of bytes deallocated since creation or the last reset.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    ///
    /// The two stores are separate operations, so a concurrent reader may observe one
    /// counter already reset and the other not yet.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }

    /// Accounts for `size` freshly allocated bytes when `ptr` is a successful allocation.
    #[inline]
    fn note_allocation(&self, ptr: *mut u8, size: usize) {
        if !ptr.is_null() {
            self.allocated.fetch_add(size, Ordering::SeqCst);
        }
    }
}

// SAFETY: every request is forwarded unchanged to `inner`, which is itself a
// `GlobalAlloc`; this wrapper only updates atomic counters around those calls.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract for `layout`.
        let p = unsafe { self.inner.alloc(layout) };
        self.note_allocation(p, layout.size());
        p
    }

    // Forwarded explicitly so the inner allocator can hand out pre-zeroed memory
    // instead of going through the default "alloc then write zeros" path.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc_zeroed` contract for `layout`.
        let p = unsafe { self.inner.alloc_zeroed(layout) };
        self.note_allocation(p, layout.size());
        p
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator (hence by
        // `inner`) with this exact `layout`.
        unsafe { self.inner.dealloc(ptr, layout) };
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    // Forwarded explicitly so the inner allocator can grow or shrink in place. The
    // accounting matches the default implementation (a new allocation of `new_size`
    // followed by the release of the old block), and nothing is counted on failure.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::realloc` contract; `ptr` and
        // `layout` describe a live block obtained from `inner`.
        let p = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !p.is_null() {
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }
}
137
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// The global counters are sampled at construction time; the accessors report how far
/// the counters have moved since then.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    // Value of `GLOBAL.allocated()` when this counter was created.
    bf_allocated: usize,
    // Value of `GLOBAL.deallocated()` when this counter was created.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a new measurement by recording the current global counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// If the global counters were reset after this counter was created, the
    /// result saturates at 0 instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// If the global counters were reset after this counter was created, the
    /// result saturates at 0 instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // Saturating differences: a destructor must never panic, even if the global
        // counters were reset while this counter was alive.
        let _allocated = GLOBAL.allocated().saturating_sub(self.bf_allocated);
        let _deallocated = GLOBAL.deallocated().saturating_sub(self.bf_deallocated);
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
207
208const BUCKET_COUNT: usize = 1024;
209
210/// Accumulative stat object that can give your some real time statistics.
211/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
212#[derive(Debug, Clone)]
213pub struct LiveStatistics {
214    buckets: [u64; BUCKET_COUNT],
215    min_val: u64,
216    max_val: u64,
217    sum: u64,
218    sum_sq: u64,
219    count: u64,
220    max_value: u64,
221}
222
223impl LiveStatistics {
224    /// Creates a new `LiveStatistics` instance with a specified maximum value.
225    ///
226    /// This function initializes a `LiveStatistics` structure with default values
227    /// for tracking statistical data, while setting an upper limit for the data
228    /// points that the structure tracks.
229    ///
230    /// # Parameters
231    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
232    ///
233    /// # Returns
234    /// A new instance of `LiveStatistics` with:
235    /// - `buckets`: An array pre-filled with zeros to categorize data points.
236    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
237    /// - `max_val`: Initialized to zero.
238    /// - `sum`: The sum of all data points, initialized to zero.
239    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
240    /// - `count`: The total number of data points, initialized to zero.
241    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
242    ///
243    pub fn new_with_max(max_value: u64) -> Self {
244        LiveStatistics {
245            buckets: [0; BUCKET_COUNT],
246            min_val: u64::MAX,
247            max_val: 0,
248            sum: 0,
249            sum_sq: 0,
250            count: 0,
251            max_value,
252        }
253    }
254
255    #[inline]
256    fn value_to_bucket(&self, value: u64) -> usize {
257        if value >= self.max_value {
258            BUCKET_COUNT - 1
259        } else {
260            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
261        }
262    }
263
264    #[inline]
265    pub fn min(&self) -> u64 {
266        if self.count == 0 {
267            0
268        } else {
269            self.min_val
270        }
271    }
272
273    #[inline]
274    pub fn max(&self) -> u64 {
275        self.max_val
276    }
277
278    #[inline]
279    pub fn mean(&self) -> f64 {
280        if self.count == 0 {
281            0.0
282        } else {
283            self.sum as f64 / self.count as f64
284        }
285    }
286
287    #[inline]
288    pub fn stdev(&self) -> f64 {
289        if self.count == 0 {
290            return 0.0;
291        }
292        let mean = self.mean();
293        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
294        if variance < 0.0 {
295            return 0.0;
296        }
297        #[cfg(feature = "std")]
298        return variance.sqrt();
299        #[cfg(not(feature = "std"))]
300        return sqrt(variance);
301    }
302
303    #[inline]
304    pub fn percentile(&self, percentile: f64) -> u64 {
305        if self.count == 0 {
306            return 0;
307        }
308
309        let target_count = (self.count as f64 * percentile) as u64;
310        let mut accumulated = 0u64;
311
312        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
313            accumulated += bucket_count;
314            if accumulated >= target_count {
315                // Linear interpolation within the bucket
316                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
317                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
318                let bucket_fraction = if bucket_count > 0 {
319                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
320                } else {
321                    0.5
322                };
323                return bucket_start
324                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
325            }
326        }
327
328        self.max_val
329    }
330
331    /// Adds a value to the statistics.
332    #[inline]
333    pub fn record(&mut self, value: u64) {
334        if value < self.min_val {
335            self.min_val = value;
336        }
337        if value > self.max_val {
338            self.max_val = value;
339        }
340        self.sum += value;
341        self.sum_sq += value * value;
342        self.count += 1;
343
344        let bucket = self.value_to_bucket(value);
345        self.buckets[bucket] += 1;
346    }
347
348    #[inline]
349    pub fn len(&self) -> u64 {
350        self.count
351    }
352
353    #[inline]
354    pub fn is_empty(&self) -> bool {
355        self.count == 0
356    }
357
358    #[inline]
359    pub fn reset(&mut self) {
360        self.buckets.fill(0);
361        self.min_val = u64::MAX;
362        self.max_val = 0;
363        self.sum = 0;
364        self.sum_sq = 0;
365        self.count = 0;
366    }
367}
368
369/// A Specialized statistics object for CuDuration.
370/// It will also keep track of the jitter between the values.
371#[derive(Debug, Clone)]
372pub struct CuDurationStatistics {
373    bare: LiveStatistics,
374    jitter: LiveStatistics,
375    last_value: CuDuration,
376}
377
378impl CuDurationStatistics {
379    pub fn new(max: CuDuration) -> Self {
380        let CuDuration(max) = max;
381        CuDurationStatistics {
382            bare: LiveStatistics::new_with_max(max),
383            jitter: LiveStatistics::new_with_max(max),
384            last_value: CuDuration::default(),
385        }
386    }
387
388    #[inline]
389    pub fn min(&self) -> CuDuration {
390        CuDuration(self.bare.min())
391    }
392
393    #[inline]
394    pub fn max(&self) -> CuDuration {
395        CuDuration(self.bare.max())
396    }
397
398    #[inline]
399    pub fn mean(&self) -> CuDuration {
400        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
401    }
402
403    #[inline]
404    pub fn percentile(&self, percentile: f64) -> CuDuration {
405        CuDuration(self.bare.percentile(percentile))
406    }
407
408    #[inline]
409    pub fn stddev(&self) -> CuDuration {
410        CuDuration(self.bare.stdev() as u64)
411    }
412
413    #[inline]
414    pub fn len(&self) -> u64 {
415        self.bare.len()
416    }
417
418    #[inline]
419    pub fn is_empty(&self) -> bool {
420        self.bare.len() == 0
421    }
422
423    #[inline]
424    pub fn jitter_min(&self) -> CuDuration {
425        CuDuration(self.jitter.min())
426    }
427
428    #[inline]
429    pub fn jitter_max(&self) -> CuDuration {
430        CuDuration(self.jitter.max())
431    }
432
433    #[inline]
434    pub fn jitter_mean(&self) -> CuDuration {
435        CuDuration(self.jitter.mean() as u64)
436    }
437
438    #[inline]
439    pub fn jitter_stddev(&self) -> CuDuration {
440        CuDuration(self.jitter.stdev() as u64)
441    }
442
443    #[inline]
444    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
445        CuDuration(self.jitter.percentile(percentile))
446    }
447
448    #[inline]
449    pub fn record(&mut self, value: CuDuration) {
450        let CuDuration(nanos) = value;
451        if self.bare.is_empty() {
452            self.bare.record(nanos);
453            self.last_value = value;
454            return;
455        }
456        self.bare.record(nanos);
457        let CuDuration(last_nanos) = self.last_value;
458        self.jitter.record(nanos.abs_diff(last_nanos));
459        self.last_value = value;
460    }
461
462    #[inline]
463    pub fn reset(&mut self) {
464        self.bare.reset();
465        self.jitter.reset();
466    }
467}
468
// Unit tests for `LiveStatistics` and `CuDurationStatistics`.
#[cfg(test)]
mod tests {
    use super::*;

    // Checks count, min, max, mean and a few percentiles on a uniform series.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Record 100 values from 0 to 99
        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49); // Average of 0..99 is 49.5, truncated by the cast

        // Test percentiles - should be approximately correct
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // With 100 samples from 0-99, percentiles should be close to their index
        // (the histogram only gives an estimate, hence the tolerance of 5).
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    // Checks the duration statistics and the jitter derived from consecutive samples.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        // Four samples give three consecutive differences: 100, 300 and 100.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}
518}