cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::cutask::CuMsgMetadata;
6use crate::log::*;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use hdrhistogram::Histogram;
12use serde_derive::{Deserialize, Serialize};
13use std::alloc::{GlobalAlloc, Layout, System};
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16/// The state of a task.
17#[derive(Debug, Serialize, Deserialize)]
18pub enum CuTaskState {
19    Start,
20    Preprocess,
21    Process,
22    Postprocess,
23    Stop,
24}
25
/// The decision a monitor hands back to the runtime when a task errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist: just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}
33
34/// Trait to implement a monitoring task.
35pub trait CuMonitor: Sized {
36    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
37    where
38        Self: Sized;
39
40    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
41        Ok(())
42    }
43
44    /// Callback that will be trigger at the end of every copperlist (before, on or after the serialization).
45    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
46
47    /// Callbacked when a Task errored out. The runtime requires an immediate decision.
48    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
49
50    /// Callbacked when copper is stopping.
51    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
52        Ok(())
53    }
54}
55
56/// A do nothing monitor if no monitor is provided.
57/// This is basically defining the default behavior of Copper in case of error.
58pub struct NoMonitor {}
59impl CuMonitor for NoMonitor {
60    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
61        Ok(NoMonitor {})
62    }
63
64    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
65        // By default, do nothing.
66        Ok(())
67    }
68
69    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
70        // By default, just try to continue.
71        Decision::Ignore
72    }
73}
74
/// The allocator instance registered as the global allocator: heap allocations made
/// through the global allocator are counted by it.
#[global_allocator]
pub static GLOBAL: CountingAllocator = CountingAllocator::new();

/// A simple allocator that counts the number of bytes allocated and deallocated.
///
/// The memory management itself is delegated to [`System`]; this type only keeps two
/// running byte totals.
pub struct CountingAllocator {
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl Default for CountingAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl CountingAllocator {
    /// Creates an allocator with both counters at zero (usable in a `static`).
    pub const fn new() -> Self {
        Self {
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total number of bytes allocated since creation or the last [`reset`](Self::reset).
    pub fn get_allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total number of bytes deallocated since creation or the last [`reset`](Self::reset).
    pub fn get_deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Puts both counters back to zero (two independent stores, `allocated` first).
    pub fn reset(&self) {
        for counter in [&self.allocated, &self.deallocated] {
            counter.store(0, Ordering::SeqCst);
        }
    }
}

unsafe impl GlobalAlloc for CountingAllocator {
    /// Allocates through [`System`] and counts `layout.size()` bytes on success.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract for `layout`,
        // which is forwarded unchanged to the system allocator.
        let block = unsafe { System.alloc(layout) };
        if block.is_null() {
            // A failed allocation is not counted.
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    /// Releases through [`System`] and counts `layout.size()` bytes as deallocated.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was returned by this allocator (hence by
        // `System`) for this very `layout`.
        unsafe { System.dealloc(ptr, layout) };
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }
}
126
127/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
128pub struct ScopedAllocCounter {
129    bf_allocated: usize,
130    bf_deallocated: usize,
131}
132
133impl Default for ScopedAllocCounter {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl ScopedAllocCounter {
140    pub fn new() -> Self {
141        ScopedAllocCounter {
142            bf_allocated: GLOBAL.get_allocated(),
143            bf_deallocated: GLOBAL.get_deallocated(),
144        }
145    }
146
147    /// Returns the total number of bytes allocated in the current scope
148    /// since the creation of this `ScopedAllocCounter`.
149    ///
150    /// # Example
151    /// ```
152    /// use cu29_runtime::monitoring::ScopedAllocCounter;
153    ///
154    /// let counter = ScopedAllocCounter::new();
155    /// let _vec = vec![0u8; 1024];
156    /// println!("Bytes allocated: {}", counter.get_allocated());
157    /// ```
158    pub fn get_allocated(&self) -> usize {
159        GLOBAL.get_allocated() - self.bf_allocated
160    }
161
162    /// Returns the total number of bytes deallocated in the current scope
163    /// since the creation of this `ScopedAllocCounter`.
164    ///
165    /// # Example
166    /// ```
167    /// use cu29_runtime::monitoring::ScopedAllocCounter;
168    ///
169    /// let counter = ScopedAllocCounter::new();
170    /// let _vec = vec![0u8; 1024];
171    /// drop(_vec);
172    /// println!("Bytes deallocated: {}", counter.get_deallocated());
173    /// ```
174    pub fn get_deallocated(&self) -> usize {
175        GLOBAL.get_deallocated() - self.bf_deallocated
176    }
177}
178
179/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
180impl Drop for ScopedAllocCounter {
181    fn drop(&mut self) {
182        let _allocated = GLOBAL.get_allocated() - self.bf_allocated;
183        let _deallocated = GLOBAL.get_deallocated() - self.bf_deallocated;
184        // TODO(gbin): Fix this when the logger is ready.
185        // debug!(
186        //     "Allocations: +{}B -{}B",
187        //     allocated = allocated,
188        //     deallocated = deallocated,
189        // );
190    }
191}
192
193/// Accumulative stat object that can give your some real time statistics.
194#[derive(Debug, Clone)]
195pub struct LiveStatistics {
196    stats: Histogram<u64>, // u64 is the Counter type.
197}
198
199impl LiveStatistics {
200    pub fn new_unbounded() -> Self {
201        LiveStatistics {
202            stats: Histogram::<u64>::new(3).unwrap(),
203        }
204    }
205
206    pub fn new_with_max(max: u64) -> Self {
207        LiveStatistics {
208            stats: Histogram::<u64>::new_with_bounds(1, max, 3).unwrap(),
209        }
210    }
211
212    #[inline]
213    pub fn min(&self) -> u64 {
214        self.stats.min()
215    }
216
217    #[inline]
218    pub fn max(&self) -> u64 {
219        self.stats.max()
220    }
221
222    #[inline]
223    pub fn mean(&self) -> f64 {
224        self.stats.mean()
225    }
226
227    #[inline]
228    pub fn percentile(&self, percentile: f64) -> u64 {
229        self.stats.value_at_quantile(percentile)
230    }
231
232    /// Adds a value to the statistics.
233    #[inline]
234    pub fn record(&mut self, value: u64) {
235        let maybe_error = self.stats.record(value);
236        if let Err(e) = maybe_error {
237            debug!("stats.record errored out: {}", e.to_string());
238        }
239    }
240
241    #[inline]
242    pub fn len(&self) -> u64 {
243        self.stats.len()
244    }
245
246    #[inline]
247    pub fn is_empty(&self) -> bool {
248        self.stats.len() == 0
249    }
250
251    #[inline]
252    pub fn reset(&mut self) {
253        self.stats.reset();
254    }
255}
256
257/// A Specialized statistics object for CuDuration.
258/// It will also keep track of the jitter between the values.
259#[derive(Debug, Clone)]
260pub struct CuDurationStatistics {
261    bare: LiveStatistics,
262    jitter: LiveStatistics,
263    last_value: CuDuration,
264}
265
266impl CuDurationStatistics {
267    pub fn new(max: CuDuration) -> Self {
268        let CuDuration(max) = max;
269        CuDurationStatistics {
270            bare: LiveStatistics::new_with_max(max),
271            jitter: LiveStatistics::new_with_max(max),
272            last_value: CuDuration::default(),
273        }
274    }
275
276    #[inline]
277    pub fn min(&self) -> CuDuration {
278        CuDuration(self.bare.min())
279    }
280
281    #[inline]
282    pub fn max(&self) -> CuDuration {
283        CuDuration(self.bare.max())
284    }
285
286    #[inline]
287    pub fn mean(&self) -> CuDuration {
288        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
289    }
290
291    #[inline]
292    pub fn percentile(&self, percentile: f64) -> CuDuration {
293        CuDuration(self.bare.percentile(percentile))
294    }
295
296    #[inline]
297    pub fn stddev(&self) -> CuDuration {
298        CuDuration(self.bare.stats.stdev() as u64)
299    }
300
301    #[inline]
302    pub fn len(&self) -> u64 {
303        self.bare.len()
304    }
305
306    #[inline]
307    pub fn is_empty(&self) -> bool {
308        self.bare.len() == 0
309    }
310
311    #[inline]
312    pub fn jitter_min(&self) -> CuDuration {
313        CuDuration(self.jitter.min())
314    }
315
316    #[inline]
317    pub fn jitter_max(&self) -> CuDuration {
318        CuDuration(self.jitter.max())
319    }
320
321    #[inline]
322    pub fn jitter_mean(&self) -> CuDuration {
323        CuDuration(self.jitter.mean() as u64)
324    }
325
326    #[inline]
327    pub fn jitter_stddev(&self) -> CuDuration {
328        CuDuration(self.jitter.stats.stdev() as u64)
329    }
330
331    #[inline]
332    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
333        CuDuration(self.jitter.percentile(percentile))
334    }
335
336    #[inline]
337    pub fn record(&mut self, value: CuDuration) {
338        let CuDuration(nanos) = value;
339        if self.bare.is_empty() {
340            self.bare.stats.record(nanos).unwrap(); // this needs to happen *after* the check.
341            self.last_value = value;
342            return;
343        }
344        self.bare.stats.record(nanos).unwrap(); // .. and needs to happen here too.
345        let CuDuration(last_nanos) = self.last_value;
346        self.jitter
347            .stats
348            .record(nanos.abs_diff(last_nanos))
349            .unwrap();
350        self.last_value = value;
351    }
352
353    #[inline]
354    pub fn reset(&mut self) {
355        self.bare.reset();
356        self.jitter.reset();
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic min/max/mean/quantile checks on an unbounded `LiveStatistics`.
    #[test]
    fn test_live_statistics() {
        let mut stats = LiveStatistics::new_unbounded();
        assert!(stats.is_empty());
        stats.record(1);
        stats.record(2);
        stats.record(3);
        stats.record(4);
        stats.record(5);
        assert!(!stats.is_empty());
        assert_eq!(stats.min(), 1);
        assert_eq!(stats.max(), 5);
        assert_eq!(stats.mean(), 3.0);
        assert_eq!(stats.percentile(0.5), 3);
        assert_eq!(stats.percentile(0.90), 5);
        assert_eq!(stats.percentile(0.99), 5);
        assert_eq!(stats.len(), 5);
        stats.reset();
        assert_eq!(stats.len(), 0);
        assert!(stats.is_empty());
    }

    /// Checks both the duration statistics and the jitter derived from them.
    #[test]
    fn test_duration_stats() {
        // The declared maximum has to cover every recorded sample: the test must not
        // depend on the slack of the histogram's internal buckets.
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        assert!(stats.is_empty());
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.percentile(0.5), CuDuration(200));
        assert_eq!(stats.percentile(0.90), CuDuration(500));
        assert_eq!(stats.percentile(0.99), CuDuration(500));
        assert_eq!(stats.len(), 4);
        // Jitter samples: |200 - 100|, |500 - 200|, |400 - 500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        assert_eq!(stats.jitter_percentile(0.5), CuDuration(100));
        assert_eq!(stats.jitter_percentile(0.90), CuDuration(300));
        assert_eq!(stats.jitter_percentile(0.99), CuDuration(300));
        stats.reset();
        assert_eq!(stats.len(), 0);
        assert!(stats.is_empty());
    }
}
407}