1use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::context::CuContext;
7use crate::cutask::CuMsgMetadata;
8use cu29_clock::CuDuration;
9#[allow(unused_imports)]
10use cu29_log::CuLogLevel;
11#[cfg(all(feature = "std", debug_assertions))]
12use cu29_log_runtime::{
13 format_message_only, register_live_log_listener, unregister_live_log_listener,
14};
15use cu29_traits::{CuError, CuResult};
16use serde_derive::{Deserialize, Serialize};
17
18#[cfg(not(feature = "std"))]
19extern crate alloc;
20
21#[cfg(feature = "std")]
22use std::sync::Arc;
23#[cfg(feature = "std")]
24use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
25
26#[cfg(not(feature = "std"))]
27use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
28#[cfg(not(target_has_atomic = "64"))]
29use spin::Mutex;
30
31#[cfg(not(feature = "std"))]
32mod imp {
33 pub use alloc::alloc::{GlobalAlloc, Layout};
34 #[cfg(target_has_atomic = "64")]
35 pub use core::sync::atomic::AtomicU64;
36 pub use core::sync::atomic::{AtomicUsize, Ordering};
37 pub use libm::sqrt;
38}
39
40#[cfg(feature = "std")]
41mod imp {
42 #[cfg(feature = "memory_monitoring")]
43 use super::CountingAlloc;
44 #[cfg(feature = "memory_monitoring")]
45 pub use std::alloc::System;
46 pub use std::alloc::{GlobalAlloc, Layout};
47 #[cfg(target_has_atomic = "64")]
48 pub use std::sync::atomic::AtomicU64;
49 pub use std::sync::atomic::{AtomicUsize, Ordering};
50 #[cfg(feature = "memory_monitoring")]
51 #[global_allocator]
52 pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
53}
54
55use imp::*;
56
57#[cfg(all(feature = "std", debug_assertions))]
58fn format_timestamp(time: CuDuration) -> String {
59 let nanos = time.as_nanos();
61 let total_seconds = nanos / 1_000_000_000;
62 let hours = total_seconds / 3600;
63 let minutes = (total_seconds / 60) % 60;
64 let seconds = total_seconds % 60;
65 let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
66 format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
67}
68
69#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
71pub enum CuTaskState {
72 Start,
73 Preprocess,
74 Process,
75 Postprocess,
76 Stop,
77}
78
79#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
81pub struct ExecutionMarker {
82 pub component_id: usize,
84 pub step: CuTaskState,
86 pub culistid: Option<u64>,
88}
89
90#[derive(Debug)]
96pub struct RuntimeExecutionProbe {
97 component_id: AtomicUsize,
98 step: AtomicUsize,
99 #[cfg(target_has_atomic = "64")]
100 culistid: AtomicU64,
101 #[cfg(target_has_atomic = "64")]
102 culistid_present: AtomicUsize,
103 #[cfg(not(target_has_atomic = "64"))]
104 culistid: Mutex<Option<u64>>,
105 sequence: AtomicUsize,
106}
107
108impl Default for RuntimeExecutionProbe {
109 fn default() -> Self {
110 Self {
111 component_id: AtomicUsize::new(usize::MAX),
112 step: AtomicUsize::new(0),
113 #[cfg(target_has_atomic = "64")]
114 culistid: AtomicU64::new(0),
115 #[cfg(target_has_atomic = "64")]
116 culistid_present: AtomicUsize::new(0),
117 #[cfg(not(target_has_atomic = "64"))]
118 culistid: Mutex::new(None),
119 sequence: AtomicUsize::new(0),
120 }
121 }
122}
123
124impl RuntimeExecutionProbe {
125 #[inline]
126 pub fn record(&self, marker: ExecutionMarker) {
127 self.component_id
128 .store(marker.component_id, Ordering::Relaxed);
129 self.step
130 .store(task_state_to_usize(marker.step), Ordering::Relaxed);
131 #[cfg(target_has_atomic = "64")]
132 match marker.culistid {
133 Some(culistid) => {
134 self.culistid.store(culistid, Ordering::Relaxed);
135 self.culistid_present.store(1, Ordering::Relaxed);
136 }
137 None => {
138 self.culistid_present.store(0, Ordering::Relaxed);
139 }
140 }
141 #[cfg(not(target_has_atomic = "64"))]
142 {
143 *self.culistid.lock() = marker.culistid;
144 }
145 self.sequence.fetch_add(1, Ordering::Release);
146 }
147
148 #[inline]
149 pub fn sequence(&self) -> usize {
150 self.sequence.load(Ordering::Acquire)
151 }
152
153 #[inline]
154 pub fn marker(&self) -> Option<ExecutionMarker> {
155 loop {
158 let seq_before = self.sequence.load(Ordering::Acquire);
159 let component_id = self.component_id.load(Ordering::Relaxed);
160 let step = self.step.load(Ordering::Relaxed);
161 #[cfg(target_has_atomic = "64")]
162 let culistid_present = self.culistid_present.load(Ordering::Relaxed);
163 #[cfg(target_has_atomic = "64")]
164 let culistid_value = self.culistid.load(Ordering::Relaxed);
165 #[cfg(not(target_has_atomic = "64"))]
166 let culistid = *self.culistid.lock();
167 let seq_after = self.sequence.load(Ordering::Acquire);
168 if seq_before == seq_after {
169 if component_id == usize::MAX {
170 return None;
171 }
172 let step = usize_to_task_state(step);
173 #[cfg(target_has_atomic = "64")]
174 let culistid = if culistid_present == 0 {
175 None
176 } else {
177 Some(culistid_value)
178 };
179 return Some(ExecutionMarker {
180 component_id,
181 step,
182 culistid,
183 });
184 }
185 }
186 }
187}
188
189#[inline]
190const fn task_state_to_usize(step: CuTaskState) -> usize {
191 match step {
192 CuTaskState::Start => 0,
193 CuTaskState::Preprocess => 1,
194 CuTaskState::Process => 2,
195 CuTaskState::Postprocess => 3,
196 CuTaskState::Stop => 4,
197 }
198}
199
200#[inline]
201const fn usize_to_task_state(step: usize) -> CuTaskState {
202 match step {
203 0 => CuTaskState::Start,
204 1 => CuTaskState::Preprocess,
205 2 => CuTaskState::Process,
206 3 => CuTaskState::Postprocess,
207 _ => CuTaskState::Stop,
208 }
209}
210
211#[cfg(feature = "std")]
212pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
213
214#[derive(Debug)]
216pub enum Decision {
217 Abort, Ignore, Shutdown, }
221
222fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
223 use Decision::{Abort, Ignore, Shutdown};
224 match (lhs, rhs) {
227 (Shutdown, _) | (_, Shutdown) => Shutdown,
228 (Abort, _) | (_, Abort) => Abort,
229 _ => Ignore,
230 }
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq)]
234pub enum ComponentKind {
235 Task,
236 Bridge,
237}
238
239#[derive(Debug, Clone)]
240pub struct MonitorNode {
241 pub id: String,
242 pub type_name: Option<String>,
243 pub kind: ComponentKind,
244 pub inputs: Vec<String>,
246 pub outputs: Vec<String>,
248}
249
250#[derive(Debug, Clone)]
251pub struct MonitorConnection {
252 pub src: String,
253 pub src_port: Option<String>,
254 pub dst: String,
255 pub dst_port: Option<String>,
256 pub msg: String,
257}
258
259#[derive(Debug, Clone, Default)]
260pub struct MonitorTopology {
261 pub nodes: Vec<MonitorNode>,
262 pub connections: Vec<MonitorConnection>,
263}
264
265#[derive(Debug, Clone, Copy, Default)]
266pub struct CopperListInfo {
267 pub size_bytes: usize,
268 pub count: usize,
269}
270
271impl CopperListInfo {
272 pub const fn new(size_bytes: usize, count: usize) -> Self {
273 Self { size_bytes, count }
274 }
275}
276
277#[derive(Debug, Clone, Copy, Default)]
279pub struct CopperListIoStats {
280 pub raw_culist_bytes: u64,
282 pub handle_bytes: u64,
284 pub encoded_culist_bytes: u64,
286 pub keyframe_bytes: u64,
288 pub structured_log_bytes_total: u64,
290 pub culistid: u64,
292}
293
294pub trait CuPayloadSize {
297 fn raw_bytes(&self) -> usize {
299 core::mem::size_of_val(self)
300 }
301
302 fn handle_bytes(&self) -> usize {
304 0
305 }
306}
307
308impl<T> CuPayloadSize for T
309where
310 T: crate::cutask::CuMsgPayload,
311{
312 fn raw_bytes(&self) -> usize {
313 core::mem::size_of::<T>()
314 }
315}
316
317#[derive(Default, Debug, Clone, Copy)]
318struct NodeIoUsage {
319 has_incoming: bool,
320 has_outgoing: bool,
321}
322
323fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
324 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
325 edge_ids.sort();
326
327 let mut outputs = Vec::new();
328 let mut seen = Vec::new();
329 let mut port_idx = 0usize;
330 for edge_id in edge_ids {
331 let Some(edge) = graph.edge(edge_id) else {
332 continue;
333 };
334 if seen.iter().any(|msg| msg == &edge.msg) {
335 continue;
336 }
337 seen.push(edge.msg.clone());
338 let mut port_label = String::from("out");
339 port_label.push_str(&port_idx.to_string());
340 port_label.push_str(": ");
341 port_label.push_str(edge.msg.as_str());
342 outputs.push((edge.msg.clone(), port_label));
343 port_idx += 1;
344 }
345 outputs
346}
347
348pub fn build_monitor_topology(
350 config: &CuConfig,
351 mission: Option<&str>,
352) -> CuResult<MonitorTopology> {
353 let graph = config.get_graph(mission)?;
354 let mut nodes: Map<String, MonitorNode> = Map::new();
355 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
356 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
357
358 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
359 for bridge in &config.bridges {
360 bridge_lookup.insert(bridge.id.as_str(), bridge);
361 }
362
363 for cnx in graph.edges() {
364 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
365 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
366 }
367
368 for (_, node) in graph.get_all_nodes() {
369 let kind = match node.get_flavor() {
370 Flavor::Bridge => ComponentKind::Bridge,
371 _ => ComponentKind::Task,
372 };
373 let node_id = node.get_id();
374
375 let mut inputs = Vec::new();
376 let mut outputs = Vec::new();
377 if kind == ComponentKind::Bridge {
378 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
379 for ch in &bridge.channels {
380 match ch {
381 BridgeChannelConfigRepresentation::Rx { id, .. } => {
382 outputs.push(id.clone())
383 }
384 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
385 }
386 }
387 }
388 } else {
389 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
390 if usage.has_incoming || !usage.has_outgoing {
391 inputs.push("in".to_string());
392 }
393 if usage.has_outgoing {
394 if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
395 let ports = collect_output_ports(graph, node_idx);
396 let mut port_map: Map<String, String> = Map::new();
397 for (msg_type, label) in ports {
398 port_map.insert(msg_type, label.clone());
399 outputs.push(label);
400 }
401 output_port_lookup.insert(node_id.clone(), port_map);
402 }
403 } else if !usage.has_incoming {
404 outputs.push("out".to_string());
405 }
406 }
407
408 nodes.insert(
409 node_id.clone(),
410 MonitorNode {
411 id: node_id,
412 type_name: Some(node.get_type().to_string()),
413 kind,
414 inputs,
415 outputs,
416 },
417 );
418 }
419
420 let mut connections = Vec::new();
421 for cnx in graph.edges() {
422 let src = cnx.src.clone();
423 let dst = cnx.dst.clone();
424
425 let src_port = cnx.src_channel.clone().or_else(|| {
426 output_port_lookup
427 .get(&src)
428 .and_then(|ports| ports.get(&cnx.msg).cloned())
429 .or_else(|| {
430 nodes
431 .get(&src)
432 .and_then(|node| node.outputs.first().cloned())
433 })
434 });
435 let dst_port = cnx.dst_channel.clone().or_else(|| {
436 nodes
437 .get(&dst)
438 .and_then(|node| node.inputs.first().cloned())
439 });
440
441 connections.push(MonitorConnection {
442 src,
443 src_port,
444 dst,
445 dst_port,
446 msg: cnx.msg.clone(),
447 });
448 }
449
450 Ok(MonitorTopology {
451 nodes: nodes.into_values().collect(),
452 connections,
453 })
454}
455
456pub trait CuMonitor: Sized {
458 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
459 where
460 Self: Sized;
461
462 fn set_topology(&mut self, _topology: MonitorTopology) {}
463
464 fn set_copperlist_info(&mut self, _info: CopperListInfo) {}
465
466 #[cfg(feature = "std")]
467 fn set_execution_probe(&mut self, _probe: ExecutionProbeHandle) {}
468
469 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
470 Ok(())
471 }
472
473 fn process_copperlist(&self, _ctx: &CuContext, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
475
476 fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
478
479 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
481
482 fn process_panic(&self, _panic_message: &str) {}
484
485 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
487 Ok(())
488 }
489}
490
491pub struct NoMonitor {}
494impl CuMonitor for NoMonitor {
495 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
496 Ok(NoMonitor {})
497 }
498
499 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
500 #[cfg(all(feature = "std", debug_assertions))]
501 register_live_log_listener(|entry, format_str, param_names| {
502 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
503 let named: Map<String, String> = param_names
504 .iter()
505 .zip(params.iter())
506 .map(|(k, v)| (k.to_string(), v.clone()))
507 .collect();
508
509 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
510 let ts = format_timestamp(entry.time);
511 println!("{} [{:?}] {}", ts, entry.level, msg);
512 }
513 });
514 Ok(())
515 }
516
517 fn process_copperlist(&self, _ctx: &CuContext, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
518 Ok(())
520 }
521
522 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
523 Decision::Ignore
525 }
526
527 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
528 #[cfg(all(feature = "std", debug_assertions))]
529 unregister_live_log_listener();
530 Ok(())
531 }
532}
533
534macro_rules! impl_monitor_tuple {
535 ($($idx:tt => $name:ident),+) => {
536 impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
537 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
538 where
539 Self: Sized,
540 {
541 Ok(($($name::new(config, taskids)?,)+))
542 }
543
544 fn set_topology(&mut self, topology: MonitorTopology) {
545 $(self.$idx.set_topology(topology.clone());)+
546 }
547
548 fn set_copperlist_info(&mut self, info: CopperListInfo) {
549 $(self.$idx.set_copperlist_info(info);)+
550 }
551
552 #[cfg(feature = "std")]
553 fn set_execution_probe(&mut self, probe: ExecutionProbeHandle) {
554 $(self.$idx.set_execution_probe(probe.clone());)+
555 }
556
557 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
558 $(self.$idx.start(ctx)?;)+
559 Ok(())
560 }
561
562 fn process_copperlist(&self, ctx: &CuContext, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
563 $(self.$idx.process_copperlist(ctx, msgs)?;)+
564 Ok(())
565 }
566
567 fn observe_copperlist_io(&self, stats: CopperListIoStats) {
568 $(self.$idx.observe_copperlist_io(stats);)+
569 }
570
571 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
572 let mut decision = Decision::Ignore;
573 $(decision = merge_decision(decision, self.$idx.process_error(taskid, step, error));)+
574 decision
575 }
576
577 fn process_panic(&self, panic_message: &str) {
578 $(self.$idx.process_panic(panic_message);)+
579 }
580
581 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
582 $(self.$idx.stop(ctx)?;)+
583 Ok(())
584 }
585 }
586 };
587}
588
589impl_monitor_tuple!(0 => M0, 1 => M1);
590impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
591impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
592impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
593impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
594
595#[cfg(feature = "std")]
596pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
597 if let Some(msg) = payload.downcast_ref::<&str>() {
598 (*msg).to_string()
599 } else if let Some(msg) = payload.downcast_ref::<String>() {
600 msg.clone()
601 } else {
602 "panic with non-string payload".to_string()
603 }
604}
605
606pub struct CountingAlloc<A: GlobalAlloc> {
608 inner: A,
609 allocated: AtomicUsize,
610 deallocated: AtomicUsize,
611}
612
613impl<A: GlobalAlloc> CountingAlloc<A> {
614 pub const fn new(inner: A) -> Self {
615 CountingAlloc {
616 inner,
617 allocated: AtomicUsize::new(0),
618 deallocated: AtomicUsize::new(0),
619 }
620 }
621
622 pub fn allocated(&self) -> usize {
623 self.allocated.load(Ordering::SeqCst)
624 }
625
626 pub fn deallocated(&self) -> usize {
627 self.deallocated.load(Ordering::SeqCst)
628 }
629
630 pub fn reset(&self) {
631 self.allocated.store(0, Ordering::SeqCst);
632 self.deallocated.store(0, Ordering::SeqCst);
633 }
634}
635
636unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
638 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
640 let p = unsafe { self.inner.alloc(layout) };
642 if !p.is_null() {
643 self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
644 }
645 p
646 }
647
648 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
650 unsafe { self.inner.dealloc(ptr, layout) }
652 self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
653 }
654}
655
656#[cfg(feature = "memory_monitoring")]
658pub struct ScopedAllocCounter {
659 bf_allocated: usize,
660 bf_deallocated: usize,
661}
662
663#[cfg(feature = "memory_monitoring")]
664impl Default for ScopedAllocCounter {
665 fn default() -> Self {
666 Self::new()
667 }
668}
669
670#[cfg(feature = "memory_monitoring")]
671impl ScopedAllocCounter {
672 pub fn new() -> Self {
673 ScopedAllocCounter {
674 bf_allocated: GLOBAL.allocated(),
675 bf_deallocated: GLOBAL.deallocated(),
676 }
677 }
678
679 pub fn allocated(&self) -> usize {
691 GLOBAL.allocated() - self.bf_allocated
692 }
693
694 pub fn deallocated(&self) -> usize {
707 GLOBAL.deallocated() - self.bf_deallocated
708 }
709}
710
711#[cfg(feature = "memory_monitoring")]
713impl Drop for ScopedAllocCounter {
714 fn drop(&mut self) {
715 let _allocated = GLOBAL.allocated() - self.bf_allocated;
716 let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
717 }
724}
725
726#[cfg(feature = "std")]
727const BUCKET_COUNT: usize = 1024;
728#[cfg(not(feature = "std"))]
729const BUCKET_COUNT: usize = 256;
730
731#[derive(Debug, Clone)]
734pub struct LiveStatistics {
735 buckets: [u64; BUCKET_COUNT],
736 min_val: u64,
737 max_val: u64,
738 sum: u64,
739 sum_sq: u64,
740 count: u64,
741 max_value: u64,
742}
743
744impl LiveStatistics {
745 pub fn new_with_max(max_value: u64) -> Self {
765 LiveStatistics {
766 buckets: [0; BUCKET_COUNT],
767 min_val: u64::MAX,
768 max_val: 0,
769 sum: 0,
770 sum_sq: 0,
771 count: 0,
772 max_value,
773 }
774 }
775
776 #[inline]
777 fn value_to_bucket(&self, value: u64) -> usize {
778 if value >= self.max_value {
779 BUCKET_COUNT - 1
780 } else {
781 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
782 }
783 }
784
785 #[inline]
786 pub fn min(&self) -> u64 {
787 if self.count == 0 { 0 } else { self.min_val }
788 }
789
790 #[inline]
791 pub fn max(&self) -> u64 {
792 self.max_val
793 }
794
795 #[inline]
796 pub fn mean(&self) -> f64 {
797 if self.count == 0 {
798 0.0
799 } else {
800 self.sum as f64 / self.count as f64
801 }
802 }
803
804 #[inline]
805 pub fn stdev(&self) -> f64 {
806 if self.count == 0 {
807 return 0.0;
808 }
809 let mean = self.mean();
810 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
811 if variance < 0.0 {
812 return 0.0;
813 }
814 #[cfg(feature = "std")]
815 return variance.sqrt();
816 #[cfg(not(feature = "std"))]
817 return sqrt(variance);
818 }
819
820 #[inline]
821 pub fn percentile(&self, percentile: f64) -> u64 {
822 if self.count == 0 {
823 return 0;
824 }
825
826 let target_count = (self.count as f64 * percentile) as u64;
827 let mut accumulated = 0u64;
828
829 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
830 accumulated += bucket_count;
831 if accumulated >= target_count {
832 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
834 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
835 let bucket_fraction = if bucket_count > 0 {
836 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
837 } else {
838 0.5
839 };
840 return bucket_start
841 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
842 }
843 }
844
845 self.max_val
846 }
847
848 #[inline]
850 pub fn record(&mut self, value: u64) {
851 if value < self.min_val {
852 self.min_val = value;
853 }
854 if value > self.max_val {
855 self.max_val = value;
856 }
857 self.sum += value;
858 self.sum_sq += value * value;
859 self.count += 1;
860
861 let bucket = self.value_to_bucket(value);
862 self.buckets[bucket] += 1;
863 }
864
865 #[inline]
866 pub fn len(&self) -> u64 {
867 self.count
868 }
869
870 #[inline]
871 pub fn is_empty(&self) -> bool {
872 self.count == 0
873 }
874
875 #[inline]
876 pub fn reset(&mut self) {
877 self.buckets.fill(0);
878 self.min_val = u64::MAX;
879 self.max_val = 0;
880 self.sum = 0;
881 self.sum_sq = 0;
882 self.count = 0;
883 }
884}
885
886#[derive(Debug, Clone)]
889pub struct CuDurationStatistics {
890 bare: LiveStatistics,
891 jitter: LiveStatistics,
892 last_value: CuDuration,
893}
894
895impl CuDurationStatistics {
896 pub fn new(max: CuDuration) -> Self {
897 let CuDuration(max) = max;
898 CuDurationStatistics {
899 bare: LiveStatistics::new_with_max(max),
900 jitter: LiveStatistics::new_with_max(max),
901 last_value: CuDuration::default(),
902 }
903 }
904
905 #[inline]
906 pub fn min(&self) -> CuDuration {
907 CuDuration(self.bare.min())
908 }
909
910 #[inline]
911 pub fn max(&self) -> CuDuration {
912 CuDuration(self.bare.max())
913 }
914
915 #[inline]
916 pub fn mean(&self) -> CuDuration {
917 CuDuration(self.bare.mean() as u64) }
919
920 #[inline]
921 pub fn percentile(&self, percentile: f64) -> CuDuration {
922 CuDuration(self.bare.percentile(percentile))
923 }
924
925 #[inline]
926 pub fn stddev(&self) -> CuDuration {
927 CuDuration(self.bare.stdev() as u64)
928 }
929
930 #[inline]
931 pub fn len(&self) -> u64 {
932 self.bare.len()
933 }
934
935 #[inline]
936 pub fn is_empty(&self) -> bool {
937 self.bare.len() == 0
938 }
939
940 #[inline]
941 pub fn jitter_min(&self) -> CuDuration {
942 CuDuration(self.jitter.min())
943 }
944
945 #[inline]
946 pub fn jitter_max(&self) -> CuDuration {
947 CuDuration(self.jitter.max())
948 }
949
950 #[inline]
951 pub fn jitter_mean(&self) -> CuDuration {
952 CuDuration(self.jitter.mean() as u64)
953 }
954
955 #[inline]
956 pub fn jitter_stddev(&self) -> CuDuration {
957 CuDuration(self.jitter.stdev() as u64)
958 }
959
960 #[inline]
961 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
962 CuDuration(self.jitter.percentile(percentile))
963 }
964
965 #[inline]
966 pub fn record(&mut self, value: CuDuration) {
967 let CuDuration(nanos) = value;
968 if self.bare.is_empty() {
969 self.bare.record(nanos);
970 self.last_value = value;
971 return;
972 }
973 self.bare.record(nanos);
974 let CuDuration(last_nanos) = self.last_value;
975 self.jitter.record(nanos.abs_diff(last_nanos));
976 self.last_value = value;
977 }
978
979 #[inline]
980 pub fn reset(&mut self) {
981 self.bare.reset();
982 self.jitter.reset();
983 }
984}
985
986#[cfg(test)]
987mod tests {
988 use super::*;
989 use core::sync::atomic::{AtomicUsize, Ordering};
990 #[cfg(feature = "std")]
991 use std::sync::Arc;
992
993 #[derive(Clone, Copy)]
994 enum TestDecision {
995 Ignore,
996 Abort,
997 Shutdown,
998 }
999
1000 struct TestMonitor {
1001 decision: TestDecision,
1002 copperlist_calls: AtomicUsize,
1003 panic_calls: AtomicUsize,
1004 #[cfg(feature = "std")]
1005 probe_calls: AtomicUsize,
1006 }
1007
1008 impl TestMonitor {
1009 fn new_with(decision: TestDecision) -> Self {
1010 Self {
1011 decision,
1012 copperlist_calls: AtomicUsize::new(0),
1013 panic_calls: AtomicUsize::new(0),
1014 #[cfg(feature = "std")]
1015 probe_calls: AtomicUsize::new(0),
1016 }
1017 }
1018 }
1019
1020 impl CuMonitor for TestMonitor {
1021 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
1022 Ok(Self::new_with(TestDecision::Ignore))
1023 }
1024
1025 #[cfg(feature = "std")]
1026 fn set_execution_probe(&mut self, _probe: ExecutionProbeHandle) {
1027 self.probe_calls.fetch_add(1, Ordering::SeqCst);
1028 }
1029
1030 fn process_copperlist(&self, _ctx: &CuContext, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
1031 self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
1032 Ok(())
1033 }
1034
1035 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
1036 match self.decision {
1037 TestDecision::Ignore => Decision::Ignore,
1038 TestDecision::Abort => Decision::Abort,
1039 TestDecision::Shutdown => Decision::Shutdown,
1040 }
1041 }
1042
1043 fn process_panic(&self, _panic_message: &str) {
1044 self.panic_calls.fetch_add(1, Ordering::SeqCst);
1045 }
1046 }
1047
1048 #[test]
1049 fn test_live_statistics_percentiles() {
1050 let mut stats = LiveStatistics::new_with_max(1000);
1051
1052 for i in 0..100 {
1054 stats.record(i);
1055 }
1056
1057 assert_eq!(stats.len(), 100);
1058 assert_eq!(stats.min(), 0);
1059 assert_eq!(stats.max(), 99);
1060 assert_eq!(stats.mean() as u64, 49); let p50 = stats.percentile(0.5);
1064 let p90 = stats.percentile(0.90);
1065 let p95 = stats.percentile(0.95);
1066 let p99 = stats.percentile(0.99);
1067
1068 assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
1070 assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
1071 assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
1072 assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
1073 }
1074
1075 #[test]
1076 fn test_duration_stats() {
1077 let mut stats = CuDurationStatistics::new(CuDuration(1000));
1078 stats.record(CuDuration(100));
1079 stats.record(CuDuration(200));
1080 stats.record(CuDuration(500));
1081 stats.record(CuDuration(400));
1082 assert_eq!(stats.min(), CuDuration(100));
1083 assert_eq!(stats.max(), CuDuration(500));
1084 assert_eq!(stats.mean(), CuDuration(300));
1085 assert_eq!(stats.len(), 4);
1086 assert_eq!(stats.jitter.len(), 3);
1087 assert_eq!(stats.jitter_min(), CuDuration(100));
1088 assert_eq!(stats.jitter_max(), CuDuration(300));
1089 assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
1090 stats.reset();
1091 assert_eq!(stats.len(), 0);
1092 }
1093
1094 #[test]
1095 fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
1096 let err = CuError::from("boom");
1097
1098 let two = (
1099 TestMonitor::new_with(TestDecision::Ignore),
1100 TestMonitor::new_with(TestDecision::Shutdown),
1101 );
1102 assert!(matches!(
1103 two.process_error(0, CuTaskState::Process, &err),
1104 Decision::Shutdown
1105 ));
1106
1107 let two = (
1108 TestMonitor::new_with(TestDecision::Ignore),
1109 TestMonitor::new_with(TestDecision::Abort),
1110 );
1111 assert!(matches!(
1112 two.process_error(0, CuTaskState::Process, &err),
1113 Decision::Abort
1114 ));
1115 }
1116
1117 #[test]
1118 fn tuple_monitor_fans_out_callbacks() {
1119 let left = TestMonitor::new_with(TestDecision::Ignore);
1120 let right = TestMonitor::new_with(TestDecision::Ignore);
1121 let mut monitors = (left, right);
1122 let (ctx, _clock_control) = CuContext::new_mock_clock();
1123
1124 #[cfg(feature = "std")]
1125 monitors.set_execution_probe(Arc::new(RuntimeExecutionProbe::default()));
1126 monitors
1127 .process_copperlist(&ctx, &[])
1128 .expect("process_copperlist should fan out");
1129 monitors.process_panic("panic marker");
1130
1131 assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
1132 assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
1133 assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
1134 assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
1135 #[cfg(feature = "std")]
1136 {
1137 assert_eq!(monitors.0.probe_calls.load(Ordering::SeqCst), 1);
1138 assert_eq!(monitors.1.probe_calls.load(Ordering::SeqCst), 1);
1139 }
1140 }
1141
1142 #[test]
1143 fn runtime_execution_probe_roundtrip_marker() {
1144 let probe = RuntimeExecutionProbe::default();
1145 assert!(probe.marker().is_none());
1146 assert_eq!(probe.sequence(), 0);
1147
1148 probe.record(ExecutionMarker {
1149 component_id: 7,
1150 step: CuTaskState::Process,
1151 culistid: Some(42),
1152 });
1153
1154 let marker = probe.marker().expect("marker should be available");
1155 assert_eq!(marker.component_id, 7);
1156 assert!(matches!(marker.step, CuTaskState::Process));
1157 assert_eq!(marker.culistid, Some(42));
1158 assert_eq!(probe.sequence(), 1);
1159 }
1160}