1use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, Flavor};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use petgraph::visit::IntoEdgeReferences;
12use serde_derive::{Deserialize, Serialize};
13
14#[cfg(not(feature = "std"))]
15extern crate alloc;
16
17#[cfg(feature = "std")]
18use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
19
20#[cfg(not(feature = "std"))]
21use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
22
// `no_std` flavour of the `imp` shim: re-exports the allocator traits from
// `alloc`, the atomics from `core`, and `sqrt` from `libm` so the rest of the
// file can use the same names (via `use imp::*`) regardless of the `std` feature.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    // Used by `LiveStatistics::stdev` when `std` is disabled.
    pub use libm::sqrt;
}
29
// `std` flavour of the `imp` shim: re-exports the allocator traits and atomics
// from `std`. When the `memory_monitoring` feature is enabled it additionally
// installs a `CountingAlloc` wrapping the system allocator as the process-wide
// global allocator, exposed as `GLOBAL`.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // Global allocator that counts every byte allocated/deallocated through it.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
42
43use imp::*;
44
45#[derive(Debug, Serialize, Deserialize)]
47pub enum CuTaskState {
48 Start,
49 Preprocess,
50 Process,
51 Postprocess,
52 Stop,
53}
54
/// What a monitor asks to happen after it has been told about an error.
///
/// Returned by [`CuMonitor::process_error`].
///
/// NOTE(review): the exact runtime reaction to each variant is implemented by
/// the caller of `process_error`, which is not visible here — confirm the
/// per-variant wording against the runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    /// Presumably: give up on the operation that failed.
    Abort,
    /// Disregard the error; this is what [`NoMonitor`] always answers.
    Ignore,
    /// Presumably: the error is fatal and the runtime should shut down.
    Shutdown,
}
62
/// The kind of component a [`MonitorNode`] represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentKind {
    /// Any node whose flavor is not `Flavor::Bridge`.
    Task,
    /// A node whose flavor is `Flavor::Bridge`.
    Bridge,
}
68
/// One component (task or bridge) of the topology exposed to monitors.
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Identifier of the node, as returned by the graph node's `get_id()`.
    pub id: String,
    /// Type of the node; `build_monitor_topology` fills it from `get_type()`.
    pub type_name: Option<String>,
    /// Whether the node is a task or a bridge.
    pub kind: ComponentKind,
    /// Names of the input ports. For bridges these are the ids of the `Tx`
    /// channels; for tasks it is at most the single synthetic port `"in"`.
    pub inputs: Vec<String>,
    /// Names of the output ports. For bridges these are the ids of the `Rx`
    /// channels; for tasks it is at most the single synthetic port `"out"`.
    pub outputs: Vec<String>,
}
79
/// One edge of the topology exposed to monitors.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Id of the node the connection starts from.
    pub src: String,
    /// Port on the source node: the connection's explicit source channel if
    /// it has one, otherwise the first output of the source node (if any).
    pub src_port: Option<String>,
    /// Id of the node the connection ends at.
    pub dst: String,
    /// Port on the destination node: the connection's explicit destination
    /// channel if it has one, otherwise the first input of the destination
    /// node (if any).
    pub dst_port: Option<String>,
    /// The `msg` string of the underlying connection (presumably the message
    /// type name — confirm against the config definition).
    pub msg: String,
}
88
/// The set of components and connections handed to a monitor through
/// [`CuMonitor::set_topology`]. The default value is an empty topology.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    /// All components of the graph.
    pub nodes: Vec<MonitorNode>,
    /// All edges of the graph.
    pub connections: Vec<MonitorConnection>,
}
94
/// Per-node bookkeeping used by `build_monitor_topology` to decide which
/// synthetic ports a task node gets. Both flags default to `false`.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    /// Set when the node is the destination of at least one edge.
    has_incoming: bool,
    /// Set when the node is the source of at least one edge.
    has_outgoing: bool,
}
100
101pub fn build_monitor_topology(
103 config: &CuConfig,
104 mission: Option<&str>,
105) -> CuResult<MonitorTopology> {
106 let graph = config.get_graph(mission)?;
107 let mut nodes: Map<String, MonitorNode> = Map::new();
108 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
109
110 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
111 for bridge in &config.bridges {
112 bridge_lookup.insert(bridge.id.as_str(), bridge);
113 }
114
115 for edge in graph.0.edge_references() {
116 let cnx = edge.weight();
117 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
118 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
119 }
120
121 for (_, node) in graph.get_all_nodes() {
122 let kind = match node.get_flavor() {
123 Flavor::Bridge => ComponentKind::Bridge,
124 _ => ComponentKind::Task,
125 };
126 let node_id = node.get_id();
127
128 let mut inputs = Vec::new();
129 let mut outputs = Vec::new();
130 if kind == ComponentKind::Bridge {
131 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
132 for ch in &bridge.channels {
133 match ch {
134 BridgeChannelConfigRepresentation::Rx { id, .. } => {
135 outputs.push(id.clone())
136 }
137 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
138 }
139 }
140 }
141 } else {
142 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
143 if usage.has_incoming || !usage.has_outgoing {
144 inputs.push("in".to_string());
145 }
146 if usage.has_outgoing || !usage.has_incoming {
147 outputs.push("out".to_string());
148 }
149 }
150
151 nodes.insert(
152 node_id.clone(),
153 MonitorNode {
154 id: node_id,
155 type_name: Some(node.get_type().to_string()),
156 kind,
157 inputs,
158 outputs,
159 },
160 );
161 }
162
163 let mut connections = Vec::new();
164 for edge in graph.0.edge_references() {
165 let cnx = edge.weight();
166 let src = cnx.src.clone();
167 let dst = cnx.dst.clone();
168
169 let src_port = cnx.src_channel.clone().or_else(|| {
170 nodes
171 .get(&src)
172 .and_then(|node| node.outputs.first().cloned())
173 });
174 let dst_port = cnx.dst_channel.clone().or_else(|| {
175 nodes
176 .get(&dst)
177 .and_then(|node| node.inputs.first().cloned())
178 });
179
180 connections.push(MonitorConnection {
181 src,
182 src_port,
183 dst,
184 dst_port,
185 msg: cnx.msg.clone(),
186 });
187 }
188
189 Ok(MonitorTopology {
190 nodes: nodes.into_values().collect(),
191 connections,
192 })
193}
194
/// Interface a monitoring component has to implement.
///
/// Only [`new`](CuMonitor::new), [`process_copperlist`](CuMonitor::process_copperlist)
/// and [`process_error`](CuMonitor::process_error) are mandatory; the other
/// methods have do-nothing defaults.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the configuration and the static list of task ids.
    ///
    /// NOTE(review): the `taskid` index given to `process_error` presumably
    /// refers to a position in `taskids` — confirm against the caller.
    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
    where
        Self: Sized;

    /// Hands the component/connection topology to the monitor.
    /// The default implementation discards it.
    fn set_topology(&mut self, _topology: MonitorTopology) {}

    /// Start hook; receives the robot clock. The default implementation does
    /// nothing and returns `Ok(())`.
    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }

    /// Receives the metadata of the messages of a copper list.
    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;

    /// Reports that task number `taskid` raised `error` during `step`; the
    /// returned [`Decision`] tells the caller how to proceed.
    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;

    /// Stop hook; receives the robot clock. The default implementation does
    /// nothing and returns `Ok(())`.
    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }
}
218
219pub struct NoMonitor {}
222impl CuMonitor for NoMonitor {
223 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
224 Ok(NoMonitor {})
225 }
226
227 fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
228 Ok(())
230 }
231
232 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
233 Decision::Ignore
235 }
236}
237
/// Allocator adapter that forwards every request to `inner` while keeping two
/// running totals: bytes successfully allocated and bytes handed back.
pub struct CountingAlloc<A: GlobalAlloc> {
    /// The allocator doing the real work.
    inner: A,
    /// Sum of `layout.size()` over all successful `alloc` calls.
    allocated: AtomicUsize,
    /// Sum of `layout.size()` over all `dealloc` calls.
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero. `const` so the result can
    /// initialise a `static`.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total number of bytes allocated since creation or the last `reset`.
    pub fn allocated(&self) -> usize {
        let total = &self.allocated;
        total.load(Ordering::SeqCst)
    }

    /// Total number of bytes deallocated since creation or the last `reset`.
    pub fn deallocated(&self) -> usize {
        let total = &self.deallocated;
        total.load(Ordering::SeqCst)
    }

    /// Puts both counters back to zero (allocated first, then deallocated).
    pub fn reset(&self) {
        for counter in [&self.allocated, &self.deallocated] {
            counter.store(0, Ordering::SeqCst);
        }
    }
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    /// Delegates to the wrapped allocator and, if the allocation succeeded,
    /// adds the requested size to the allocated total.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract for
        // `layout`, which is forwarded unchanged.
        let block = unsafe { self.inner.alloc(layout) };
        if block.is_null() {
            // Failed allocations are not counted.
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    /// Delegates to the wrapped allocator, then adds the size to the
    /// deallocated total.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller guarantees `ptr` was allocated by this allocator
        // (and therefore by `inner`) with this very `layout`.
        unsafe { self.inner.dealloc(ptr, layout) };
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }
}
284
/// Measures how many bytes went through the global counting allocator
/// (`GLOBAL`) since this value was created.
///
/// It snapshots the global totals at construction; [`allocated`] and
/// [`deallocated`] report the difference to the current totals.
///
/// The differences use wrapping arithmetic: the global counters are bumped
/// with `fetch_add`, which wraps on overflow, so a wrapping subtraction still
/// yields the right delta after a wrap-around and can never panic. If
/// `GLOBAL.reset()` is called while a counter is alive, the reported deltas are
/// meaningless (but still do not panic).
///
/// [`allocated`]: ScopedAllocCounter::allocated
/// [`deallocated`]: ScopedAllocCounter::deallocated
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// Global allocated total at construction time.
    bf_allocated: usize,
    /// Global deallocated total at construction time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    /// Same as [`ScopedAllocCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a measurement by snapshotting the current global totals.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Bytes allocated through `GLOBAL` since this counter was created.
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().wrapping_sub(self.bf_allocated)
    }

    /// Bytes deallocated through `GLOBAL` since this counter was created.
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().wrapping_sub(self.bf_deallocated)
    }
}

#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    /// Computes the final deltas when the scope ends. They are currently not
    /// reported anywhere; the bindings are kept as the hook for doing so.
    fn drop(&mut self) {
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
    }
}
354
// Number of histogram buckets kept by each `LiveStatistics`.
// Each bucket is a `u64`, so this directly sets the size of the struct:
// 1024 buckets with `std`, 256 buckets without it.
#[cfg(feature = "std")]
const BUCKET_COUNT: usize = 1024;
#[cfg(not(feature = "std"))]
const BUCKET_COUNT: usize = 256;
359
360#[derive(Debug, Clone)]
363pub struct LiveStatistics {
364 buckets: [u64; BUCKET_COUNT],
365 min_val: u64,
366 max_val: u64,
367 sum: u64,
368 sum_sq: u64,
369 count: u64,
370 max_value: u64,
371}
372
373impl LiveStatistics {
374 pub fn new_with_max(max_value: u64) -> Self {
394 LiveStatistics {
395 buckets: [0; BUCKET_COUNT],
396 min_val: u64::MAX,
397 max_val: 0,
398 sum: 0,
399 sum_sq: 0,
400 count: 0,
401 max_value,
402 }
403 }
404
405 #[inline]
406 fn value_to_bucket(&self, value: u64) -> usize {
407 if value >= self.max_value {
408 BUCKET_COUNT - 1
409 } else {
410 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
411 }
412 }
413
414 #[inline]
415 pub fn min(&self) -> u64 {
416 if self.count == 0 { 0 } else { self.min_val }
417 }
418
419 #[inline]
420 pub fn max(&self) -> u64 {
421 self.max_val
422 }
423
424 #[inline]
425 pub fn mean(&self) -> f64 {
426 if self.count == 0 {
427 0.0
428 } else {
429 self.sum as f64 / self.count as f64
430 }
431 }
432
433 #[inline]
434 pub fn stdev(&self) -> f64 {
435 if self.count == 0 {
436 return 0.0;
437 }
438 let mean = self.mean();
439 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
440 if variance < 0.0 {
441 return 0.0;
442 }
443 #[cfg(feature = "std")]
444 return variance.sqrt();
445 #[cfg(not(feature = "std"))]
446 return sqrt(variance);
447 }
448
449 #[inline]
450 pub fn percentile(&self, percentile: f64) -> u64 {
451 if self.count == 0 {
452 return 0;
453 }
454
455 let target_count = (self.count as f64 * percentile) as u64;
456 let mut accumulated = 0u64;
457
458 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
459 accumulated += bucket_count;
460 if accumulated >= target_count {
461 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
463 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
464 let bucket_fraction = if bucket_count > 0 {
465 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
466 } else {
467 0.5
468 };
469 return bucket_start
470 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
471 }
472 }
473
474 self.max_val
475 }
476
477 #[inline]
479 pub fn record(&mut self, value: u64) {
480 if value < self.min_val {
481 self.min_val = value;
482 }
483 if value > self.max_val {
484 self.max_val = value;
485 }
486 self.sum += value;
487 self.sum_sq += value * value;
488 self.count += 1;
489
490 let bucket = self.value_to_bucket(value);
491 self.buckets[bucket] += 1;
492 }
493
494 #[inline]
495 pub fn len(&self) -> u64 {
496 self.count
497 }
498
499 #[inline]
500 pub fn is_empty(&self) -> bool {
501 self.count == 0
502 }
503
504 #[inline]
505 pub fn reset(&mut self) {
506 self.buckets.fill(0);
507 self.min_val = u64::MAX;
508 self.max_val = 0;
509 self.sum = 0;
510 self.sum_sq = 0;
511 self.count = 0;
512 }
513}
514
515#[derive(Debug, Clone)]
518pub struct CuDurationStatistics {
519 bare: LiveStatistics,
520 jitter: LiveStatistics,
521 last_value: CuDuration,
522}
523
524impl CuDurationStatistics {
525 pub fn new(max: CuDuration) -> Self {
526 let CuDuration(max) = max;
527 CuDurationStatistics {
528 bare: LiveStatistics::new_with_max(max),
529 jitter: LiveStatistics::new_with_max(max),
530 last_value: CuDuration::default(),
531 }
532 }
533
534 #[inline]
535 pub fn min(&self) -> CuDuration {
536 CuDuration(self.bare.min())
537 }
538
539 #[inline]
540 pub fn max(&self) -> CuDuration {
541 CuDuration(self.bare.max())
542 }
543
544 #[inline]
545 pub fn mean(&self) -> CuDuration {
546 CuDuration(self.bare.mean() as u64) }
548
549 #[inline]
550 pub fn percentile(&self, percentile: f64) -> CuDuration {
551 CuDuration(self.bare.percentile(percentile))
552 }
553
554 #[inline]
555 pub fn stddev(&self) -> CuDuration {
556 CuDuration(self.bare.stdev() as u64)
557 }
558
559 #[inline]
560 pub fn len(&self) -> u64 {
561 self.bare.len()
562 }
563
564 #[inline]
565 pub fn is_empty(&self) -> bool {
566 self.bare.len() == 0
567 }
568
569 #[inline]
570 pub fn jitter_min(&self) -> CuDuration {
571 CuDuration(self.jitter.min())
572 }
573
574 #[inline]
575 pub fn jitter_max(&self) -> CuDuration {
576 CuDuration(self.jitter.max())
577 }
578
579 #[inline]
580 pub fn jitter_mean(&self) -> CuDuration {
581 CuDuration(self.jitter.mean() as u64)
582 }
583
584 #[inline]
585 pub fn jitter_stddev(&self) -> CuDuration {
586 CuDuration(self.jitter.stdev() as u64)
587 }
588
589 #[inline]
590 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
591 CuDuration(self.jitter.percentile(percentile))
592 }
593
594 #[inline]
595 pub fn record(&mut self, value: CuDuration) {
596 let CuDuration(nanos) = value;
597 if self.bare.is_empty() {
598 self.bare.record(nanos);
599 self.last_value = value;
600 return;
601 }
602 self.bare.record(nanos);
603 let CuDuration(last_nanos) = self.last_value;
604 self.jitter.record(nanos.abs_diff(last_nanos));
605 self.last_value = value;
606 }
607
608 #[inline]
609 pub fn reset(&mut self) {
610 self.bare.reset();
611 self.jitter.reset();
612 }
613}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds the values 0..100 into a histogram spanning 0..1000 and checks the
    /// exact aggregates plus the percentile estimates (within a tolerance,
    /// since percentiles are bucket-based approximations).
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);
        (0..100).for_each(|sample| stats.record(sample));

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        // Exact mean is 49.5; compare the truncated value.
        assert_eq!(stats.mean() as u64, 49);

        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    /// Records 100, 200, 500, 400 and checks both the raw statistics and the
    /// jitter statistics (differences 100, 300, 100), then the reset.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for sample in [100, 200, 500, 400] {
            stats.record(CuDuration(sample));
        }

        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);

        // The first sample has no predecessor, so only three jitter samples exist.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}