1use crate::config::CuConfig;
5use crate::config::{
6 BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use bincode::Encode;
11use bincode::config::standard;
12use bincode::enc::EncoderImpl;
13use bincode::enc::write::SizeWriter;
14use compact_str::CompactString;
15use cu29_clock::CuDuration;
16#[allow(unused_imports)]
17use cu29_log::CuLogLevel;
18#[cfg(all(feature = "std", debug_assertions))]
19use cu29_log_runtime::{
20 format_message_only, register_live_log_listener, unregister_live_log_listener,
21};
22use cu29_traits::{
23 CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
24 finish_observed_encode,
25};
26use portable_atomic::{
27 AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
28};
29use serde_derive::{Deserialize, Serialize};
30
31#[cfg(not(feature = "std"))]
32extern crate alloc;
33
34#[cfg(feature = "std")]
35use core::cell::Cell;
36#[cfg(feature = "std")]
37use std::backtrace::Backtrace;
38#[cfg(feature = "std")]
39use std::fs::File;
40#[cfg(feature = "std")]
41use std::io::Write;
42#[cfg(feature = "std")]
43use std::panic::PanicHookInfo;
44#[cfg(feature = "std")]
45use std::sync::{Arc, Mutex as StdMutex, OnceLock};
46#[cfg(feature = "std")]
47use std::thread_local;
48#[cfg(feature = "std")]
49use std::time::{SystemTime, UNIX_EPOCH};
50#[cfg(feature = "std")]
51use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
52
53#[cfg(not(feature = "std"))]
54use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
55#[cfg(not(target_has_atomic = "64"))]
56use spin::Mutex;
57#[cfg(not(feature = "std"))]
58use spin::Mutex as SpinMutex;
59
// `no_std` flavour of the platform shim: re-exports the allocator traits and
// atomic types from `alloc`/`core`, plus `libm::sqrt`, so the rest of the file
// can pull them in with a single `use imp::*`.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    // `AtomicU64` is only re-exported on targets that have 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    pub use core::sync::atomic::AtomicU64;
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    pub use libm::sqrt;
}
68
// `std` flavour of the platform shim: re-exports the allocator traits and
// atomic types from `std`. When the `memory_monitoring` feature is enabled it
// also installs `CountingAlloc<System>` as the process-wide global allocator.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    // `AtomicU64` is only re-exported on targets that have 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // Global allocator wrapping the system allocator (only with `memory_monitoring`).
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
83
84use imp::*;
85
/// Renders a [`CuDuration`] as `HH:MM:SS.ffff`.
///
/// `ffff` is the sub-second part expressed in units of 100 microseconds
/// (four digits). Hours are not wrapped at 24, so long durations simply print
/// a larger hour field. Only compiled for `std` builds with debug assertions.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let whole_seconds = nanos / 1_000_000_000;
    let subsecond_nanos = nanos % 1_000_000_000;

    let (hours, minutes, seconds) = (
        whole_seconds / 3600,
        (whole_seconds / 60) % 60,
        whole_seconds % 60,
    );
    // Keep only the four most significant sub-second digits.
    let fractional_1e4 = subsecond_nanos / 100_000;

    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
}
97
/// Lifecycle step of a component, as carried by [`ExecutionMarker::step`].
///
/// `component_state_to_usize` / `usize_to_component_state` map the variants to
/// the integers `0..=4` in declaration order.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CuComponentState {
    Start,
    Preprocess,
    Process,
    Postprocess,
    Stop,
}
107
108#[repr(transparent)]
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
111pub struct ComponentId(usize);
112
113impl ComponentId {
114 pub const INVALID: Self = Self(usize::MAX);
115
116 #[inline]
117 pub const fn new(index: usize) -> Self {
118 Self(index)
119 }
120
121 #[inline]
122 pub const fn index(self) -> usize {
123 self.0
124 }
125}
126
127impl core::fmt::Display for ComponentId {
128 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
129 self.0.fmt(f)
130 }
131}
132
133impl From<ComponentId> for usize {
134 fn from(value: ComponentId) -> Self {
135 value.index()
136 }
137}
138
139#[repr(transparent)]
141#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
142pub struct CuListSlot(usize);
143
144impl CuListSlot {
145 #[inline]
146 pub const fn new(index: usize) -> Self {
147 Self(index)
148 }
149
150 #[inline]
151 pub const fn index(self) -> usize {
152 self.0
153 }
154}
155
156impl From<CuListSlot> for usize {
157 fn from(value: CuListSlot) -> Self {
158 value.index()
159 }
160}
161
162#[derive(Debug, Clone, Copy)]
166pub struct CopperListLayout {
167 components: &'static [MonitorComponentMetadata],
168 slot_to_component: &'static [ComponentId],
169}
170
171impl CopperListLayout {
172 #[inline]
173 pub const fn new(
174 components: &'static [MonitorComponentMetadata],
175 slot_to_component: &'static [ComponentId],
176 ) -> Self {
177 Self {
178 components,
179 slot_to_component,
180 }
181 }
182
183 #[inline]
184 pub const fn components(self) -> &'static [MonitorComponentMetadata] {
185 self.components
186 }
187
188 #[inline]
189 pub const fn component_count(self) -> usize {
190 self.components.len()
191 }
192
193 #[inline]
194 pub const fn culist_slot_count(self) -> usize {
195 self.slot_to_component.len()
196 }
197
198 #[inline]
199 pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
200 &self.components[id.index()]
201 }
202
203 #[inline]
204 pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
205 self.slot_to_component[culist_slot.index()]
206 }
207
208 #[inline]
209 pub const fn slot_to_component(self) -> &'static [ComponentId] {
210 self.slot_to_component
211 }
212
213 #[inline]
214 pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
215 CopperListView::new(self, msgs)
216 }
217}
218
219#[derive(Debug, Clone, Copy)]
221pub struct CopperListView<'a> {
222 layout: CopperListLayout,
223 msgs: &'a [&'a CuMsgMetadata],
224}
225
226impl<'a> CopperListView<'a> {
227 #[inline]
228 pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
229 assert_eq!(
230 msgs.len(),
231 layout.culist_slot_count(),
232 "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
233 msgs.len(),
234 layout.culist_slot_count()
235 );
236 Self { layout, msgs }
237 }
238
239 #[inline]
240 pub const fn layout(self) -> CopperListLayout {
241 self.layout
242 }
243
244 #[inline]
245 pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
246 self.msgs
247 }
248
249 #[inline]
250 pub const fn len(self) -> usize {
251 self.msgs.len()
252 }
253
254 #[inline]
255 pub const fn is_empty(self) -> bool {
256 self.msgs.is_empty()
257 }
258
259 #[inline]
260 pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
261 let index = culist_slot.index();
262 CopperListEntry {
263 culist_slot,
264 component_id: self.layout.component_for_slot(culist_slot),
265 msg: self.msgs[index],
266 }
267 }
268
269 pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
270 self.msgs.iter().enumerate().map(move |(idx, msg)| {
271 let culist_slot = CuListSlot::new(idx);
272 CopperListEntry {
273 culist_slot,
274 component_id: self.layout.component_for_slot(culist_slot),
275 msg,
276 }
277 })
278 }
279}
280
281#[derive(Debug, Clone, Copy)]
283pub struct CopperListEntry<'a> {
284 pub culist_slot: CuListSlot,
285 pub component_id: ComponentId,
286 pub msg: &'a CuMsgMetadata,
287}
288
289impl<'a> CopperListEntry<'a> {
290 #[inline]
291 pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
292 layout.component(self.component_id)
293 }
294
295 #[inline]
296 pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
297 layout.component(self.component_id).kind()
298 }
299}
300
/// Snapshot of "which component was at which lifecycle step", as written to and
/// read from a [`RuntimeExecutionProbe`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ExecutionMarker {
    /// Component the marker refers to.
    pub component_id: ComponentId,
    /// Lifecycle step of that component.
    pub step: CuComponentState,
    /// Optional CopperList id attached to the marker.
    // NOTE(review): presumably the id of the CopperList being processed, `None`
    // outside of CopperList processing — confirm against the callers of `record`.
    pub culistid: Option<u64>,
}
311
/// Atomics-backed holder of the most recent [`ExecutionMarker`] written through
/// [`RuntimeExecutionProbe::record`].
///
/// Every marker field lives in its own atomic, and `sequence` is incremented
/// once per `record` call so readers can detect that something new was
/// written. On targets without 64-bit atomics the CopperList id is kept behind
/// a `Mutex` instead of an `AtomicU64` + presence flag.
#[derive(Debug)]
pub struct RuntimeExecutionProbe {
    /// Raw `ComponentId` index; `ComponentId::INVALID` until the first `record`.
    component_id: AtomicUsize,
    /// `CuComponentState` encoded via `component_state_to_usize`.
    step: AtomicUsize,
    /// Last recorded CopperList id (only meaningful when `culistid_present` is 1).
    #[cfg(target_has_atomic = "64")]
    culistid: AtomicU64,
    /// 1 when the last marker carried a CopperList id, 0 when it was `None`.
    #[cfg(target_has_atomic = "64")]
    culistid_present: AtomicUsize,
    /// Last recorded CopperList id on targets without 64-bit atomics.
    #[cfg(not(target_has_atomic = "64"))]
    culistid: Mutex<Option<u64>>,
    /// Incremented at the end of every `record` call.
    sequence: AtomicUsize,
}

impl Default for RuntimeExecutionProbe {
    /// Creates an empty probe: invalid component id, step 0, no CopperList id,
    /// sequence 0. `marker()` returns `None` in this state.
    fn default() -> Self {
        Self {
            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
            step: AtomicUsize::new(0),
            #[cfg(target_has_atomic = "64")]
            culistid: AtomicU64::new(0),
            #[cfg(target_has_atomic = "64")]
            culistid_present: AtomicUsize::new(0),
            #[cfg(not(target_has_atomic = "64"))]
            culistid: Mutex::new(None),
            sequence: AtomicUsize::new(0),
        }
    }
}

impl RuntimeExecutionProbe {
    /// Stores `marker` as the latest execution marker and bumps the sequence.
    ///
    /// The individual fields are written with `Relaxed` ordering; the final
    /// `fetch_add` on `sequence` uses `Release` so that a reader that observes
    /// the new sequence with `Acquire` also observes the field stores.
    #[inline]
    pub fn record(&self, marker: ExecutionMarker) {
        self.component_id
            .store(marker.component_id.index(), Ordering::Relaxed);
        self.step
            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
        #[cfg(target_has_atomic = "64")]
        match marker.culistid {
            Some(culistid) => {
                self.culistid.store(culistid, Ordering::Relaxed);
                self.culistid_present.store(1, Ordering::Relaxed);
            }
            None => {
                // The stale value in `culistid` is left in place; the flag marks it invalid.
                self.culistid_present.store(0, Ordering::Relaxed);
            }
        }
        #[cfg(not(target_has_atomic = "64"))]
        {
            *self.culistid.lock() = marker.culistid;
        }
        self.sequence.fetch_add(1, Ordering::Release);
    }

    /// Returns the number of `record` calls observed so far (wrapping on overflow).
    #[inline]
    pub fn sequence(&self) -> usize {
        self.sequence.load(Ordering::Acquire)
    }

    /// Returns the latest recorded marker, or `None` if nothing was recorded yet.
    ///
    /// The fields are read between two loads of `sequence`; the read is retried
    /// until both loads agree, i.e. until no `record` call *completed* while
    /// the fields were being read.
    // NOTE(review): `record` only bumps `sequence` after its stores, so a read
    // that overlaps a `record` still in progress is not detected by the
    // before/after comparison and may mix fields of two markers — confirm that
    // this best-effort behavior is acceptable for the consumers of this probe.
    #[inline]
    pub fn marker(&self) -> Option<ExecutionMarker> {
        loop {
            let seq_before = self.sequence.load(Ordering::Acquire);
            let component_id = self.component_id.load(Ordering::Relaxed);
            let step = self.step.load(Ordering::Relaxed);
            #[cfg(target_has_atomic = "64")]
            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
            #[cfg(target_has_atomic = "64")]
            let culistid_value = self.culistid.load(Ordering::Relaxed);
            #[cfg(not(target_has_atomic = "64"))]
            let culistid = *self.culistid.lock();
            let seq_after = self.sequence.load(Ordering::Acquire);
            if seq_before == seq_after {
                // The sentinel id means `record` has never been called.
                if component_id == ComponentId::INVALID.index() {
                    return None;
                }
                let step = usize_to_component_state(step);
                #[cfg(target_has_atomic = "64")]
                let culistid = if culistid_present == 0 {
                    None
                } else {
                    Some(culistid_value)
                };
                return Some(ExecutionMarker {
                    component_id: ComponentId::new(component_id),
                    step,
                    culistid,
                });
            }
        }
    }
}
410
411#[inline]
412const fn component_state_to_usize(step: CuComponentState) -> usize {
413 match step {
414 CuComponentState::Start => 0,
415 CuComponentState::Preprocess => 1,
416 CuComponentState::Process => 2,
417 CuComponentState::Postprocess => 3,
418 CuComponentState::Stop => 4,
419 }
420}
421
422#[inline]
423const fn usize_to_component_state(step: usize) -> CuComponentState {
424 match step {
425 0 => CuComponentState::Start,
426 1 => CuComponentState::Preprocess,
427 2 => CuComponentState::Process,
428 3 => CuComponentState::Postprocess,
429 _ => CuComponentState::Stop,
430 }
431}
432
/// Shared, reference-counted handle to a [`RuntimeExecutionProbe`] (`std` only).
#[cfg(feature = "std")]
pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
435
436#[derive(Debug, Clone)]
441pub struct MonitorExecutionProbe {
442 #[cfg(feature = "std")]
443 inner: Option<ExecutionProbeHandle>,
444}
445
446impl Default for MonitorExecutionProbe {
447 fn default() -> Self {
448 Self::unavailable()
449 }
450}
451
452impl MonitorExecutionProbe {
453 #[cfg(feature = "std")]
454 pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
455 Self {
456 inner: Some(handle),
457 }
458 }
459
460 pub const fn unavailable() -> Self {
461 Self {
462 #[cfg(feature = "std")]
463 inner: None,
464 }
465 }
466
467 pub fn is_available(&self) -> bool {
468 #[cfg(feature = "std")]
469 {
470 self.inner.is_some()
471 }
472 #[cfg(not(feature = "std"))]
473 {
474 false
475 }
476 }
477
478 pub fn marker(&self) -> Option<ExecutionMarker> {
479 #[cfg(feature = "std")]
480 {
481 self.inner.as_ref().and_then(|probe| probe.marker())
482 }
483 #[cfg(not(feature = "std"))]
484 {
485 None
486 }
487 }
488
489 pub fn sequence(&self) -> Option<usize> {
490 #[cfg(feature = "std")]
491 {
492 self.inner.as_ref().map(|probe| probe.sequence())
493 }
494 #[cfg(not(feature = "std"))]
495 {
496 None
497 }
498 }
499}
500
/// Kind of a monitored component.
///
/// `Source`, `Task` and `Sink` form the "task family" (see
/// [`ComponentType::is_task`]); `Bridge` is the only kind outside it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    Source,
    Task,
    Sink,
    Bridge,
}

impl ComponentType {
    /// Returns `true` for every kind except [`ComponentType::Bridge`].
    pub const fn is_task(self) -> bool {
        match self {
            Self::Source | Self::Task | Self::Sink => true,
            Self::Bridge => false,
        }
    }
}
519
520#[derive(Debug, Clone, Copy, PartialEq, Eq)]
522pub struct MonitorComponentMetadata {
523 id: &'static str,
524 kind: ComponentType,
525 type_name: Option<&'static str>,
526}
527
528impl MonitorComponentMetadata {
529 pub const fn new(
530 id: &'static str,
531 kind: ComponentType,
532 type_name: Option<&'static str>,
533 ) -> Self {
534 Self {
535 id,
536 kind,
537 type_name,
538 }
539 }
540
541 pub const fn id(&self) -> &'static str {
543 self.id
544 }
545
546 pub const fn kind(&self) -> ComponentType {
547 self.kind
548 }
549
550 pub const fn type_name(&self) -> Option<&'static str> {
552 self.type_name
553 }
554}
555
/// Immutable description of the monitored application that is handed to a
/// monitor: identifiers, the CopperList layout, sizing information, the
/// topology, and the monitor's own optional configuration.
#[derive(Debug, Clone)]
pub struct CuMonitoringMetadata {
    /// Mission identifier supplied to `new`.
    mission_id: CompactString,
    /// Optional subsystem identifier; `None` unless set via `with_subsystem_id`.
    subsystem_id: Option<CompactString>,
    /// Instance identifier; `0` unless set via `with_instance_id`.
    instance_id: u32,
    /// Component table and CopperList slot mapping (validated in `new`).
    layout: CopperListLayout,
    /// CopperList sizing information.
    copperlist_info: CopperListInfo,
    /// Nodes and connections of the monitored graph.
    topology: MonitorTopology,
    /// Configuration of the monitor component itself, if any.
    monitor_config: Option<ComponentConfig>,
}
570
571impl CuMonitoringMetadata {
572 pub fn new(
573 mission_id: CompactString,
574 components: &'static [MonitorComponentMetadata],
575 culist_component_mapping: &'static [ComponentId],
576 copperlist_info: CopperListInfo,
577 topology: MonitorTopology,
578 monitor_config: Option<ComponentConfig>,
579 ) -> CuResult<Self> {
580 Self::validate_components(components)?;
581 Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
582 Ok(Self {
583 mission_id,
584 subsystem_id: None,
585 instance_id: 0,
586 layout: CopperListLayout::new(components, culist_component_mapping),
587 copperlist_info,
588 topology,
589 monitor_config,
590 })
591 }
592
593 fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
594 let mut seen_bridge = false;
595 for component in components {
596 match component.kind() {
597 component_type if component_type.is_task() && seen_bridge => {
598 return Err(CuError::from(
599 "invalid monitor metadata: task-family components must appear before bridges",
600 ));
601 }
602 ComponentType::Bridge => seen_bridge = true,
603 _ => {}
604 }
605 }
606 Ok(())
607 }
608
609 fn validate_culist_mapping(
610 components_len: usize,
611 culist_component_mapping: &'static [ComponentId],
612 ) -> CuResult<()> {
613 for component_idx in culist_component_mapping {
614 if component_idx.index() >= components_len {
615 return Err(CuError::from(
616 "invalid monitor metadata: culist mapping points past components table",
617 ));
618 }
619 }
620 Ok(())
621 }
622
623 pub fn mission_id(&self) -> &str {
625 self.mission_id.as_str()
626 }
627
628 pub fn subsystem_id(&self) -> Option<&str> {
631 self.subsystem_id.as_deref()
632 }
633
634 pub fn instance_id(&self) -> u32 {
636 self.instance_id
637 }
638
639 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
643 self.layout.components()
644 }
645
646 pub const fn component_count(&self) -> usize {
648 self.layout.component_count()
649 }
650
651 pub const fn layout(&self) -> CopperListLayout {
653 self.layout
654 }
655
656 pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
657 self.layout.component(component_id)
658 }
659
660 pub fn component_id(&self, component_id: ComponentId) -> &'static str {
661 self.component(component_id).id()
662 }
663
664 pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
665 self.component(component_id).kind()
666 }
667
668 pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
669 self.layout
670 .components()
671 .iter()
672 .position(|component| component.id() == component_id)
673 .map(ComponentId::new)
674 }
675
676 pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
680 self.layout.slot_to_component()
681 }
682
683 pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
684 self.layout.component_for_slot(culist_slot)
685 }
686
687 pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
688 self.layout.view(msgs)
689 }
690
691 pub const fn copperlist_info(&self) -> CopperListInfo {
692 self.copperlist_info
693 }
694
695 pub fn topology(&self) -> &MonitorTopology {
700 &self.topology
701 }
702
703 pub fn monitor_config(&self) -> Option<&ComponentConfig> {
704 self.monitor_config.as_ref()
705 }
706
707 pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
708 self.monitor_config = monitor_config;
709 self
710 }
711
712 pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
713 self.subsystem_id = subsystem_id.map(CompactString::from);
714 self
715 }
716
717 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
718 self.instance_id = instance_id;
719 self
720 }
721}
722
723#[derive(Debug, Clone, Default)]
727pub struct CuMonitoringRuntime {
728 execution_probe: MonitorExecutionProbe,
729}
730
731impl CuMonitoringRuntime {
732 #[cfg(feature = "std")]
733 pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
734 ensure_runtime_panic_hook_installed();
735 Self { execution_probe }
736 }
737
738 #[cfg(not(feature = "std"))]
739 pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
740 Self { execution_probe }
741 }
742
743 #[cfg(feature = "std")]
744 pub fn unavailable() -> Self {
745 Self::new(MonitorExecutionProbe::unavailable())
746 }
747
748 #[cfg(not(feature = "std"))]
749 pub const fn unavailable() -> Self {
750 Self::new(MonitorExecutionProbe::unavailable())
751 }
752
753 pub fn execution_probe(&self) -> &MonitorExecutionProbe {
754 &self.execution_probe
755 }
756
757 #[cfg(feature = "std")]
758 pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
759 where
760 F: Fn(&PanicReport) + Send + Sync + 'static,
761 {
762 ensure_runtime_panic_hook_installed();
763 register_panic_cleanup(callback)
764 }
765
766 #[cfg(feature = "std")]
767 pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
768 where
769 F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
770 {
771 ensure_runtime_panic_hook_installed();
772 register_panic_action(callback)
773 }
774}
775
/// Shared panic callback that only observes the report.
#[cfg(feature = "std")]
type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
/// Shared panic callback that may request a process exit code by returning `Some`.
#[cfg(feature = "std")]
type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;
780
/// Everything captured about a panic by the runtime panic hook.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct PanicReport {
    message: String,
    location: Option<String>,
    thread_name: Option<String>,
    backtrace: String,
    timestamp_unix_ms: u128,
    crash_report_path: Option<String>,
}

#[cfg(feature = "std")]
impl PanicReport {
    /// Captures a report for the panic described by `info`.
    ///
    /// The backtrace is always captured (`Backtrace::force_capture`), the
    /// timestamp falls back to `0` if the system clock is before the Unix
    /// epoch, and `crash_report_path` starts out as `None`.
    fn capture(info: &PanicHookInfo<'_>) -> Self {
        let location = match info.location() {
            Some(loc) => Some(format!("{}:{}:{}", loc.file(), loc.line(), loc.column())),
            None => None,
        };
        let thread_name = std::thread::current().name().map(str::to_owned);
        let timestamp_unix_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis(),
            Err(_) => 0,
        };
        let message = panic_hook_payload_to_string(info);
        let backtrace = Backtrace::force_capture().to_string();

        PanicReport {
            message,
            location,
            thread_name,
            backtrace,
            timestamp_unix_ms,
            crash_report_path: None,
        }
    }

    /// The panic message.
    pub fn message(&self) -> &str {
        self.message.as_str()
    }

    /// Source location of the panic as `file:line:column`, when known.
    pub fn location(&self) -> Option<&str> {
        match &self.location {
            Some(location) => Some(location.as_str()),
            None => None,
        }
    }

    /// Name of the panicking thread, when it has one.
    pub fn thread_name(&self) -> Option<&str> {
        match &self.thread_name {
            Some(name) => Some(name.as_str()),
            None => None,
        }
    }

    /// The rendered backtrace.
    pub fn backtrace(&self) -> &str {
        self.backtrace.as_str()
    }

    /// Capture time in milliseconds since the Unix epoch.
    pub fn timestamp_unix_ms(&self) -> u128 {
        self.timestamp_unix_ms
    }

    /// Path of the crash report file, once one has been written.
    pub fn crash_report_path(&self) -> Option<&str> {
        match &self.crash_report_path {
            Some(path) => Some(path.as_str()),
            None => None,
        }
    }

    /// One-line summary: the message, prefixed with the location when known.
    pub fn summary(&self) -> String {
        if let Some(location) = self.location() {
            format!("panic at {location}: {}", self.message())
        } else {
            format!("panic: {}", self.message())
        }
    }
}
845
/// Which callback list a [`PanicHookRegistration`] belongs to.
#[cfg(feature = "std")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PanicHookRegistrationKind {
    /// Registered through `register_panic_cleanup`.
    Cleanup,
    /// Registered through `register_panic_action`.
    Action,
}
852
/// Registry entry for a cleanup callback, keyed by its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicCleanup {
    /// Id handed out at registration time; used to unregister the entry.
    id: usize,
    callback: PanicCleanupCallback,
}
859
/// Registry entry for an action callback, keyed by its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicAction {
    /// Id handed out at registration time; used to unregister the entry.
    id: usize,
    callback: PanicActionCallback,
}
866
/// Process-wide lists of registered panic callbacks, each behind its own mutex.
#[cfg(feature = "std")]
#[derive(Default)]
struct PanicHookRegistry {
    /// Callbacks run by `run_panic_cleanup_callbacks`, in registration order.
    cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
    /// Callbacks run by `run_panic_action_callbacks`, in registration order.
    action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
}
873
/// Guard returned when a panic callback is registered.
///
/// Dropping the guard removes the callback from the registry, so it must be
/// kept alive for as long as the callback should stay active.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct PanicHookRegistration {
    /// Registration id of the callback this guard owns.
    id: usize,
    /// Callback list the registration lives in.
    kind: PanicHookRegistrationKind,
}

#[cfg(feature = "std")]
impl Drop for PanicHookRegistration {
    /// Unregisters the callback identified by `(kind, id)`.
    fn drop(&mut self) {
        unregister_panic_hook(self.kind, self.id);
    }
}
887
// Lazily created registry of panic callbacks (see `panic_hook_registry`).
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
// Ensures the runtime panic hook is installed at most once per process.
#[cfg(feature = "std")]
static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
// Next registration id to hand out; ids start at 1.
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
// Number of runtime panic hook invocations currently in progress.
#[cfg(feature = "std")]
static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);
896
/// Returns the process-wide panic callback registry, creating it (empty) on
/// first use.
#[cfg(feature = "std")]
fn panic_hook_registry() -> &'static PanicHookRegistry {
    let registry: &'static PanicHookRegistry =
        PANIC_HOOK_REGISTRY.get_or_init(|| PanicHookRegistry::default());
    registry
}
901
/// Installs the runtime panic hook the first time it is called; later calls
/// are no-ops.
///
/// The installed hook replaces whatever hook was set before. On each panic it:
/// 1. marks the hook as active for the duration of the call,
/// 2. captures a [`PanicReport`],
/// 3. runs the cleanup callbacks,
/// 4. writes the report to a crash file and records its path in the report,
/// 5. prints the report to stderr,
/// 6. runs the action callbacks and exits the process if one of them
///    requested an exit code.
#[cfg(feature = "std")]
fn ensure_runtime_panic_hook_installed() {
    /// Body of the runtime panic hook.
    fn runtime_panic_hook(info: &PanicHookInfo<'_>) {
        let _guard = PanicHookActiveGuard::new();

        let mut report = PanicReport::capture(info);
        run_panic_cleanup_callbacks(&report);
        report.crash_report_path = write_panic_report_to_file(&report);
        emit_panic_report(&report);

        match run_panic_action_callbacks(&report) {
            Some(exit_code) => std::process::exit(exit_code),
            None => {}
        }
    }

    let _ = PANIC_HOOK_INSTALL_ONCE.get_or_init(|| {
        std::panic::set_hook(Box::new(runtime_panic_hook));
    });
}
918
/// Scope guard that counts a running panic hook invocation in
/// `PANIC_HOOK_ACTIVE_COUNT` for as long as it is alive.
#[cfg(feature = "std")]
struct PanicHookActiveGuard;

#[cfg(feature = "std")]
impl PanicHookActiveGuard {
    /// Increments the active-hook counter and returns the guard.
    fn new() -> Self {
        let _previous = PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
        PanicHookActiveGuard
    }
}

#[cfg(feature = "std")]
impl Drop for PanicHookActiveGuard {
    /// Decrements the active-hook counter again.
    fn drop(&mut self) {
        let _previous = PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
    }
}
936
/// Returns `true` while at least one invocation of the runtime panic hook is
/// in progress.
#[cfg(feature = "std")]
pub fn runtime_panic_hook_active() -> bool {
    let active_hooks = PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst);
    active_hooks != 0
}

/// Without `std` there is no runtime panic hook, so this is always `false`.
#[cfg(not(feature = "std"))]
pub const fn runtime_panic_hook_active() -> bool {
    false
}
946
/// Appends `callback` to the cleanup callback list and returns the guard that
/// unregisters it on drop.
///
/// A poisoned registry mutex is recovered from rather than treated as an error.
#[cfg(feature = "std")]
fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicCleanup {
        id,
        callback: Arc::new(callback),
    };

    match panic_hook_registry().cleanup_callbacks.lock() {
        Ok(mut callbacks) => callbacks.push(entry),
        Err(poison) => poison.into_inner().push(entry),
    }

    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Cleanup,
    }
}
964
/// Appends `callback` to the action callback list and returns the guard that
/// unregisters it on drop.
///
/// A poisoned registry mutex is recovered from rather than treated as an error.
#[cfg(feature = "std")]
fn register_panic_action<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicAction {
        id,
        callback: Arc::new(callback),
    };

    match panic_hook_registry().action_callbacks.lock() {
        Ok(mut callbacks) => callbacks.push(entry),
        Err(poison) => poison.into_inner().push(entry),
    }

    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Action,
    }
}
982
/// Removes the callback with registration `id` from the list selected by
/// `kind`. Unknown ids are ignored; a poisoned mutex is recovered from.
#[cfg(feature = "std")]
fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
    let registry = panic_hook_registry();
    match kind {
        PanicHookRegistrationKind::Cleanup => registry
            .cleanup_callbacks
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|registered| registered.id != id),
        PanicHookRegistrationKind::Action => registry
            .action_callbacks
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|registered| registered.id != id),
    }
}
1003
/// Runs every registered cleanup callback, in registration order.
///
/// The callback list is cloned first so the registry lock is not held while
/// the callbacks run.
#[cfg(feature = "std")]
fn run_panic_cleanup_callbacks(report: &PanicReport) {
    let snapshot = {
        let registered = panic_hook_registry()
            .cleanup_callbacks
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        registered.clone()
    };

    snapshot
        .iter()
        .for_each(|entry| (entry.callback)(report));
}
1015
/// Runs every registered action callback, in registration order, and returns
/// the first exit code any of them requested.
///
/// All callbacks are invoked even after an exit code has been obtained; later
/// results are discarded. The callback list is cloned first so the registry
/// lock is not held while the callbacks run.
#[cfg(feature = "std")]
fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
    let snapshot = {
        let registered = panic_hook_registry()
            .action_callbacks
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        registered.clone()
    };

    let mut exit_code = None;
    for entry in snapshot {
        let requested = (entry.callback)(report);
        // Keep the first `Some`; later requests never override it.
        exit_code = exit_code.or(requested);
    }
    exit_code
}
1033
/// Extracts the panic message from `info`.
///
/// Handles `&str` and `String` payloads; any other payload type yields a fixed
/// placeholder message.
#[cfg(feature = "std")]
fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
    let payload = info.payload();
    payload
        .downcast_ref::<&str>()
        .map(|text| String::from(*text))
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1044
/// Renders `report` as the multi-line text used for both stderr and the crash
/// file.
///
/// The `location` and `crash_report` lines are only present when the report
/// has those values; the output always ends with a newline.
#[cfg(feature = "std")]
fn render_panic_report(report: &PanicReport) -> String {
    let thread = report.thread_name().unwrap_or("<unnamed>");
    let backtrace = report.backtrace();

    let mut rendered = String::from("Copper panic\n");
    rendered += &format!("time_unix_ms: {}\n", report.timestamp_unix_ms());
    rendered += &format!("thread: {}\n", thread);
    if let Some(location) = report.location() {
        rendered += &format!("location: {location}\n");
    }
    rendered += &format!("message: {}\n", report.message());
    if let Some(path) = report.crash_report_path() {
        rendered += &format!("crash_report: {path}\n");
    }

    rendered += "\nBacktrace:\n";
    rendered += backtrace;
    if !backtrace.ends_with('\n') {
        rendered.push('\n');
    }
    rendered
}
1067
/// Writes the rendered report to stderr. Write and flush errors are ignored
/// on purpose: this runs inside the panic hook and has nowhere to report them.
#[cfg(feature = "std")]
fn emit_panic_report(report: &PanicReport) {
    let rendered = render_panic_report(report);
    let mut stderr = std::io::stderr().lock();
    let _ = stderr.write_all(rendered.as_bytes());
    let _ = stderr.flush();
}
1074
/// Writes the rendered report to `copper-crash-<timestamp_ms>-<pid>.txt` in the
/// current working directory and returns the file's path.
///
/// The text written to the file includes its own path as the `crash_report`
/// line. Returns `None` if the working directory cannot be determined or any
/// file operation fails.
#[cfg(feature = "std")]
fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
    let cwd = std::env::current_dir().ok()?;
    let path = cwd.join(format!(
        "copper-crash-{}-{}.txt",
        report.timestamp_unix_ms(),
        std::process::id()
    ));
    let path_string = path.to_string_lossy().into_owned();

    let mut file = File::create(&path).ok()?;
    let report_with_path = PanicReport {
        crash_report_path: Some(path_string.clone()),
        ..report.clone()
    };
    let rendered = render_panic_report(&report_with_path);
    file.write_all(rendered.as_bytes()).ok()?;
    file.flush().ok()?;

    Some(path_string)
}
1093
1094#[derive(Debug)]
1096pub enum Decision {
1097 Abort, Ignore, Shutdown, }
1101
1102fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
1103 use Decision::{Abort, Ignore, Shutdown};
1104 match (lhs, rhs) {
1107 (Shutdown, _) | (_, Shutdown) => Shutdown,
1108 (Abort, _) | (_, Abort) => Abort,
1109 _ => Ignore,
1110 }
1111}
1112
/// One node of a [`MonitorTopology`].
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node identifier.
    pub id: String,
    /// Type name of the node, when known.
    pub type_name: Option<String>,
    /// Kind of component the node represents.
    pub kind: ComponentType,
    /// Names of the node's inputs.
    // NOTE(review): presumably input port names — confirm against the code that fills this in.
    pub inputs: Vec<String>,
    /// Names of the node's outputs.
    // NOTE(review): presumably output port names — confirm against the code that fills this in.
    pub outputs: Vec<String>,
}
1123
/// One directed connection of a [`MonitorTopology`].
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Source endpoint of the connection.
    // NOTE(review): presumably a `MonitorNode::id` — confirm.
    pub src: String,
    /// Port on the source endpoint, if any.
    pub src_port: Option<String>,
    /// Destination endpoint of the connection.
    // NOTE(review): presumably a `MonitorNode::id` — confirm.
    pub dst: String,
    /// Port on the destination endpoint, if any.
    pub dst_port: Option<String>,
    /// Message carried by the connection.
    // NOTE(review): presumably the message type name — confirm.
    pub msg: String,
}
1132
/// Graph description handed to monitors: a list of nodes and the connections
/// between them. The default value is an empty topology.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    pub nodes: Vec<MonitorNode>,
    pub connections: Vec<MonitorConnection>,
}
1138
1139#[derive(Debug, Clone, Copy, Default)]
1140pub struct CopperListInfo {
1141 pub size_bytes: usize,
1142 pub count: usize,
1143}
1144
1145impl CopperListInfo {
1146 pub const fn new(size_bytes: usize, count: usize) -> Self {
1147 Self { size_bytes, count }
1148 }
1149}
1150
/// Byte counters describing the I/O of one CopperList. All counters default to 0.
// NOTE(review): the per-field descriptions below are inferred from the field
// names only; the code that fills this struct is not visible here — confirm.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// Presumably the in-memory (unencoded) size of the CopperList, in bytes.
    pub raw_culist_bytes: u64,
    /// Presumably the bytes held through payload handles.
    pub handle_bytes: u64,
    /// Presumably the encoded size of the CopperList, in bytes.
    pub encoded_culist_bytes: u64,
    /// Presumably the bytes written for keyframes.
    pub keyframe_bytes: u64,
    /// Presumably the running total of structured-log bytes.
    pub structured_log_bytes_total: u64,
    /// Id of the CopperList these counters belong to.
    pub culistid: u64,
}
1174
/// Byte counters for a single payload (`usize`-based). All counters default to 0.
// NOTE(review): field meanings are inferred from their names — confirm against
// the code that produces this struct.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PayloadIoStats {
    pub resident_bytes: usize,
    pub encoded_bytes: usize,
    pub handle_bytes: usize,
}
1181
/// Byte counters for a single message slot, as returned by `CuMsgIoCache::get`.
///
/// The default value has `present == false` and all counters at 0, which is
/// also what an entry that was cleared or never set reports.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CuMsgIoStats {
    /// Whether the slot currently holds measured values.
    pub present: bool,
    pub resident_bytes: u64,
    pub encoded_bytes: u64,
    pub handle_bytes: u64,
}
1189
/// Atomic storage for one [`CuMsgIoStats`] value.
///
/// `present` acts as the publication flag: writers store the counters first
/// and `present` last (`Release`); readers check `present` first (`Acquire`)
/// before loading the counters.
struct CuMsgIoEntry {
    present: PortableAtomicBool,
    resident_bytes: PortableAtomicU64,
    encoded_bytes: PortableAtomicU64,
    handle_bytes: PortableAtomicU64,
}

impl CuMsgIoEntry {
    /// Marks the entry as absent and zeroes its counters.
    fn clear(&self) {
        // Withdraw the flag first so readers stop trusting the counters.
        self.present.store(false, PortableOrdering::Release);
        self.resident_bytes.store(0, PortableOrdering::Relaxed);
        self.encoded_bytes.store(0, PortableOrdering::Relaxed);
        self.handle_bytes.store(0, PortableOrdering::Relaxed);
    }

    /// Returns the stored stats, or the default (absent, all zero) value when
    /// the entry is not marked present.
    fn get(&self) -> CuMsgIoStats {
        if !self.present.load(PortableOrdering::Acquire) {
            return CuMsgIoStats::default();
        }

        CuMsgIoStats {
            present: true,
            resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
            encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
            handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
        }
    }

    /// Stores `stats` in the entry.
    fn set(&self, stats: CuMsgIoStats) {
        self.resident_bytes
            .store(stats.resident_bytes, PortableOrdering::Relaxed);
        self.encoded_bytes
            .store(stats.encoded_bytes, PortableOrdering::Relaxed);
        self.handle_bytes
            .store(stats.handle_bytes, PortableOrdering::Relaxed);
        // Publish the flag last so the counter stores above are visible to a
        // reader that observes it.
        self.present.store(stats.present, PortableOrdering::Release);
    }
}

impl Default for CuMsgIoEntry {
    /// Creates an absent entry with all counters at 0.
    fn default() -> Self {
        Self {
            present: PortableAtomicBool::new(false),
            resident_bytes: PortableAtomicU64::new(0),
            encoded_bytes: PortableAtomicU64::new(0),
            handle_bytes: PortableAtomicU64::new(0),
        }
    }
}
1239
1240pub struct CuMsgIoCache<const N: usize> {
1241 entries: [CuMsgIoEntry; N],
1242}
1243
1244impl<const N: usize> CuMsgIoCache<N> {
1245 pub fn clear(&self) {
1246 for entry in &self.entries {
1247 entry.clear();
1248 }
1249 }
1250
1251 pub fn get(&self, idx: usize) -> CuMsgIoStats {
1252 self.entries[idx].get()
1253 }
1254
1255 fn raw_parts(&self) -> (usize, usize) {
1256 (self.entries.as_ptr() as usize, N)
1257 }
1258}
1259
1260impl<const N: usize> Default for CuMsgIoCache<N> {
1261 fn default() -> Self {
1262 Self {
1263 entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1264 }
1265 }
1266}
1267
/// Type-erased description of a capture in progress, stored in
/// `ACTIVE_COPPERLIST_CAPTURE`.
// NOTE(review): `cache_addr`/`cache_len` have the same shape as the pair
// returned by `CuMsgIoCache::raw_parts` (entry array address + entry count);
// the code that creates and consumes this struct is not visible here — confirm.
#[derive(Clone, Copy)]
struct ActiveCuMsgIoCapture {
    cache_addr: usize,
    cache_len: usize,
    /// Slot currently being captured, if any.
    current_slot: Option<usize>,
}
1274
// Measurement state.
//
// With the `std` feature the state lives in thread-locals, so every thread has
// its own copy. Without `std` the same three values are crate-wide statics
// guarded by spin mutexes.
#[cfg(feature = "std")]
thread_local! {
    // Running handle-byte total of the open measurement; `None` when no measurement is open.
    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
    // Description of the active CopperList capture, if any.
    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
    // Handle-byte total stored when the last capture guard was dropped.
    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
}

#[cfg(not(feature = "std"))]
static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1288
1289fn begin_payload_io_measurement() {
1290 #[cfg(feature = "std")]
1291 PAYLOAD_HANDLE_BYTES.with(|bytes| {
1292 debug_assert!(
1293 bytes.get().is_none(),
1294 "payload IO byte measurement must not be nested"
1295 );
1296 bytes.set(Some(0));
1297 });
1298
1299 #[cfg(not(feature = "std"))]
1300 {
1301 let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1302 debug_assert!(
1303 bytes.is_none(),
1304 "payload IO byte measurement must not be nested"
1305 );
1306 *bytes = Some(0);
1307 }
1308}
1309
1310fn finish_payload_io_measurement() -> usize {
1311 #[cfg(feature = "std")]
1312 {
1313 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1314 }
1315
1316 #[cfg(not(feature = "std"))]
1317 {
1318 PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1319 }
1320}
1321
1322fn abort_payload_io_measurement() {
1323 #[cfg(feature = "std")]
1324 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1325
1326 #[cfg(not(feature = "std"))]
1327 {
1328 *PAYLOAD_HANDLE_BYTES.lock() = None;
1329 }
1330}
1331
1332fn current_payload_io_measurement() -> usize {
1333 #[cfg(feature = "std")]
1334 {
1335 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1336 }
1337
1338 #[cfg(not(feature = "std"))]
1339 {
1340 PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1341 }
1342}
1343
/// Adds `bytes` to the handle-byte total of the measurement that is currently open.
///
/// The addition saturates instead of overflowing. Nothing is recorded when no
/// measurement is open.
///
/// This function only exists with the `std` feature, so it always works on the
/// thread-local state; the former `not(std)` branch could never be compiled
/// and has been removed.
#[cfg(feature = "std")]
pub(crate) fn record_payload_handle_bytes(bytes: usize) {
    PAYLOAD_HANDLE_BYTES.with(|total| {
        if let Some(current) = total.get() {
            total.set(Some(current.saturating_add(bytes)));
        }
    });
}
1361
1362fn set_last_completed_handle_bytes(bytes: u64) {
1363 #[cfg(feature = "std")]
1364 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1365
1366 #[cfg(not(feature = "std"))]
1367 {
1368 *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1369 }
1370}
1371
1372pub fn take_last_completed_handle_bytes() -> u64 {
1373 #[cfg(feature = "std")]
1374 {
1375 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1376 }
1377
1378 #[cfg(not(feature = "std"))]
1379 {
1380 let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1381 let value = *total;
1382 *total = 0;
1383 value
1384 }
1385}
1386
1387fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1388 #[cfg(feature = "std")]
1389 {
1390 ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1391 let mut state = capture.get()?;
1392 let result = f(&mut state);
1393 capture.set(Some(state));
1394 Some(result)
1395 })
1396 }
1397
1398 #[cfg(not(feature = "std"))]
1399 {
1400 let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1401 let state = capture.as_mut()?;
1402 Some(f(state))
1403 }
1404}
1405
1406pub struct CuMsgIoCaptureGuard;
1407
1408impl CuMsgIoCaptureGuard {
1409 pub fn select_slot(&self, slot: usize) {
1410 let _ = with_active_capture_mut(|capture| {
1411 debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1412 capture.current_slot = Some(slot);
1413 });
1414 }
1415}
1416
1417impl Drop for CuMsgIoCaptureGuard {
1418 fn drop(&mut self) {
1419 set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1420
1421 #[cfg(feature = "std")]
1422 ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1423
1424 #[cfg(not(feature = "std"))]
1425 {
1426 *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1427 }
1428 }
1429}
1430
1431pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1432 cache.clear();
1433 set_last_completed_handle_bytes(0);
1434 begin_payload_io_measurement();
1435 let (cache_addr, cache_len) = cache.raw_parts();
1436 let capture = ActiveCuMsgIoCapture {
1437 cache_addr,
1438 cache_len,
1439 current_slot: None,
1440 };
1441
1442 #[cfg(feature = "std")]
1443 ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1444 debug_assert!(
1445 state.get().is_none(),
1446 "CopperList payload IO capture must not be nested"
1447 );
1448 state.set(Some(capture));
1449 });
1450
1451 #[cfg(not(feature = "std"))]
1452 {
1453 let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1454 debug_assert!(
1455 state.is_none(),
1456 "CopperList payload IO capture must not be nested"
1457 );
1458 *state = Some(capture);
1459 }
1460
1461 CuMsgIoCaptureGuard
1462}
1463
/// Crate-visible accessor for the handle-byte total of the open measurement.
///
/// Delegates to `current_payload_io_measurement`, which yields `0` when no
/// measurement is open.
pub(crate) fn current_payload_handle_bytes() -> usize {
    current_payload_io_measurement()
}
1467
1468pub(crate) fn record_current_slot_payload_io_stats(
1469 fixed_bytes: usize,
1470 encoded_bytes: usize,
1471 handle_bytes: usize,
1472) {
1473 let _ = with_active_capture_mut(|capture| {
1474 let Some(slot) = capture.current_slot else {
1475 return;
1476 };
1477 if slot >= capture.cache_len {
1478 return;
1479 }
1480 let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
1482 let entry = unsafe { &*cache_ptr.add(slot) };
1483 entry.set(CuMsgIoStats {
1484 present: true,
1485 resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
1486 encoded_bytes: encoded_bytes as u64,
1487 handle_bytes: handle_bytes as u64,
1488 });
1489 });
1490}
1491
1492pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1499where
1500 T: Encode,
1501{
1502 begin_payload_io_measurement();
1503 begin_observed_encode();
1504
1505 let result = (|| {
1506 let mut encoder =
1507 EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1508 payload.encode(&mut encoder).map_err(|e| {
1509 CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1510 })?;
1511 let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1512 debug_assert_eq!(encoded_bytes, finish_observed_encode());
1513 let handle_bytes = finish_payload_io_measurement();
1514 Ok(PayloadIoStats {
1515 resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1516 encoded_bytes,
1517 handle_bytes,
1518 })
1519 })();
1520
1521 if result.is_err() {
1522 abort_payload_io_measurement();
1523 abort_observed_encode();
1524 }
1525
1526 result
1527}
1528
/// Records whether a node appears as destination and/or source of any graph edge.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    // Set when at least one edge has this node as its destination.
    has_incoming: bool,
    // Set when at least one edge has this node as its source.
    has_outgoing: bool,
}
1534
1535fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1536 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1537 edge_ids.sort();
1538
1539 let mut outputs = Vec::new();
1540 let mut seen = Vec::new();
1541 let mut port_idx = 0usize;
1542 for edge_id in edge_ids {
1543 let Some(edge) = graph.edge(edge_id) else {
1544 continue;
1545 };
1546 if seen.iter().any(|msg| msg == &edge.msg) {
1547 continue;
1548 }
1549 seen.push(edge.msg.clone());
1550 let mut port_label = String::from("out");
1551 port_label.push_str(&port_idx.to_string());
1552 port_label.push_str(": ");
1553 port_label.push_str(edge.msg.as_str());
1554 outputs.push((edge.msg.clone(), port_label));
1555 port_idx += 1;
1556 }
1557 outputs
1558}
1559
/// Builds the monitor-facing topology (nodes and connections) of one mission.
///
/// Each graph node becomes a `MonitorNode` classified as bridge, source, sink,
/// or task, with named input and output ports. Each graph edge becomes a
/// `MonitorConnection`; ports that the edge does not name explicitly are
/// inferred from the node's port lists.
///
/// The returned nodes come from `Map::into_values`; `Map` is a `HashMap` with
/// the `std` feature, so the node order is unspecified there.
///
/// # Errors
///
/// Propagates the error of `config.get_graph` for the requested `mission`.
pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
    let graph = config.get_graph(Some(mission))?;
    let mut nodes: Map<String, MonitorNode> = Map::new();
    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();

    // Index bridge configurations by id so bridge nodes can list their channels.
    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
    for bridge in &config.bridges {
        bridge_lookup.insert(bridge.id.as_str(), bridge);
    }

    // First pass over the edges: note which nodes send and which receive.
    for cnx in graph.edges() {
        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
    }

    for (_, node) in graph.get_all_nodes() {
        let node_id = node.get_id();
        let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
        // Bridges are identified by flavor; all other nodes are classified by
        // edge direction. Nodes with both or neither direction count as tasks.
        let kind = match node.get_flavor() {
            Flavor::Bridge => ComponentType::Bridge,
            _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
            _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
            _ => ComponentType::Task,
        };

        let mut inputs = Vec::new();
        let mut outputs = Vec::new();
        if kind == ComponentType::Bridge {
            // Bridge ports are its channels: Rx channels are listed as outputs,
            // Tx channels as inputs.
            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
                for ch in &bridge.channels {
                    match ch {
                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
                            outputs.push(id.clone())
                        }
                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
                    }
                }
            }
        } else {
            // Every node gets a generic "in" port except pure sources.
            if usage.has_incoming || !usage.has_outgoing {
                inputs.push("in".to_string());
            }
            if usage.has_outgoing {
                // One labelled output port per distinct outgoing message type;
                // the msg-type -> label map is kept for resolving connections below.
                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
                    let ports = collect_output_ports(graph, node_idx);
                    let mut port_map: Map<String, String> = Map::new();
                    for (msg_type, label) in ports {
                        port_map.insert(msg_type, label.clone());
                        outputs.push(label);
                    }
                    output_port_lookup.insert(node_id.clone(), port_map);
                }
            } else if !usage.has_incoming {
                // Node without any edge: give it a generic "out" port as well.
                outputs.push("out".to_string());
            }
        }

        nodes.insert(
            node_id.clone(),
            MonitorNode {
                id: node_id,
                type_name: Some(node.get_type().to_string()),
                kind,
                inputs,
                outputs,
            },
        );
    }

    // Second pass over the edges: build the connections and resolve their ports.
    let mut connections = Vec::new();
    for cnx in graph.edges() {
        let src = cnx.src.clone();
        let dst = cnx.dst.clone();

        // Source port: explicit channel, else the label registered for this
        // message type, else the source node's first output.
        let src_port = cnx.src_channel.clone().or_else(|| {
            output_port_lookup
                .get(&src)
                .and_then(|ports| ports.get(&cnx.msg).cloned())
                .or_else(|| {
                    nodes
                        .get(&src)
                        .and_then(|node| node.outputs.first().cloned())
                })
        });
        // Destination port: explicit channel, else the destination node's first input.
        let dst_port = cnx.dst_channel.clone().or_else(|| {
            nodes
                .get(&dst)
                .and_then(|node| node.inputs.first().cloned())
        });

        connections.push(MonitorConnection {
            src,
            src_port,
            dst,
            dst_port,
            msg: cnx.msg.clone(),
        });
    }

    Ok(MonitorTopology {
        nodes: nodes.into_values().collect(),
        connections,
    })
}
1666
/// Interface implemented by monitors.
///
/// `new`, `process_copperlist` and `process_error` must be provided; the
/// remaining methods have default implementations that do nothing.
pub trait CuMonitor: Sized {
    /// Constructs the monitor from the monitoring metadata and runtime handle.
    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
    where
        Self: Sized;

    /// Start hook. The default implementation does nothing and returns `Ok(())`.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Receives a view over a CopperList.
    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;

    /// Receives CopperList IO stats. The default implementation ignores them.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Receives an error together with the component and state it relates to,
    /// and returns the monitor's `Decision`.
    fn process_error(
        &self,
        component_id: ComponentId,
        step: CuComponentState,
        error: &CuError,
    ) -> Decision;

    /// Receives a panic message. The default implementation ignores it.
    fn process_panic(&self, _panic_message: &str) {}

    /// Stop hook. The default implementation does nothing and returns `Ok(())`.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
1725
1726pub struct NoMonitor {}
1729impl CuMonitor for NoMonitor {
1730 fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1731 Ok(NoMonitor {})
1732 }
1733
1734 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1735 #[cfg(all(feature = "std", debug_assertions))]
1736 register_live_log_listener(|entry, format_str, param_names| {
1737 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1738 let named: Map<String, String> = param_names
1739 .iter()
1740 .zip(params.iter())
1741 .map(|(k, v)| (k.to_string(), v.clone()))
1742 .collect();
1743
1744 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1745 let ts = format_timestamp(entry.time);
1746 println!("{} [{:?}] {}", ts, entry.level, msg);
1747 }
1748 });
1749 Ok(())
1750 }
1751
1752 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1753 Ok(())
1755 }
1756
1757 fn process_error(
1758 &self,
1759 _component_id: ComponentId,
1760 _step: CuComponentState,
1761 _error: &CuError,
1762 ) -> Decision {
1763 Decision::Ignore
1765 }
1766
1767 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1768 #[cfg(all(feature = "std", debug_assertions))]
1769 unregister_live_log_listener();
1770 Ok(())
1771 }
1772}
1773
// Implements `CuMonitor` for a tuple of monitors.
//
// Each `idx => Name` pair gives the tuple field index and the type parameter
// of one element. Every trait method is forwarded to all elements in tuple order.
macro_rules! impl_monitor_tuple {
    ($($idx:tt => $name:ident),+) => {
        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
            // Every element is built from its own clone of the metadata and runtime;
            // the first constructor error aborts construction.
            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
            where
                Self: Sized,
            {
                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
            }

            // Stops at the first element that fails; later elements are not started.
            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
                $(self.$idx.start(ctx)?;)+
                Ok(())
            }

            // Stops at the first element that fails; later elements do not see the view.
            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
                $(self.$idx.process_copperlist(ctx, view)?;)+
                Ok(())
            }

            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
                $(self.$idx.observe_copperlist_io(stats);)+
            }

            // Every element is consulted; the individual decisions are folded with
            // `merge_decision`, starting from `Decision::Ignore`.
            fn process_error(
                &self,
                component_id: ComponentId,
                step: CuComponentState,
                error: &CuError,
            ) -> Decision {
                let mut decision = Decision::Ignore;
                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
                decision
            }

            fn process_panic(&self, panic_message: &str) {
                $(self.$idx.process_panic(panic_message);)+
            }

            // Stops at the first element that fails; later elements are not stopped.
            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
                $(self.$idx.stop(ctx)?;)+
                Ok(())
            }
        }
    };
}

// Tuple implementations for 2 to 6 monitors.
impl_monitor_tuple!(0 => M0, 1 => M1);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1826
1827#[cfg(feature = "std")]
/// Extracts a readable message from a panic payload.
///
/// Payloads of type `&str` or `String` are returned as owned strings; any other
/// payload type yields the fixed text `"panic with non-string payload"`.
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1837
/// Allocator wrapper that forwards to `inner` and counts the bytes passing through.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both byte counters at zero.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes of successful allocations since construction or the last `reset`.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes of deallocations since construction or the last `reset`.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    /// Allocates through the inner allocator; only successful (non-null)
    /// allocations are added to the `allocated` counter.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: `layout` is forwarded unchanged, so the caller's obligations
        // for `GlobalAlloc::alloc` carry over to the inner allocator.
        let block = unsafe { self.inner.alloc(layout) };
        if block.is_null() {
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    /// Deallocates through the inner allocator and adds the layout size to the
    /// `deallocated` counter.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        let freed_bytes = layout.size();
        // SAFETY: `ptr` and `layout` are forwarded unchanged, so the caller's
        // obligations for `GlobalAlloc::dealloc` carry over to the inner allocator.
        unsafe { self.inner.dealloc(ptr, layout) };
        self.deallocated.fetch_add(freed_bytes, Ordering::SeqCst);
    }
}
1887
/// Measures the bytes allocated and deallocated through `GLOBAL` since this
/// counter was created.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    // `GLOBAL.allocated()` at construction time.
    bf_allocated: usize,
    // `GLOBAL.deallocated()` at construction time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Captures the current global counters as the baseline.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Bytes allocated since construction.
    ///
    /// Saturates at zero when the global counter is below the baseline (for
    /// example after the global counters were reset), instead of underflowing.
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Bytes deallocated since construction.
    ///
    /// Saturates at zero when the global counter is below the baseline (for
    /// example after the global counters were reset), instead of underflowing.
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    /// Computes the final deltas; they are currently not reported anywhere.
    fn drop(&mut self) {
        // Uses the saturating accessors so that dropping can never panic on underflow.
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
    }
}
1957
1958#[cfg(feature = "std")]
1959const BUCKET_COUNT: usize = 1024;
1960#[cfg(not(feature = "std"))]
1961const BUCKET_COUNT: usize = 256;
1962
1963#[derive(Debug, Clone)]
1966pub struct LiveStatistics {
1967 buckets: [u64; BUCKET_COUNT],
1968 min_val: u64,
1969 max_val: u64,
1970 sum: u128,
1971 sum_sq: u128,
1972 count: u64,
1973 max_value: u64,
1974}
1975
1976impl LiveStatistics {
1977 pub fn new_with_max(max_value: u64) -> Self {
1997 LiveStatistics {
1998 buckets: [0; BUCKET_COUNT],
1999 min_val: u64::MAX,
2000 max_val: 0,
2001 sum: 0,
2002 sum_sq: 0,
2003 count: 0,
2004 max_value,
2005 }
2006 }
2007
2008 #[inline]
2009 fn value_to_bucket(&self, value: u64) -> usize {
2010 if value >= self.max_value {
2011 BUCKET_COUNT - 1
2012 } else {
2013 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2014 }
2015 }
2016
2017 #[inline]
2018 pub fn min(&self) -> u64 {
2019 if self.count == 0 { 0 } else { self.min_val }
2020 }
2021
2022 #[inline]
2023 pub fn max(&self) -> u64 {
2024 self.max_val
2025 }
2026
2027 #[inline]
2028 pub fn mean(&self) -> f64 {
2029 if self.count == 0 {
2030 0.0
2031 } else {
2032 self.sum as f64 / self.count as f64
2033 }
2034 }
2035
2036 #[inline]
2037 pub fn stdev(&self) -> f64 {
2038 if self.count == 0 {
2039 return 0.0;
2040 }
2041 let mean = self.mean();
2042 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2043 if variance < 0.0 {
2044 return 0.0;
2045 }
2046 #[cfg(feature = "std")]
2047 return variance.sqrt();
2048 #[cfg(not(feature = "std"))]
2049 return sqrt(variance);
2050 }
2051
2052 #[inline]
2053 pub fn percentile(&self, percentile: f64) -> u64 {
2054 if self.count == 0 {
2055 return 0;
2056 }
2057
2058 let target_count = (self.count as f64 * percentile) as u64;
2059 let mut accumulated = 0u64;
2060
2061 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2062 accumulated += bucket_count;
2063 if accumulated >= target_count {
2064 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2066 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2067 let bucket_fraction = if bucket_count > 0 {
2068 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2069 } else {
2070 0.5
2071 };
2072 return bucket_start
2073 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2074 }
2075 }
2076
2077 self.max_val
2078 }
2079
2080 #[inline]
2082 pub fn record(&mut self, value: u64) {
2083 if value < self.min_val {
2084 self.min_val = value;
2085 }
2086 if value > self.max_val {
2087 self.max_val = value;
2088 }
2089 let value_u128 = value as u128;
2090 self.sum += value_u128;
2091 self.sum_sq += value_u128 * value_u128;
2092 self.count += 1;
2093
2094 let bucket = self.value_to_bucket(value);
2095 self.buckets[bucket] += 1;
2096 }
2097
2098 #[inline]
2099 pub fn len(&self) -> u64 {
2100 self.count
2101 }
2102
2103 #[inline]
2104 pub fn is_empty(&self) -> bool {
2105 self.count == 0
2106 }
2107
2108 #[inline]
2109 pub fn reset(&mut self) {
2110 self.buckets.fill(0);
2111 self.min_val = u64::MAX;
2112 self.max_val = 0;
2113 self.sum = 0;
2114 self.sum_sq = 0;
2115 self.count = 0;
2116 }
2117}
2118
2119#[derive(Debug, Clone)]
2122pub struct CuDurationStatistics {
2123 bare: LiveStatistics,
2124 jitter: LiveStatistics,
2125 last_value: CuDuration,
2126}
2127
2128impl CuDurationStatistics {
2129 pub fn new(max: CuDuration) -> Self {
2130 let CuDuration(max) = max;
2131 CuDurationStatistics {
2132 bare: LiveStatistics::new_with_max(max),
2133 jitter: LiveStatistics::new_with_max(max),
2134 last_value: CuDuration::default(),
2135 }
2136 }
2137
2138 #[inline]
2139 pub fn min(&self) -> CuDuration {
2140 CuDuration(self.bare.min())
2141 }
2142
2143 #[inline]
2144 pub fn max(&self) -> CuDuration {
2145 CuDuration(self.bare.max())
2146 }
2147
2148 #[inline]
2149 pub fn mean(&self) -> CuDuration {
2150 CuDuration(self.bare.mean() as u64) }
2152
2153 #[inline]
2154 pub fn percentile(&self, percentile: f64) -> CuDuration {
2155 CuDuration(self.bare.percentile(percentile))
2156 }
2157
2158 #[inline]
2159 pub fn stddev(&self) -> CuDuration {
2160 CuDuration(self.bare.stdev() as u64)
2161 }
2162
2163 #[inline]
2164 pub fn len(&self) -> u64 {
2165 self.bare.len()
2166 }
2167
2168 #[inline]
2169 pub fn is_empty(&self) -> bool {
2170 self.bare.len() == 0
2171 }
2172
2173 #[inline]
2174 pub fn jitter_min(&self) -> CuDuration {
2175 CuDuration(self.jitter.min())
2176 }
2177
2178 #[inline]
2179 pub fn jitter_max(&self) -> CuDuration {
2180 CuDuration(self.jitter.max())
2181 }
2182
2183 #[inline]
2184 pub fn jitter_mean(&self) -> CuDuration {
2185 CuDuration(self.jitter.mean() as u64)
2186 }
2187
2188 #[inline]
2189 pub fn jitter_stddev(&self) -> CuDuration {
2190 CuDuration(self.jitter.stdev() as u64)
2191 }
2192
2193 #[inline]
2194 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2195 CuDuration(self.jitter.percentile(percentile))
2196 }
2197
2198 #[inline]
2199 pub fn record(&mut self, value: CuDuration) {
2200 let CuDuration(nanos) = value;
2201 if self.bare.is_empty() {
2202 self.bare.record(nanos);
2203 self.last_value = value;
2204 return;
2205 }
2206 self.bare.record(nanos);
2207 let CuDuration(last_nanos) = self.last_value;
2208 self.jitter.record(nanos.abs_diff(last_nanos));
2209 self.last_value = value;
2210 }
2211
2212 #[inline]
2213 pub fn reset(&mut self) {
2214 self.bare.reset();
2215 self.jitter.reset();
2216 }
2217}
2218
// Unit tests for the statistics helpers, tuple monitors, payload IO measurement
// and the runtime execution probe.
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};

    // Decision a `TestMonitor` returns from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    // Monitor that counts callbacks and returns a preconfigured decision.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
    }

    impl TestMonitor {
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
            }
        }
    }

    // Minimal metadata with two task components and an empty topology.
    fn test_metadata() -> CuMonitoringMetadata {
        const COMPONENTS: &[MonitorComponentMetadata] = &[
            MonitorComponentMetadata::new("a", ComponentType::Task, None),
            MonitorComponentMetadata::new("b", ComponentType::Task, None),
        ];
        CuMonitoringMetadata::new(
            CompactString::from(crate::config::DEFAULT_MISSION_ID),
            COMPONENTS,
            &[],
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("test metadata should be valid")
    }

    impl CuMonitor for TestMonitor {
        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
            let monitor = Self::new_with(TestDecision::Ignore);
            #[cfg(feature = "std")]
            let _ = runtime.execution_probe();
            Ok(monitor)
        }

        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn process_error(
            &self,
            _component_id: ComponentId,
            _step: CuComponentState,
            _error: &CuError,
        ) -> Decision {
            match self.decision {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Records 0..100 and checks min/max/mean plus percentile estimates within a tolerance.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49);
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    // Four durations give three jitter samples (100, 300, 100).
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    // Multi-second samples: the squared values exceed the u64 range.
    #[test]
    fn test_duration_stats_large_samples_do_not_overflow() {
        let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
        stats.record(CuDuration(5_000_000_000));
        stats.record(CuDuration(8_000_000_000));

        assert_eq!(stats.min(), CuDuration(5_000_000_000));
        assert_eq!(stats.max(), CuDuration(8_000_000_000));
        assert_eq!(stats.mean(), CuDuration(6_500_000_000));
        assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
        assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
    }

    // A tuple of monitors must return the stricter of two differing decisions.
    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Shutdown
        ));

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Abort
        ));
    }

    // Each callback on the tuple must reach every member exactly once.
    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
            test_metadata(),
            CuMonitoringRuntime::unavailable(),
        )
        .expect("tuple new");
        let (ctx, _clock_control) = CuContext::new_mock_clock();
        let empty_view = test_metadata().layout().view(&[]);
        monitors
            .process_copperlist(&ctx, empty_view)
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
    }

    // Reference measurement: encoded size through a plain `SizeWriter`.
    fn encoded_size<E: Encode>(value: &E) -> usize {
        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
        value
            .encode(&mut encoder)
            .expect("size measurement encoder should not fail");
        encoder.into_writer().bytes_written
    }

    // A plain `Vec<u8>` reports no handle bytes and a resident size of `size_of::<Vec<u8>>()`.
    #[test]
    fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
        let payload = vec![1u8, 2, 3, 4];
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
        assert_eq!(io.handle_bytes, 0);
    }

    // A detached `CuHandle` over 32 bytes reports those 32 bytes as handle bytes.
    #[test]
    fn payload_io_stats_tracks_handle_backed_storage() {
        let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(
            io.resident_bytes,
            core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
        );
        assert_eq!(io.handle_bytes, 32);
    }

    // A fresh probe has no marker; recording one makes it readable and bumps the sequence to 1.
    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        probe.record(ExecutionMarker {
            component_id: ComponentId::new(7),
            step: CuComponentState::Process,
            culistid: Some(42),
        });

        let marker = probe.marker().expect("marker should be available");
        assert_eq!(marker.component_id, ComponentId::new(7));
        assert!(matches!(marker.step, CuComponentState::Process));
        assert_eq!(marker.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}