1use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, Flavor};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use serde_derive::{Deserialize, Serialize};
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(feature = "std")]
17use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
18
19#[cfg(not(feature = "std"))]
20use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
21
22#[cfg(not(feature = "std"))]
23mod imp {
24 pub use alloc::alloc::{GlobalAlloc, Layout};
25 pub use core::sync::atomic::{AtomicUsize, Ordering};
26 pub use libm::sqrt;
27}
28
29#[cfg(feature = "std")]
30mod imp {
31 #[cfg(feature = "memory_monitoring")]
32 use super::CountingAlloc;
33 #[cfg(feature = "memory_monitoring")]
34 pub use std::alloc::System;
35 pub use std::alloc::{GlobalAlloc, Layout};
36 pub use std::sync::atomic::{AtomicUsize, Ordering};
37 #[cfg(feature = "memory_monitoring")]
38 #[global_allocator]
39 pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
40}
41
42use imp::*;
43
44#[derive(Debug, Serialize, Deserialize)]
46pub enum CuTaskState {
47 Start,
48 Preprocess,
49 Process,
50 Postprocess,
51 Stop,
52}
53
54#[derive(Debug)]
56pub enum Decision {
57 Abort, Ignore, Shutdown, }
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum ComponentKind {
64 Task,
65 Bridge,
66}
67
68#[derive(Debug, Clone)]
69pub struct MonitorNode {
70 pub id: String,
71 pub type_name: Option<String>,
72 pub kind: ComponentKind,
73 pub inputs: Vec<String>,
75 pub outputs: Vec<String>,
77}
78
79#[derive(Debug, Clone)]
80pub struct MonitorConnection {
81 pub src: String,
82 pub src_port: Option<String>,
83 pub dst: String,
84 pub dst_port: Option<String>,
85 pub msg: String,
86}
87
88#[derive(Debug, Clone, Default)]
89pub struct MonitorTopology {
90 pub nodes: Vec<MonitorNode>,
91 pub connections: Vec<MonitorConnection>,
92}
93
94#[derive(Default, Debug, Clone, Copy)]
95struct NodeIoUsage {
96 has_incoming: bool,
97 has_outgoing: bool,
98}
99
100pub fn build_monitor_topology(
102 config: &CuConfig,
103 mission: Option<&str>,
104) -> CuResult<MonitorTopology> {
105 let graph = config.get_graph(mission)?;
106 let mut nodes: Map<String, MonitorNode> = Map::new();
107 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
108
109 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
110 for bridge in &config.bridges {
111 bridge_lookup.insert(bridge.id.as_str(), bridge);
112 }
113
114 for cnx in graph.edges() {
115 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
116 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
117 }
118
119 for (_, node) in graph.get_all_nodes() {
120 let kind = match node.get_flavor() {
121 Flavor::Bridge => ComponentKind::Bridge,
122 _ => ComponentKind::Task,
123 };
124 let node_id = node.get_id();
125
126 let mut inputs = Vec::new();
127 let mut outputs = Vec::new();
128 if kind == ComponentKind::Bridge {
129 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
130 for ch in &bridge.channels {
131 match ch {
132 BridgeChannelConfigRepresentation::Rx { id, .. } => {
133 outputs.push(id.clone())
134 }
135 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
136 }
137 }
138 }
139 } else {
140 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
141 if usage.has_incoming || !usage.has_outgoing {
142 inputs.push("in".to_string());
143 }
144 if usage.has_outgoing || !usage.has_incoming {
145 outputs.push("out".to_string());
146 }
147 }
148
149 nodes.insert(
150 node_id.clone(),
151 MonitorNode {
152 id: node_id,
153 type_name: Some(node.get_type().to_string()),
154 kind,
155 inputs,
156 outputs,
157 },
158 );
159 }
160
161 let mut connections = Vec::new();
162 for cnx in graph.edges() {
163 let src = cnx.src.clone();
164 let dst = cnx.dst.clone();
165
166 let src_port = cnx.src_channel.clone().or_else(|| {
167 nodes
168 .get(&src)
169 .and_then(|node| node.outputs.first().cloned())
170 });
171 let dst_port = cnx.dst_channel.clone().or_else(|| {
172 nodes
173 .get(&dst)
174 .and_then(|node| node.inputs.first().cloned())
175 });
176
177 connections.push(MonitorConnection {
178 src,
179 src_port,
180 dst,
181 dst_port,
182 msg: cnx.msg.clone(),
183 });
184 }
185
186 Ok(MonitorTopology {
187 nodes: nodes.into_values().collect(),
188 connections,
189 })
190}
191
192pub trait CuMonitor: Sized {
194 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
195 where
196 Self: Sized;
197
198 fn set_topology(&mut self, _topology: MonitorTopology) {}
199
200 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
201 Ok(())
202 }
203
204 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
206
207 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
209
210 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
212 Ok(())
213 }
214}
215
216pub struct NoMonitor {}
219impl CuMonitor for NoMonitor {
220 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
221 Ok(NoMonitor {})
222 }
223
224 fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
225 Ok(())
227 }
228
229 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
230 Decision::Ignore
232 }
233}
234
235pub struct CountingAlloc<A: GlobalAlloc> {
237 inner: A,
238 allocated: AtomicUsize,
239 deallocated: AtomicUsize,
240}
241
242impl<A: GlobalAlloc> CountingAlloc<A> {
243 pub const fn new(inner: A) -> Self {
244 CountingAlloc {
245 inner,
246 allocated: AtomicUsize::new(0),
247 deallocated: AtomicUsize::new(0),
248 }
249 }
250
251 pub fn allocated(&self) -> usize {
252 self.allocated.load(Ordering::SeqCst)
253 }
254
255 pub fn deallocated(&self) -> usize {
256 self.deallocated.load(Ordering::SeqCst)
257 }
258
259 pub fn reset(&self) {
260 self.allocated.store(0, Ordering::SeqCst);
261 self.deallocated.store(0, Ordering::SeqCst);
262 }
263}
264
265unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
266 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
267 let p = unsafe { self.inner.alloc(layout) };
268 if !p.is_null() {
269 self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
270 }
271 p
272 }
273
274 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
275 unsafe {
276 self.inner.dealloc(ptr, layout);
277 }
278 self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
279 }
280}
281
282#[cfg(feature = "memory_monitoring")]
284pub struct ScopedAllocCounter {
285 bf_allocated: usize,
286 bf_deallocated: usize,
287}
288
289#[cfg(feature = "memory_monitoring")]
290impl Default for ScopedAllocCounter {
291 fn default() -> Self {
292 Self::new()
293 }
294}
295
296#[cfg(feature = "memory_monitoring")]
297impl ScopedAllocCounter {
298 pub fn new() -> Self {
299 ScopedAllocCounter {
300 bf_allocated: GLOBAL.allocated(),
301 bf_deallocated: GLOBAL.deallocated(),
302 }
303 }
304
305 pub fn allocated(&self) -> usize {
317 GLOBAL.allocated() - self.bf_allocated
318 }
319
320 pub fn deallocated(&self) -> usize {
333 GLOBAL.deallocated() - self.bf_deallocated
334 }
335}
336
337#[cfg(feature = "memory_monitoring")]
339impl Drop for ScopedAllocCounter {
340 fn drop(&mut self) {
341 let _allocated = GLOBAL.allocated() - self.bf_allocated;
342 let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
343 }
350}
351
352#[cfg(feature = "std")]
353const BUCKET_COUNT: usize = 1024;
354#[cfg(not(feature = "std"))]
355const BUCKET_COUNT: usize = 256;
356
357#[derive(Debug, Clone)]
360pub struct LiveStatistics {
361 buckets: [u64; BUCKET_COUNT],
362 min_val: u64,
363 max_val: u64,
364 sum: u64,
365 sum_sq: u64,
366 count: u64,
367 max_value: u64,
368}
369
370impl LiveStatistics {
371 pub fn new_with_max(max_value: u64) -> Self {
391 LiveStatistics {
392 buckets: [0; BUCKET_COUNT],
393 min_val: u64::MAX,
394 max_val: 0,
395 sum: 0,
396 sum_sq: 0,
397 count: 0,
398 max_value,
399 }
400 }
401
402 #[inline]
403 fn value_to_bucket(&self, value: u64) -> usize {
404 if value >= self.max_value {
405 BUCKET_COUNT - 1
406 } else {
407 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
408 }
409 }
410
411 #[inline]
412 pub fn min(&self) -> u64 {
413 if self.count == 0 { 0 } else { self.min_val }
414 }
415
416 #[inline]
417 pub fn max(&self) -> u64 {
418 self.max_val
419 }
420
421 #[inline]
422 pub fn mean(&self) -> f64 {
423 if self.count == 0 {
424 0.0
425 } else {
426 self.sum as f64 / self.count as f64
427 }
428 }
429
430 #[inline]
431 pub fn stdev(&self) -> f64 {
432 if self.count == 0 {
433 return 0.0;
434 }
435 let mean = self.mean();
436 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
437 if variance < 0.0 {
438 return 0.0;
439 }
440 #[cfg(feature = "std")]
441 return variance.sqrt();
442 #[cfg(not(feature = "std"))]
443 return sqrt(variance);
444 }
445
446 #[inline]
447 pub fn percentile(&self, percentile: f64) -> u64 {
448 if self.count == 0 {
449 return 0;
450 }
451
452 let target_count = (self.count as f64 * percentile) as u64;
453 let mut accumulated = 0u64;
454
455 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
456 accumulated += bucket_count;
457 if accumulated >= target_count {
458 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
460 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
461 let bucket_fraction = if bucket_count > 0 {
462 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
463 } else {
464 0.5
465 };
466 return bucket_start
467 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
468 }
469 }
470
471 self.max_val
472 }
473
474 #[inline]
476 pub fn record(&mut self, value: u64) {
477 if value < self.min_val {
478 self.min_val = value;
479 }
480 if value > self.max_val {
481 self.max_val = value;
482 }
483 self.sum += value;
484 self.sum_sq += value * value;
485 self.count += 1;
486
487 let bucket = self.value_to_bucket(value);
488 self.buckets[bucket] += 1;
489 }
490
491 #[inline]
492 pub fn len(&self) -> u64 {
493 self.count
494 }
495
496 #[inline]
497 pub fn is_empty(&self) -> bool {
498 self.count == 0
499 }
500
501 #[inline]
502 pub fn reset(&mut self) {
503 self.buckets.fill(0);
504 self.min_val = u64::MAX;
505 self.max_val = 0;
506 self.sum = 0;
507 self.sum_sq = 0;
508 self.count = 0;
509 }
510}
511
512#[derive(Debug, Clone)]
515pub struct CuDurationStatistics {
516 bare: LiveStatistics,
517 jitter: LiveStatistics,
518 last_value: CuDuration,
519}
520
521impl CuDurationStatistics {
522 pub fn new(max: CuDuration) -> Self {
523 let CuDuration(max) = max;
524 CuDurationStatistics {
525 bare: LiveStatistics::new_with_max(max),
526 jitter: LiveStatistics::new_with_max(max),
527 last_value: CuDuration::default(),
528 }
529 }
530
531 #[inline]
532 pub fn min(&self) -> CuDuration {
533 CuDuration(self.bare.min())
534 }
535
536 #[inline]
537 pub fn max(&self) -> CuDuration {
538 CuDuration(self.bare.max())
539 }
540
541 #[inline]
542 pub fn mean(&self) -> CuDuration {
543 CuDuration(self.bare.mean() as u64) }
545
546 #[inline]
547 pub fn percentile(&self, percentile: f64) -> CuDuration {
548 CuDuration(self.bare.percentile(percentile))
549 }
550
551 #[inline]
552 pub fn stddev(&self) -> CuDuration {
553 CuDuration(self.bare.stdev() as u64)
554 }
555
556 #[inline]
557 pub fn len(&self) -> u64 {
558 self.bare.len()
559 }
560
561 #[inline]
562 pub fn is_empty(&self) -> bool {
563 self.bare.len() == 0
564 }
565
566 #[inline]
567 pub fn jitter_min(&self) -> CuDuration {
568 CuDuration(self.jitter.min())
569 }
570
571 #[inline]
572 pub fn jitter_max(&self) -> CuDuration {
573 CuDuration(self.jitter.max())
574 }
575
576 #[inline]
577 pub fn jitter_mean(&self) -> CuDuration {
578 CuDuration(self.jitter.mean() as u64)
579 }
580
581 #[inline]
582 pub fn jitter_stddev(&self) -> CuDuration {
583 CuDuration(self.jitter.stdev() as u64)
584 }
585
586 #[inline]
587 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
588 CuDuration(self.jitter.percentile(percentile))
589 }
590
591 #[inline]
592 pub fn record(&mut self, value: CuDuration) {
593 let CuDuration(nanos) = value;
594 if self.bare.is_empty() {
595 self.bare.record(nanos);
596 self.last_value = value;
597 return;
598 }
599 self.bare.record(nanos);
600 let CuDuration(last_nanos) = self.last_value;
601 self.jitter.record(nanos.abs_diff(last_nanos));
602 self.last_value = value;
603 }
604
605 #[inline]
606 pub fn reset(&mut self) {
607 self.bare.reset();
608 self.jitter.reset();
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615
616 #[test]
617 fn test_live_statistics_percentiles() {
618 let mut stats = LiveStatistics::new_with_max(1000);
619
620 for i in 0..100 {
622 stats.record(i);
623 }
624
625 assert_eq!(stats.len(), 100);
626 assert_eq!(stats.min(), 0);
627 assert_eq!(stats.max(), 99);
628 assert_eq!(stats.mean() as u64, 49); let p50 = stats.percentile(0.5);
632 let p90 = stats.percentile(0.90);
633 let p95 = stats.percentile(0.95);
634 let p99 = stats.percentile(0.99);
635
636 assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
638 assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
639 assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
640 assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
641 }
642
643 #[test]
644 fn test_duration_stats() {
645 let mut stats = CuDurationStatistics::new(CuDuration(1000));
646 stats.record(CuDuration(100));
647 stats.record(CuDuration(200));
648 stats.record(CuDuration(500));
649 stats.record(CuDuration(400));
650 assert_eq!(stats.min(), CuDuration(100));
651 assert_eq!(stats.max(), CuDuration(500));
652 assert_eq!(stats.mean(), CuDuration(300));
653 assert_eq!(stats.len(), 4);
654 assert_eq!(stats.jitter.len(), 3);
655 assert_eq!(stats.jitter_min(), CuDuration(100));
656 assert_eq!(stats.jitter_max(), CuDuration(300));
657 assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
658 stats.reset();
659 assert_eq!(stats.len(), 0);
660 }
661}