cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::cutask::CuMsgMetadata;
6use crate::log::*;
7use cu29_clock::{CuDuration, RobotClock};
8use cu29_traits::{CuError, CuResult};
9use hdrhistogram::Histogram;
10use serde_derive::{Deserialize, Serialize};
11use std::alloc::{GlobalAlloc, Layout, System};
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14/// The state of a task.
15#[derive(Debug, Serialize, Deserialize)]
16pub enum CuTaskState {
17    Start,
18    Preprocess,
19    Process,
20    Postprocess,
21    Stop,
22}
23
/// Verdict a monitor hands back to the runtime when a task errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps, set a
    /// `None` return value and carry on with the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}
31
32/// Trait to implement a monitoring task.
33pub trait CuMonitor: Sized {
34    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
35    where
36        Self: Sized;
37
38    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
39        Ok(())
40    }
41
42    /// Callback that will be trigger at the end of every copperlist (before, on or after the serialization).
43    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
44
45    /// Callbacked when a Task errored out. The runtime requires an immediate decision.
46    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
47
48    /// Callbacked when copper is stopping.
49    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
50        Ok(())
51    }
52}
53
54/// A do nothing monitor if no monitor is provided.
55/// This is basically defining the default behavior of Copper in case of error.
56pub struct NoMonitor {}
57impl CuMonitor for NoMonitor {
58    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
59        Ok(NoMonitor {})
60    }
61
62    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
63        // By default, do nothing.
64        Ok(())
65    }
66
67    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
68        // By default, just try to continue.
69        Decision::Ignore
70    }
71}
72
/// Allocator instance registered as the global allocator, so the heap traffic of the whole
/// program is reflected in its counters.
#[global_allocator]
pub static GLOBAL: CountingAllocator = CountingAllocator::new();

/// A simple allocator that counts the number of bytes allocated and deallocated.
///
/// The memory management itself is delegated to [`System`]; this type only maintains two
/// running byte totals on the side.
pub struct CountingAllocator {
    /// Sum of the sizes of all successful allocations.
    allocated: AtomicUsize,
    /// Sum of the sizes of all released allocations.
    deallocated: AtomicUsize,
}

impl Default for CountingAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl CountingAllocator {
    /// Creates an allocator with both counters at zero.
    ///
    /// This is a `const fn` so that it can initialize a `static`.
    pub const fn new() -> Self {
        CountingAllocator {
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Bytes allocated since creation or since the last [`reset`](Self::reset).
    pub fn get_allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Bytes deallocated since creation or since the last [`reset`](Self::reset).
    pub fn get_deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    ///
    /// The two stores are separate operations: allocations happening concurrently may be
    /// counted in between.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

// SAFETY: every request is forwarded unchanged to `System`, which upholds the `GlobalAlloc`
// contract; the only extra work is updating atomic counters, which neither allocates nor
// unwinds.
unsafe impl GlobalAlloc for CountingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller guarantees the `GlobalAlloc::alloc` preconditions for `layout`.
        let ptr = unsafe { System.alloc(layout) };
        // Failed allocations (null) did not hand out any memory: do not count them.
        if !ptr.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator with `layout`,
        // and every allocation of this allocator comes from `System`.
        unsafe { System.dealloc(ptr, layout) };
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    /// Forwards to the system allocator instead of relying on the default
    /// "allocate then zero-fill" implementation. Accounting is the same as for `alloc`.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller guarantees the `GlobalAlloc::alloc_zeroed` preconditions.
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        ptr
    }

    /// Forwards to the system allocator so that a block can be resized by the underlying
    /// allocator rather than always going through "allocate new, copy, free old" (the default
    /// implementation of `GlobalAlloc::realloc`).
    ///
    /// The counters evolve exactly as with the default implementation: on success the new
    /// size is added to the allocated total and the old size to the deallocated total; on
    /// failure nothing is counted and the original block stays valid.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller guarantees the `GlobalAlloc::realloc` preconditions, and `ptr`
        // originates from `System` since all allocations are forwarded to it.
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        if !new_ptr.is_null() {
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        new_ptr
    }
}
124
125/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
126pub struct ScopedAllocCounter {
127    bf_allocated: usize,
128    bf_deallocated: usize,
129}
130
131impl Default for ScopedAllocCounter {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137impl ScopedAllocCounter {
138    pub fn new() -> Self {
139        ScopedAllocCounter {
140            bf_allocated: GLOBAL.get_allocated(),
141            bf_deallocated: GLOBAL.get_deallocated(),
142        }
143    }
144
145    /// Returns the total number of bytes allocated in the current scope
146    /// since the creation of this `ScopedAllocCounter`.
147    ///
148    /// # Example
149    /// ```
150    /// use cu29_runtime::monitoring::ScopedAllocCounter;
151    ///
152    /// let counter = ScopedAllocCounter::new();
153    /// let _vec = vec![0u8; 1024];
154    /// println!("Bytes allocated: {}", counter.get_allocated());
155    /// ```
156    pub fn get_allocated(&self) -> usize {
157        GLOBAL.get_allocated() - self.bf_allocated
158    }
159
160    /// Returns the total number of bytes deallocated in the current scope
161    /// since the creation of this `ScopedAllocCounter`.
162    ///
163    /// # Example
164    /// ```
165    /// use cu29_runtime::monitoring::ScopedAllocCounter;
166    ///
167    /// let counter = ScopedAllocCounter::new();
168    /// let _vec = vec![0u8; 1024];
169    /// drop(_vec);
170    /// println!("Bytes deallocated: {}", counter.get_deallocated());
171    /// ```
172    pub fn get_deallocated(&self) -> usize {
173        GLOBAL.get_deallocated() - self.bf_deallocated
174    }
175}
176
177/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
178impl Drop for ScopedAllocCounter {
179    fn drop(&mut self) {
180        let _allocated = GLOBAL.get_allocated() - self.bf_allocated;
181        let _deallocated = GLOBAL.get_deallocated() - self.bf_deallocated;
182        // TODO(gbin): Fix this when the logger is ready.
183        // debug!(
184        //     "Allocations: +{}B -{}B",
185        //     allocated = allocated,
186        //     deallocated = deallocated,
187        // );
188    }
189}
190
191/// Accumulative stat object that can give your some real time statistics.
192#[derive(Debug, Clone)]
193pub struct LiveStatistics {
194    stats: Histogram<u64>, // u64 is the Counter type.
195}
196
197impl LiveStatistics {
198    pub fn new_unbounded() -> Self {
199        LiveStatistics {
200            stats: Histogram::<u64>::new(3).unwrap(),
201        }
202    }
203
204    pub fn new_with_max(max: u64) -> Self {
205        LiveStatistics {
206            stats: Histogram::<u64>::new_with_bounds(1, max, 3).unwrap(),
207        }
208    }
209
210    #[inline]
211    pub fn min(&self) -> u64 {
212        self.stats.min()
213    }
214
215    #[inline]
216    pub fn max(&self) -> u64 {
217        self.stats.max()
218    }
219
220    #[inline]
221    pub fn mean(&self) -> f64 {
222        self.stats.mean()
223    }
224
225    #[inline]
226    pub fn percentile(&self, percentile: f64) -> u64 {
227        self.stats.value_at_quantile(percentile)
228    }
229
230    /// Adds a value to the statistics.
231    #[inline]
232    pub fn record(&mut self, value: u64) {
233        let maybe_error = self.stats.record(value);
234        if let Err(e) = maybe_error {
235            debug!("stats.record errored out: {}", e.to_string());
236        }
237    }
238
239    #[inline]
240    pub fn len(&self) -> u64 {
241        self.stats.len()
242    }
243
244    #[inline]
245    pub fn is_empty(&self) -> bool {
246        self.stats.len() == 0
247    }
248
249    #[inline]
250    pub fn reset(&mut self) {
251        self.stats.reset();
252    }
253}
254
255/// A Specialized statistics object for CuDuration.
256/// It will also keep track of the jitter between the values.
257#[derive(Debug, Clone)]
258pub struct CuDurationStatistics {
259    bare: LiveStatistics,
260    jitter: LiveStatistics,
261    last_value: CuDuration,
262}
263
264impl CuDurationStatistics {
265    pub fn new(max: CuDuration) -> Self {
266        let CuDuration(max) = max;
267        CuDurationStatistics {
268            bare: LiveStatistics::new_with_max(max),
269            jitter: LiveStatistics::new_with_max(max),
270            last_value: CuDuration::default(),
271        }
272    }
273
274    #[inline]
275    pub fn min(&self) -> CuDuration {
276        CuDuration(self.bare.min())
277    }
278
279    #[inline]
280    pub fn max(&self) -> CuDuration {
281        CuDuration(self.bare.max())
282    }
283
284    #[inline]
285    pub fn mean(&self) -> CuDuration {
286        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
287    }
288
289    #[inline]
290    pub fn percentile(&self, percentile: f64) -> CuDuration {
291        CuDuration(self.bare.percentile(percentile))
292    }
293
294    #[inline]
295    pub fn stddev(&self) -> CuDuration {
296        CuDuration(self.bare.stats.stdev() as u64)
297    }
298
299    #[inline]
300    pub fn len(&self) -> u64 {
301        self.bare.len()
302    }
303
304    #[inline]
305    pub fn is_empty(&self) -> bool {
306        self.bare.len() == 0
307    }
308
309    #[inline]
310    pub fn jitter_min(&self) -> CuDuration {
311        CuDuration(self.jitter.min())
312    }
313
314    #[inline]
315    pub fn jitter_max(&self) -> CuDuration {
316        CuDuration(self.jitter.max())
317    }
318
319    #[inline]
320    pub fn jitter_mean(&self) -> CuDuration {
321        CuDuration(self.jitter.mean() as u64)
322    }
323
324    #[inline]
325    pub fn jitter_stddev(&self) -> CuDuration {
326        CuDuration(self.jitter.stats.stdev() as u64)
327    }
328
329    #[inline]
330    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
331        CuDuration(self.jitter.percentile(percentile))
332    }
333
334    #[inline]
335    pub fn record(&mut self, value: CuDuration) {
336        let CuDuration(nanos) = value;
337        if self.bare.is_empty() {
338            self.bare.stats.record(nanos).unwrap(); // this needs to happen *after* the check.
339            self.last_value = value;
340            return;
341        }
342        self.bare.stats.record(nanos).unwrap(); // .. and needs to happen here too.
343        let CuDuration(last_nanos) = self.last_value;
344        self.jitter
345            .stats
346            .record(nanos.abs_diff(last_nanos))
347            .unwrap();
348        self.last_value = value;
349    }
350
351    #[inline]
352    pub fn reset(&mut self) {
353        self.bare.reset();
354        self.jitter.reset();
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic min / max / mean / quantile checks on a handful of values.
    #[test]
    fn test_live_statistics() {
        let mut stats = LiveStatistics::new_unbounded();
        for value in 1..=5 {
            stats.record(value);
        }
        assert_eq!(stats.min(), 1);
        assert_eq!(stats.max(), 5);
        assert_eq!(stats.mean(), 3.0);
        assert_eq!(stats.percentile(0.5), 3);
        assert_eq!(stats.percentile(0.90), 5);
        assert_eq!(stats.percentile(0.99), 5);
        assert_eq!(stats.len(), 5);
        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    /// Checks both the duration statistics and the jitter derived from consecutive values.
    #[test]
    fn test_duration_stats() {
        // The declared maximum has to cover the largest recorded value (500): a smaller bound
        // would only work by accident of the histogram's internal bucket sizing.
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for nanos in [100, 200, 500, 400] {
            stats.record(CuDuration(nanos));
        }
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.percentile(0.5), CuDuration(200));
        assert_eq!(stats.percentile(0.90), CuDuration(500));
        assert_eq!(stats.percentile(0.99), CuDuration(500));
        assert_eq!(stats.len(), 4);
        // Jitters are |200-100|, |500-200| and |400-500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        assert_eq!(stats.jitter_percentile(0.5), CuDuration(100));
        assert_eq!(stats.jitter_percentile(0.90), CuDuration(300));
        assert_eq!(stats.jitter_percentile(0.99), CuDuration(300));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}