1use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
7use crate::config::{CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig};
8use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
9use crate::cutask::{BincodeAdapter, Freezable};
10#[cfg(feature = "std")]
11use crate::monitoring::ExecutionProbeHandle;
12#[cfg(feature = "std")]
13use crate::monitoring::MonitorExecutionProbe;
14use crate::monitoring::{
15 ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
16 ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
17 take_last_completed_handle_bytes,
18};
19#[cfg(all(feature = "std", feature = "parallel-rt"))]
20use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
21use crate::resource::ResourceManager;
22use compact_str::CompactString;
23use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
24use cu29_traits::CuResult;
25use cu29_traits::WriteStream;
26use cu29_traits::{CopperListTuple, CuError};
27
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
31#[cfg(target_os = "none")]
32#[allow(unused_imports)]
33use cu29_log_derive::info;
34#[cfg(target_os = "none")]
35#[allow(unused_imports)]
36use cu29_log_runtime::log;
37#[cfg(all(target_os = "none", debug_assertions))]
38#[allow(unused_imports)]
39use cu29_log_runtime::log_debug_mode;
40#[cfg(target_os = "none")]
41#[allow(unused_imports)]
42use cu29_value::to_value;
43
44#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
45use alloc::alloc::{alloc_zeroed, handle_alloc_error};
46use alloc::boxed::Box;
47use alloc::collections::{BTreeSet, VecDeque};
48use alloc::format;
49use alloc::string::{String, ToString};
50use alloc::vec::Vec;
51use bincode::enc::EncoderImpl;
52use bincode::enc::write::{SizeWriter, SliceWriter};
53use bincode::error::EncodeError;
54use bincode::{Decode, Encode};
55#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
56use core::alloc::Layout;
57use core::fmt::Result as FmtResult;
58use core::fmt::{Debug, Formatter};
59use core::marker::PhantomData;
60
61#[cfg(all(feature = "std", feature = "async-cl-io"))]
62use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::thread::JoinHandle;
65
/// Function-pointer signature that builds the task set `CT` from one optional
/// [`ComponentConfig`] reference per entry and the mutable [`ResourceManager`].
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
/// Function-pointer signature that builds the bridge set `CB` from the configuration and the
/// mutable [`ResourceManager`].
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
/// Function-pointer signature that builds the monitor `M` from the configuration, the
/// monitoring metadata and the monitoring runtime handle. Unlike the two aliases above it is
/// infallible (returns `M`, not a `CuResult`).
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
70
/// Bundle of the application-specific pieces needed to assemble a runtime: the three
/// instantiator callables plus the static monitoring metadata slices.
///
/// The type parameters `CT`, `CB`, `P`, `M` and `NBCL` are not stored; they are only pinned
/// through the `_payload` marker.
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Callable of type `TI` used to build the tasks.
    pub tasks_instanciator: TI,
    /// Static metadata for the monitored components.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Static slice of component ids associated with the CopperList
    /// (presumably one per CopperList slot — confirm against the generator).
    pub culist_component_mapping: &'static [ComponentId],
    /// Static metadata for the parallel runtime (only with `std` + `parallel-rt`).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Callable of type `MI` used to build the monitor.
    pub monitor_instanciator: MI,
    /// Callable of type `BI` used to build the bridges.
    pub bridges_instanciator: BI,
    /// Zero-sized marker tying the otherwise unused type parameters to this struct.
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
81
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Stores every argument in the field of the same name.
    ///
    /// This is a `const fn`, so a `CuRuntimeParts` value can be built in a constant context.
    /// The `parallel_rt_metadata` parameter only exists when both the `std` and
    /// `parallel-rt` features are enabled.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
106
/// Builder that gathers everything required to construct a runtime: clock, configuration,
/// mission name, identity (subsystem / instance id), resources, the [`CuRuntimeParts`] and
/// the two log writers.
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    /// Clock handed over to the built runtime.
    clock: RobotClock,
    /// Borrowed configuration; it must outlive the builder (`'cfg`).
    config: &'cfg CuConfig,
    /// Name of the mission to build, borrowed for `'cfg`.
    mission: &'cfg str,
    /// Subsystem identity; `Subsystem::new(None, 0)` unless overridden.
    subsystem: Subsystem,
    /// Instance id; `0` unless overridden.
    instance_id: u32,
    /// Resource manager; `None` until one of the resource setters is called.
    resources: Option<ResourceManager>,
    /// Instantiators and static metadata.
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    /// Writer for CopperLists (type `CLW`).
    copperlists_logger: CLW,
    /// Writer for keyframes (type `KFW`).
    keyframes_logger: KFW,
}
130
131impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
132 CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
133{
134 pub fn new(
135 clock: RobotClock,
136 config: &'cfg CuConfig,
137 mission: &'cfg str,
138 parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
139 copperlists_logger: CLW,
140 keyframes_logger: KFW,
141 ) -> Self {
142 Self {
143 clock,
144 config,
145 mission,
146 subsystem: Subsystem::new(None, 0),
147 instance_id: 0,
148 resources: None,
149 parts,
150 copperlists_logger,
151 keyframes_logger,
152 }
153 }
154
155 pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
156 self.subsystem = subsystem;
157 self
158 }
159
160 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
161 self.instance_id = instance_id;
162 self
163 }
164
165 pub fn with_resources(mut self, resources: ResourceManager) -> Self {
166 self.resources = Some(resources);
167 self
168 }
169
170 pub fn try_with_resources_instantiator(
171 mut self,
172 resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
173 ) -> CuResult<Self> {
174 self.resources = Some(resources_instantiator(self.config)?);
175 Ok(self)
176 }
177}
178
/// Returns the timestamp used for performance measurements.
///
/// With the `std` and `sysclock-perf` features enabled, `_clock` is ignored and the time is
/// read from a process-wide [`RobotClock`] created lazily on first use. In every other
/// configuration this is simply `_clock.now()`.
#[inline]
pub fn perf_now(_clock: &RobotClock) -> CuTime {
    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
    {
        // One shared clock for the whole process, initialized on the first call.
        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
        return PERF_CLOCK.get_or_init(RobotClock::new).now();
    }

    // Unreachable when the cfg block above is compiled in, hence the allow.
    #[allow(unreachable_code)]
    _clock.now()
}
200
/// Length of the final busy-wait window used by the high-precision rate limiter: the limiter
/// sleeps until this much time remains before the deadline, then spins.
/// 200_000 ns, i.e. 200 µs (assuming `CuDuration::from(u64)` takes nanoseconds — confirm).
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
203
204#[inline]
206pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
207 if rate_target_hz == 0 {
208 return Err(CuError::from(
209 "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
210 ));
211 }
212
213 if rate_target_hz > MAX_RATE_TARGET_HZ {
214 return Err(CuError::from(format!(
215 "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
216 )));
217 }
218
219 Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
220}
221
/// Fixed-period loop pacer: remembers the loop period and the next point in time at which
/// the loop is allowed to run again.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    /// Duration between two consecutive deadlines.
    period: CuDuration,
    /// Earliest time at which the next iteration is considered ready.
    next_deadline: CuTime,
}
233
234impl LoopRateLimiter {
235 #[inline]
236 pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
237 let period = rate_target_period(rate_target_hz)?;
238 Ok(Self {
239 period,
240 next_deadline: clock.now() + period,
241 })
242 }
243
244 #[inline]
245 pub fn is_ready(&self, clock: &RobotClock) -> bool {
246 self.remaining(clock).is_none()
247 }
248
249 #[inline]
250 pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
251 let now = clock.now();
252 if now < self.next_deadline {
253 Some(self.next_deadline - now)
254 } else {
255 None
256 }
257 }
258
259 #[inline]
260 pub fn wait_until_ready(&self, clock: &RobotClock) {
261 let deadline = self.next_deadline;
262 let Some(remaining) = self.remaining(clock) else {
263 return;
264 };
265
266 #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
267 {
268 let spin_window = self.spin_window();
269 if remaining > spin_window {
270 std::thread::sleep(std::time::Duration::from(remaining - spin_window));
271 }
272 while clock.now() < deadline {
273 core::hint::spin_loop();
274 }
275 }
276
277 #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
278 {
279 let _ = deadline;
280 std::thread::sleep(std::time::Duration::from(remaining));
281 }
282
283 #[cfg(not(feature = "std"))]
284 {
285 let _ = remaining;
286 while clock.now() < deadline {
287 core::hint::spin_loop();
288 }
289 }
290 }
291
292 #[inline]
293 pub fn mark_tick(&mut self, clock: &RobotClock) {
294 self.advance_from(clock.now());
295 }
296
297 #[inline]
298 pub fn limit(&mut self, clock: &RobotClock) {
299 self.wait_until_ready(clock);
300 self.mark_tick(clock);
301 }
302
303 #[inline]
304 fn advance_from(&mut self, now: CuTime) {
305 let steps = if now < self.next_deadline {
306 1
307 } else {
308 (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
309 };
310 self.next_deadline += steps * self.period;
311 }
312
313 #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
314 #[inline]
315 fn spin_window(&self) -> CuDuration {
316 let _ = self.period;
317 CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
318 }
319
320 #[cfg(test)]
321 #[inline]
322 fn next_deadline(&self) -> CuTime {
323 self.next_deadline
324 }
325}
326
/// Marker bound for CopperList payloads.
///
/// With `std` + `async-cl-io` it requires `Send` and is implemented for every `Send` type.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

/// Marker bound for CopperList payloads.
///
/// Without `std` + `async-cl-io` it carries no requirement and is implemented for every type,
/// so the same bound can be written in both configurations.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
340
/// Outcome reported by a successful processing step.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProcessStepOutcome {
    /// Presumably: carry on with the remaining steps of the current CopperList — confirm
    /// against the callers.
    Continue,
    /// Presumably: stop processing the current CopperList — confirm against the callers.
    AbortCopperList,
}

/// Result of a processing step: a [`ProcessStepOutcome`] on success, a `CuError` otherwise.
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
355
/// Serializes `cl` into a freshly allocated byte vector using bincode's standard
/// configuration (only built with the `remote-debug` feature).
///
/// # Errors
/// Wraps the bincode encoding error in a `CuError`.
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    match bincode::encode_to_vec(cl, bincode::config::standard()) {
        Ok(snapshot) => Ok(snapshot),
        Err(cause) => Err(CuError::new_with_cause(
            "Failed to encode completed CopperList snapshot",
            cause,
        )),
    }
}
363
/// CopperList manager that serializes completed lists inline, on the calling thread.
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Underlying fixed-capacity store of `NBCL` CopperLists.
    inner: CuListsManager<P, NBCL>,
    /// Optional sink for completed CopperLists; `None` disables logging.
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Encoded bytes of the most recently completed CopperList (`remote-debug` only).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Byte count reported by the logger for the last logged CopperList.
    pub last_encoded_bytes: u64,
    /// Value returned by `take_last_completed_handle_bytes()` after the last logged
    /// CopperList.
    pub last_handle_bytes: u64,
}
377
impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
    /// Creates a manager around a fresh `CuListsManager` with the given optional `logger`.
    ///
    /// This constructor always returns `Ok`.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit,
    {
        Ok(Self {
            inner: CuListsManager::new(),
            logger,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Forwards to `CuListsManager::next_cl_id`.
    pub fn next_cl_id(&self) -> u64 {
        self.inner.next_cl_id()
    }

    /// Forwards to `CuListsManager::last_cl_id`.
    pub fn last_cl_id(&self) -> u64 {
        self.inner.last_cl_id()
    }

    /// Forwards to `CuListsManager::peek`.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.inner.peek()
    }

    /// Encoded snapshot of the last completed CopperList, if one was captured.
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Without `remote-debug` no snapshot is kept, so this is always `None`.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overwrites (or clears, with `None`) the stored snapshot.
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// Without `remote-debug` the snapshot is discarded.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Reserves the next CopperList from the underlying store.
    ///
    /// # Errors
    /// Fails with "Ran out of space for copper lists" when the store returns `None`.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        self.inner
            .create()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
    }

    /// Marks CopperList `culistid` as done, then logs and releases completed lists.
    ///
    /// While iterating the store, the list with id `culistid` moves from `Processing` to
    /// `DoneProcessing`. Every list seen before the first one that is not `DoneProcessing`
    /// is logged (when a logger is set), marked `Free` and later popped; the first
    /// non-completed list stops the releasing.
    ///
    /// NOTE(review): unlike `end_of_processing_boxed`, `last_encoded_bytes` is not reset
    /// here, so it keeps its previous value when nothing is logged — confirm this is intended.
    ///
    /// # Errors
    /// Propagates snapshot-encoding errors (`remote-debug`) and logger errors. On error the
    /// function returns before popping the lists already marked `Free` in this call.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        // `is_top` stays true as long as every list visited so far was completed.
        let mut is_top = true;
        let mut nb_done = 0;
        self.last_handle_bytes = 0;
        // Borrow the snapshot field separately so it can be written while `inner` is
        // mutably iterated.
        #[cfg(feature = "remote-debug")]
        let last_completed_encoded = &mut self.last_completed_encoded;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
                // Exactly one arm survives cfg stripping: capture a snapshot or do nothing.
                match () {
                    #[cfg(feature = "remote-debug")]
                    () => {
                        *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
                    }
                    #[cfg(not(feature = "remote-debug"))]
                    () => {}
                }
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                    self.last_handle_bytes = take_last_completed_handle_bytes();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Remove the released lists from the store.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Nothing is ever left pending in the synchronous manager, so this is a no-op.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        Ok(())
    }

    /// Number of CopperLists that can still be created: `NBCL` minus the store's length.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        Ok(NBCL - self.inner.len())
    }

    /// Completes a caller-owned CopperList: logs it inline (when a logger is set), marks it
    /// `Free` and hands it straight back as `OwnedCopperListSubmission::Recycled`.
    ///
    /// Both byte counters are reset to zero before logging.
    ///
    /// # Errors
    /// Propagates the logger error.
    #[cfg(feature = "std")]
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        if let Some(logger) = &mut self.logger {
            culist.change_state(CopperListState::BeingSerialized);
            logger.log(&culist)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.last_handle_bytes = take_last_completed_handle_bytes();
        }
        culist.change_state(CopperListState::Free);
        Ok(OwnedCopperListSubmission::Recycled(culist))
    }

    /// The synchronous manager never holds boxed lists, so there is nothing to reclaim.
    #[cfg(feature = "std")]
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        Ok(None)
    }

    /// Always fails: no completion can ever arrive in the synchronous manager.
    #[cfg(feature = "std")]
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        Err(CuError::from(
            "Synchronous CopperList I/O cannot block waiting for boxed completions",
        ))
    }

    /// Returns an empty vector: no boxed list is ever pending.
    #[cfg(feature = "std")]
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        Ok(Vec::new())
    }
}
509
/// What happened to a caller-owned CopperList handed to `end_of_processing_boxed`.
#[cfg(feature = "std")]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// The list was fully handled and is returned to the caller for reuse.
    Recycled(Box<CopperList<P>>),
    /// The list was queued for asynchronous serialization; it comes back later through the
    /// reclaim methods.
    Pending,
}
518
/// Message sent back by the serializer thread once it has handled a CopperList.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    /// The CopperList that was handled, returned for recycling.
    culist: Box<CopperList<P>>,
    /// On success `(encoded_bytes, handle_bytes)` for this list; otherwise the logging error.
    log_result: CuResult<(u64, u64)>,
}
524
/// Allocates one zero-filled [`CopperList`] directly on the heap and then runs
/// `init_zeroed` on its messages. Aborts through `handle_alloc_error` when the allocation
/// fails.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY: the pointer comes from the global allocator with exactly the layout of
    // `CopperList<P>` and is null-checked before `Box::from_raw` takes ownership of it.
    // NOTE(review): this relies on the all-zero bit pattern being a valid `CopperList<P>`
    // until `init_zeroed` runs — presumably what `CuListZeroedInit` guarantees; confirm.
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    // Fix up whatever part of the payload cannot stay all-zero.
    culist.msgs.init_zeroed();
    culist
}
542
/// Builds a pool of `NBCL` heap-allocated, zero-initialized CopperLists.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
554
/// CopperList manager that hands completed lists to a dedicated serializer thread instead
/// of logging them inline.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Heap-allocated CopperLists currently available for `create`.
    free_pool: Vec<Box<CopperList<P>>>,
    /// The CopperList being filled right now, if any.
    current: Option<Box<CopperList<P>>>,
    /// Encoded bytes of the most recently completed CopperList (`remote-debug` only).
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Number of lists sent to the serializer whose completion was not received yet.
    pending_count: usize,
    /// Id that the next created CopperList will get.
    next_cl_id: u64,
    /// Submission channel to the serializer thread; `None` when logging is disabled.
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    /// Completion channel from the serializer thread; `None` when logging is disabled.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    /// Join handle of the serializer thread; `None` when logging is disabled.
    worker_handle: Option<JoinHandle<()>>,
    /// Encoded byte count of the last successfully completed list.
    pub last_encoded_bytes: u64,
    /// Handle byte count of the last successfully completed list.
    pub last_handle_bytes: u64,
}
572
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
    /// Allocates the pool of `NBCL` CopperLists and, when a `logger` is given, spawns the
    /// `cu-async-cl-io` serializer thread together with its two bounded channels
    /// (capacity `NBCL` each).
    ///
    /// The serializer logs each submitted list, reports the result on the completion
    /// channel, and stops after the first logging error or once either channel is closed.
    ///
    /// # Errors
    /// Fails when the serializer thread cannot be spawned.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
    {
        let mut free_pool = Vec::with_capacity(NBCL);
        for _ in 0..NBCL {
            free_pool.push(allocate_zeroed_copperlist::<P>());
        }

        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
        {
            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
            let (completion_sender, completion_receiver) =
                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
            let worker_handle = std::thread::Builder::new()
                .name("cu-async-cl-io".to_string())
                .spawn(move || {
                    // Runs until the submission channel is closed and drained.
                    while let Ok(mut culist) = pending_receiver.recv() {
                        culist.change_state(CopperListState::BeingSerialized);
                        let log_result = logger.log(&culist).map(|_| {
                            (
                                logger.last_log_bytes().unwrap_or(0) as u64,
                                take_last_completed_handle_bytes(),
                            )
                        });
                        let should_stop = log_result.is_err();
                        // Always report back, even on error, so the list is not lost.
                        if completion_sender
                            .send(AsyncCopperListCompletion { culist, log_result })
                            .is_err()
                        {
                            break;
                        }
                        if should_stop {
                            break;
                        }
                    }
                })
                .map_err(|e| {
                    CuError::from("Failed to spawn async CopperList serializer thread")
                        .add_cause(e.to_string().as_str())
                })?;
            (
                Some(pending_sender),
                Some(completion_receiver),
                Some(worker_handle),
            )
        } else {
            (None, None, None)
        };

        Ok(Self {
            free_pool,
            current: None,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            pending_count: 0,
            next_cl_id: 0,
            pending_sender,
            completion_receiver,
            worker_handle,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id that the next created CopperList will receive.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList (`0` when none was created yet).
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// The CopperList currently being filled, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.current.as_deref()
    }

    /// Encoded snapshot of the last completed CopperList, if one was captured.
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Without `remote-debug` no snapshot is kept, so this is always `None`.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Overwrites (or clears, with `None`) the stored snapshot.
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// Without `remote-debug` the snapshot is discarded.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Takes a CopperList from the free pool, assigns it the next id, marks it
    /// `Initialized` and makes it the current list.
    ///
    /// When the pool is empty this blocks until the serializer returns a list.
    ///
    /// # Errors
    /// Fails when a list is already active, or when reclaiming / waiting for completed
    /// lists fails.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Attempted to create a CopperList while another one is still active",
            ));
        }

        self.reclaim_completed()?;
        while self.free_pool.is_empty() {
            self.wait_for_completion()?;
        }

        let culist = self
            .free_pool
            .pop()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
        self.current = Some(culist);

        let current = self
            .current
            .as_mut()
            .expect("current CopperList is missing");
        current.id = self.next_cl_id;
        current.change_state(CopperListState::Initialized);
        self.next_cl_id += 1;
        Ok(current.as_mut())
    }

    /// Stores an encoded snapshot of `cl` as the last completed CopperList.
    #[cfg(feature = "remote-debug")]
    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
        Ok(())
    }

    /// Without `remote-debug` no snapshot is captured.
    #[cfg(not(feature = "remote-debug"))]
    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
        Ok(())
    }

    /// Completes the current CopperList, which must have id `culistid`.
    ///
    /// With a serializer the list is queued for asynchronous logging; without one it goes
    /// straight back to the free pool. Both byte counters are reset to zero.
    ///
    /// # Errors
    /// Fails when no list is active, when the active list has a different id (the active
    /// list is then left in place), or when snapshotting, enqueueing or reclaiming fails.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        self.reclaim_completed()?;

        let mut culist = self.current.take().ok_or_else(|| {
            CuError::from("Attempted to finish processing without an active CopperList")
        })?;

        if culist.id != culistid {
            // Put the active list back: dropping it here would permanently shrink the pool
            // and discard the list the caller is still working on.
            let active_id = culist.id;
            self.current = Some(culist);
            return Err(CuError::from(format!(
                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
                active_id
            )));
        }

        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            // Opportunistically pick up whatever the serializer already finished.
            self.reclaim_completed()?;
        } else {
            culist.change_state(CopperListState::Free);
            self.free_pool.push(culist);
        }

        Ok(())
    }

    /// Blocks until every queued CopperList has been handled by the serializer.
    ///
    /// # Errors
    /// Fails when a list is still active or when a completion reports an error.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }

        while self.pending_count > 0 {
            self.wait_for_completion()?;
        }
        Ok(())
    }

    /// Number of lists in the free pool after collecting finished completions.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        self.reclaim_completed()?;
        Ok(self.free_pool.len())
    }

    /// Completes a caller-owned CopperList.
    ///
    /// With a serializer the list is queued and `Pending` is returned; otherwise it is
    /// marked `Free` and returned as `Recycled`. Both byte counters are reset to zero.
    ///
    /// # Errors
    /// Fails when snapshotting, enqueueing or reclaiming fails.
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        self.reclaim_completed()?;
        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
            Ok(OwnedCopperListSubmission::Pending)
        } else {
            culist.change_state(CopperListState::Free);
            Ok(OwnedCopperListSubmission::Recycled(culist))
        }
    }

    /// Non-blocking reclaim: returns one completed CopperList if the serializer has one
    /// ready, `None` otherwise (also when there is no serializer).
    ///
    /// # Errors
    /// Fails when the completion channel is disconnected or the completion carries a
    /// logging error.
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        // Scope the receiver borrow so `handle_completion` can take `&mut self` below.
        let recv_result = {
            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
                return Ok(None);
            };
            completion_receiver.try_recv()
        };
        match recv_result {
            Ok(completion) => self.handle_completion(completion).map(Some),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CuError::from(
                "Async CopperList serializer thread disconnected unexpectedly",
            )),
        }
    }

    /// Blocking reclaim: waits for the next completion from the serializer.
    ///
    /// # Errors
    /// Fails when there is no serializer, when the completion channel is closed, or when
    /// the completion carries a logging error.
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        let completion = self
            .completion_receiver
            .as_ref()
            .ok_or_else(|| {
                CuError::from("No async CopperList serializer is active to return a free slot")
            })?
            .recv()
            .map_err(|e| {
                CuError::from("Failed to receive completion from async CopperList serializer")
                    .add_cause(e.to_string().as_str())
            })?;
        self.handle_completion(completion)
    }

    /// Waits for every queued list and returns them to the caller instead of the free pool.
    ///
    /// # Errors
    /// Fails when a list is still active or when a completion reports an error.
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        let mut reclaimed = Vec::with_capacity(self.pending_count);
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }
        while self.pending_count > 0 {
            reclaimed.push(self.wait_reclaim_boxed()?);
        }
        Ok(reclaimed)
    }

    /// Moves every already finished completion into the free pool without blocking.
    fn reclaim_completed(&mut self) -> CuResult<()> {
        loop {
            let Some(culist) = self.try_reclaim_boxed()? else {
                break;
            };
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocks for one completion and moves its list into the free pool.
    fn wait_for_completion(&mut self) -> CuResult<()> {
        let culist = self.wait_reclaim_boxed()?;
        self.free_pool.push(culist);
        Ok(())
    }

    /// Bookkeeping for one completion: decrements the pending counter, records the byte
    /// counters on success, marks the list `Free` and returns it.
    ///
    /// # Errors
    /// Returns the logging error carried by the completion; the list is dropped then.
    fn handle_completion(
        &mut self,
        mut completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        self.pending_count = self.pending_count.saturating_sub(1);
        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
            self.last_encoded_bytes = *encoded_bytes;
            self.last_handle_bytes = *handle_bytes;
        }
        completion.culist.change_state(CopperListState::Free);
        completion.log_result?;
        Ok(completion.culist)
    }

    /// Flushes pending lists, closes the submission channel and joins the serializer.
    ///
    /// The channel is closed and the thread joined even when the flush fails, so the worker
    /// is never left detached; the flush error, if any, takes precedence in the result.
    ///
    /// # Errors
    /// Returns the flush error, or an error when the serializer thread panicked.
    fn shutdown_worker(&mut self) -> CuResult<()> {
        let flush_result = self.finish_pending();

        // Closing the submission channel makes the worker's `recv` fail once the queue is
        // drained, which ends its loop.
        self.pending_sender.take();

        if flush_result.is_err() {
            // Completions may still be queued; drain them so the worker can never stay
            // blocked on a full completion channel. `recv` fails once the worker has exited.
            if let Some(completion_receiver) = self.completion_receiver.as_ref() {
                while completion_receiver.recv().is_ok() {}
            }
            self.pending_count = 0;
        }

        let join_result = match self.worker_handle.take() {
            Some(worker_handle) => worker_handle.join().map_err(|_| {
                CuError::from("Async CopperList serializer thread panicked while joining")
            }),
            None => Ok(()),
        };
        flush_result.and(join_result)
    }
}
872
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    /// Best-effort shutdown of the serializer thread; `drop` cannot report failures, so any
    /// error from `shutdown_worker` is deliberately ignored.
    fn drop(&mut self) {
        let _ = self.shutdown_worker();
    }
}
879
/// CopperList manager used by the runtime: the asynchronous implementation when both `std`
/// and `async-cl-io` are enabled.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

/// CopperList manager used by the runtime: the synchronous implementation in every other
/// configuration.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
885
/// Collects frozen task state into a [`KeyFrame`] and logs it on keyframe CopperLists.
pub struct KeyFramesManager {
    /// Keyframe currently being built (or preloaded through `lock_keyframe`).
    inner: KeyFrame,

    /// Timestamp to use for the next keyframe reset instead of the clock, consumed on use.
    forced_timestamp: Option<CuTime>,

    /// When `true`, `inner` holds a preloaded keyframe that must not be modified.
    locked: bool,

    /// Sink for keyframes; `None` disables keyframe capture entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// A CopperList id is a keyframe when it is a multiple of this interval.
    keyframe_interval: u32,

    /// Byte count reported by the logger for the last keyframe; `0` after a non-keyframe
    /// CopperList.
    pub last_encoded_bytes: u64,
}
906
907impl KeyFramesManager {
908 fn is_keyframe(&self, culistid: u64) -> bool {
909 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
910 }
911
912 #[inline]
913 pub fn captures_keyframe(&self, culistid: u64) -> bool {
914 self.is_keyframe(culistid)
915 }
916
917 pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
918 if self.is_keyframe(culistid) {
919 if self.locked && self.inner.culistid == culistid {
921 return;
922 }
923 let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
924 self.inner.reset(culistid, ts);
925 self.locked = false;
926 }
927 }
928
929 #[cfg(feature = "std")]
931 pub fn set_forced_timestamp(&mut self, ts: CuTime) {
932 self.forced_timestamp = Some(ts);
933 }
934
935 pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
936 if self.is_keyframe(culistid) {
937 if self.locked {
938 return Ok(0);
940 }
941 if self.inner.culistid != culistid {
942 return Err(CuError::from(format!(
943 "Freezing task for culistid {} but current keyframe is {}",
944 culistid, self.inner.culistid
945 )));
946 }
947 self.inner
948 .add_frozen_task(task)
949 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
950 } else {
951 Ok(0)
952 }
953 }
954
955 pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
957 self.freeze_task(culistid, item)
958 }
959
960 pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
961 if self.is_keyframe(culistid) {
962 let logger = self.logger.as_mut().unwrap();
963 logger.log(&self.inner)?;
964 self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
965 self.locked = false;
967 Ok(())
968 } else {
969 self.last_encoded_bytes = 0;
971 Ok(())
972 }
973 }
974
975 #[cfg(feature = "std")]
977 pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
978 self.inner = keyframe.clone();
979 self.forced_timestamp = Some(keyframe.timestamp);
980 self.locked = true;
981 }
982}
983
/// The assembled runtime: clock, identity, tasks, bridges, resources, monitoring and the
/// CopperList / keyframe managers.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// Clock of this runtime.
    pub clock: RobotClock,

    /// Subsystem code, taken from the builder's `Subsystem`.
    subsystem_code: u16,

    /// Instance id of this runtime.
    pub instance_id: u32,

    /// The instantiated tasks.
    pub tasks: CT,

    /// The instantiated bridges.
    pub bridges: CB,

    /// Resource manager used while instantiating tasks and bridges.
    pub resources: ResourceManager,

    /// The monitor instance.
    pub monitor: M,

    /// Execution probe; a shared handle in `std` builds so the monitor can hold a clone.
    #[cfg(feature = "std")]
    pub execution_probe: ExecutionProbeHandle,
    /// Execution probe; owned directly in `no_std` builds.
    #[cfg(not(feature = "std"))]
    pub execution_probe: RuntimeExecutionProbe,

    /// Manager of the `NBCL` CopperLists.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Manager of keyframe capture and logging.
    pub keyframes_manager: KeyFramesManager,

    /// Parallel runtime state (only with `std` + `parallel-rt`).
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt: ParallelRt<NBCL>,

    /// Runtime section of the configuration, or its default when absent.
    pub runtime_config: RuntimeConfig,
}
1032
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
    M: CuMonitor,
    const NBCL: usize,
> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    /// Returns a clone of the runtime's clock.
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
1046
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
    /// Returns a clone of the runtime's clock.
    #[inline]
    pub fn clock(&self) -> RobotClock {
        self.clock.clone()
    }

    /// Returns the subsystem code of this runtime.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.subsystem_code
    }

    /// Returns the instance id of this runtime.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.instance_id
    }
}
1066
1067impl<
1068 'cfg,
1069 CT,
1070 CB,
1071 P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1072 M: CuMonitor,
1073 const NBCL: usize,
1074 TI,
1075 BI,
1076 MI,
1077 CLW,
1078 KFW,
1079> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1080where
1081 TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1082 BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1083 MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1084 CLW: WriteStream<CopperList<P>> + 'static,
1085 KFW: WriteStream<KeyFrame> + 'static,
1086{
1087 pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1088 let Self {
1089 clock,
1090 config,
1091 mission,
1092 subsystem,
1093 instance_id,
1094 resources,
1095 parts,
1096 copperlists_logger,
1097 keyframes_logger,
1098 } = self;
1099 let mut resources =
1100 resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1101
1102 let graph = config.get_graph(Some(mission))?;
1103 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1104 .get_all_nodes()
1105 .iter()
1106 .map(|(_, node)| node.get_instance_config())
1107 .collect();
1108
1109 let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1110
1111 #[cfg(feature = "std")]
1112 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1113 #[cfg(not(feature = "std"))]
1114 let execution_probe = RuntimeExecutionProbe::default();
1115 let monitor_metadata = CuMonitoringMetadata::new(
1116 CompactString::from(mission),
1117 parts.monitored_components,
1118 parts.culist_component_mapping,
1119 CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1120 build_monitor_topology(config, mission)?,
1121 None,
1122 )?
1123 .with_subsystem_id(subsystem.id())
1124 .with_instance_id(instance_id);
1125 #[cfg(feature = "std")]
1126 let monitor_runtime =
1127 CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1128 #[cfg(not(feature = "std"))]
1129 let monitor_runtime = CuMonitoringRuntime::unavailable();
1130 let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1131 let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1132
1133 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1134 Some(logging_config) if logging_config.enable_task_logging => (
1135 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1136 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1137 logging_config.keyframe_interval.unwrap(),
1138 ),
1139 Some(_) => (None, None, 0),
1140 None => (
1141 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1142 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1143 DEFAULT_KEYFRAME_INTERVAL,
1144 ),
1145 };
1146
1147 let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1148 #[cfg(target_os = "none")]
1149 {
1150 let cl_size = core::mem::size_of::<CopperList<P>>();
1151 let total_bytes = cl_size.saturating_mul(NBCL);
1152 info!(
1153 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
1154 NBCL, cl_size, total_bytes
1155 );
1156 }
1157
1158 let keyframes_manager = KeyFramesManager {
1159 inner: KeyFrame::new(),
1160 logger: keyframes_logger,
1161 keyframe_interval,
1162 last_encoded_bytes: 0,
1163 forced_timestamp: None,
1164 locked: false,
1165 };
1166 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1167 let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1168
1169 let runtime_config = config.runtime.clone().unwrap_or_default();
1170 runtime_config.validate()?;
1171
1172 Ok(CuRuntime {
1173 subsystem_code: subsystem.code(),
1174 instance_id,
1175 tasks,
1176 bridges,
1177 resources,
1178 monitor,
1179 execution_probe,
1180 clock,
1181 copperlists_manager,
1182 keyframes_manager,
1183 #[cfg(all(feature = "std", feature = "parallel-rt"))]
1184 parallel_rt,
1185 runtime_config,
1186 })
1187 }
1188}
1189
/// Snapshot of frozen task state, tagged with a copper list id and a timestamp.
///
/// The payload is filled incrementally by `KeyFrame::add_frozen_task`, which
/// appends one bincode-encoded task at a time.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// Copper list id recorded by the last `reset` call (0 for a fresh keyframe).
    pub culistid: u64,
    /// Timestamp recorded by the last `reset` call.
    pub timestamp: CuTime,
    /// Concatenated bincode encodings of the tasks added since the last `reset`.
    pub serialized_tasks: Vec<u8>,
}
1202
1203impl KeyFrame {
1204 fn new() -> Self {
1205 KeyFrame {
1206 culistid: 0,
1207 timestamp: CuTime::default(),
1208 serialized_tasks: Vec::new(),
1209 }
1210 }
1211
1212 fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1214 self.culistid = culistid;
1215 self.timestamp = timestamp;
1216 self.serialized_tasks.clear();
1217 }
1218
1219 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1221 let cfg = bincode::config::standard();
1222 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1223 BincodeAdapter(task).encode(&mut sizer)?;
1224 let need = sizer.into_writer().bytes_written as usize;
1225
1226 let start = self.serialized_tasks.len();
1227 self.serialized_tasks.resize(start + need, 0);
1228 let mut enc =
1229 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1230 BincodeAdapter(task).encode(&mut enc)?;
1231 Ok(need)
1232 }
1233}
1234
/// Where the configuration used to instantiate a runtime came from.
///
/// Recorded in `RuntimeLifecycleEvent::Instantiated::config_source`.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// NOTE(review): presumably a configuration supplied from code by the caller — confirm.
    ProgrammaticOverride,
    /// NOTE(review): presumably a configuration loaded from a file outside the binary — confirm.
    ExternalFile,
    /// NOTE(review): presumably the configuration shipped with the application — confirm.
    BundledDefault,
}
1242
/// Identity of the application stack that instantiated a runtime.
///
/// Carried by `RuntimeLifecycleEvent::Instantiated::stack`.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Name of the application.
    pub app_name: String,
    /// Version string of the application.
    pub app_version: String,
    /// Git commit of the build, when known.
    pub git_commit: Option<String>,
    /// Whether the git work tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
    /// Textual subsystem identifier, when one is defined.
    pub subsystem_id: Option<String>,
    /// Numeric subsystem code.
    pub subsystem_code: u16,
    /// Numeric instance identifier.
    pub instance_id: u32,
}
1254
/// Events describing the life of a runtime, stored inside a
/// `RuntimeLifecycleRecord`.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// A runtime was instantiated.
    Instantiated {
        /// Origin of the configuration that was used.
        config_source: RuntimeLifecycleConfigSource,
        /// The effective configuration as a string (RON text, going by the field name).
        effective_config_ron: String,
        /// Identity of the application stack.
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started.
    MissionStarted {
        /// Name of the mission.
        mission: String,
    },
    /// A mission stopped.
    MissionStopped {
        /// Name of the mission.
        mission: String,
        /// Free-form reason for the stop.
        reason: String,
    },
    /// A panic was captured.
    Panic {
        /// Panic message.
        message: String,
        /// Source file of the panic location, when available.
        file: Option<String>,
        /// Line of the panic location, when available.
        line: Option<u32>,
        /// Column of the panic location, when available.
        column: Option<u32>,
    },
    /// Shutdown finished.
    ShutdownCompleted,
}
1281
/// A timestamped `RuntimeLifecycleEvent`.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// Time associated with the event.
    pub timestamp: CuTime,
    /// The event itself.
    pub event: RuntimeLifecycleEvent,
}
1288
/// Execution-probe accessors and the deprecated positional constructors.
///
/// Every constructor here packs its arguments into `CuRuntimeParts` and
/// delegates to `CuRuntimeBuilder`; new code should use the builder directly.
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Forwards `marker` to the runtime's execution probe.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Returns a shared reference to the runtime's execution probe.
    ///
    /// With the `std` feature the probe field is accessed through `as_ref()`
    /// (it is created behind an `Arc` by the builder); without `std` the field
    /// is borrowed directly.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            &self.execution_probe
        }
    }

    /// Deprecated `std` constructor that builds the resources through
    /// `resources_instanciator`.
    ///
    /// The subsystem is set to `Subsystem::new(None, subsystem_code)`.
    /// `parallel_rt_metadata` only exists when the `parallel-rt` feature is
    /// enabled.
    ///
    /// # Errors
    ///
    /// Returns any error from `try_with_resources_instantiator` or from
    /// `CuRuntimeBuilder::build`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Deprecated `std` constructor that takes an already built
    /// `ResourceManager` instead of a resources instantiator.
    ///
    /// # Errors
    ///
    /// Returns any error from `CuRuntimeBuilder::build`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }

    /// Deprecated non-`std` counterpart of `new`; it has no
    /// `parallel_rt_metadata` parameter.
    ///
    /// # Errors
    ///
    /// Returns any error from `try_with_resources_instantiator` or from
    /// `CuRuntimeBuilder::build`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Deprecated non-`std` counterpart of `new_with_resources`.
    ///
    /// # Errors
    ///
    /// Returns any error from `CuRuntimeBuilder::build`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }
}
1491
/// Role of a node in the task graph, as classified by `find_task_type_for_id`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// The node has no incoming neighbor.
    Source,
    /// The node has both incoming and outgoing neighbors.
    Regular,
    /// The node has incoming neighbors but no outgoing neighbor.
    Sink,
}
1502
/// Output slot assigned to a planned step.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Output index handed out by the planner; it increases by one per planned node visit.
    pub culist_index: u32,
    /// Message types produced by the step; a consumer's `src_port` is a
    /// position in this list.
    pub msg_types: Vec<String>,
}
1508
/// One input of a planned step, resolved against the producer's output pack.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// `culist_index` of the producing step's `CuOutputPack`.
    pub culist_index: u32,
    /// Message type carried by the connection.
    pub msg_type: String,
    /// Position of `msg_type` inside the producer's `msg_types`.
    pub src_port: usize,
    /// Id of the graph edge this input comes from; inputs are sorted by it.
    pub edge_id: usize,
}
1516
/// A single planned execution of one graph node.
pub struct CuExecutionStep {
    /// Id of the node in the graph.
    pub node_id: NodeId,
    /// Copy of the node definition taken from the graph at planning time.
    pub node: Node,
    /// Role of the node (source, regular or sink).
    pub task_type: CuTaskType,

    /// Inputs of the step, sorted by edge id.
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// Output slot of the step, if any.
    pub output_msg_pack: Option<CuOutputPack>,
}
1532
1533impl Debug for CuExecutionStep {
1534 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1535 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1536 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
1537 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
1538 f.write_str(
1539 format!(
1540 " input_msg_types: {:?}\n",
1541 self.input_msg_indices_types
1542 )
1543 .as_str(),
1544 )?;
1545 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1546 Ok(())
1547 }
1548}
1549
/// An ordered sequence of execution units (steps or nested loops).
pub struct CuExecutionLoop {
    /// Units in execution order.
    pub steps: Vec<CuExecutionUnit>,
    /// Optional iteration count; `compute_runtime_plan` always sets it to `None`.
    /// NOTE(review): `None` presumably means "no fixed count" — confirm.
    pub loop_count: Option<u32>,
}
1558
1559impl Debug for CuExecutionLoop {
1560 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1561 f.write_str("CuExecutionLoop:\n")?;
1562 for step in &self.steps {
1563 match step {
1564 CuExecutionUnit::Step(step) => {
1565 step.fmt(f)?;
1566 }
1567 CuExecutionUnit::Loop(l) => {
1568 l.fmt(f)?;
1569 }
1570 }
1571 }
1572
1573 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
1574 Ok(())
1575 }
1576}
1577
/// One entry of an execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single planned step, stored on the heap.
    Step(Box<CuExecutionStep>),
    /// A nested sequence of units.
    Loop(CuExecutionLoop),
}
1584
1585fn find_output_pack_from_nodeid(
1586 node_id: NodeId,
1587 steps: &Vec<CuExecutionUnit>,
1588) -> Option<CuOutputPack> {
1589 for step in steps {
1590 match step {
1591 CuExecutionUnit::Loop(loop_unit) => {
1592 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1593 return Some(output_pack);
1594 }
1595 }
1596 CuExecutionUnit::Step(step) => {
1597 if step.node_id == node_id {
1598 return step.output_msg_pack.clone();
1599 }
1600 }
1601 }
1602 }
1603 None
1604}
1605
1606pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
1607 if graph.incoming_neighbor_count(node_id) == 0 {
1608 CuTaskType::Source
1609 } else if graph.outgoing_neighbor_count(node_id) == 0 {
1610 CuTaskType::Sink
1611 } else {
1612 CuTaskType::Regular
1613 }
1614}
1615
1616fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
1619 input_msg_indices_types.sort_by_key(|input| input.edge_id);
1620}
1621
1622fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
1623 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1624 edge_ids.sort();
1625
1626 let mut msg_order: Vec<(usize, String)> = Vec::new();
1627 let mut record_msg = |msg: String, order: usize| {
1628 if let Some((existing_order, _)) = msg_order
1629 .iter_mut()
1630 .find(|(_, existing_msg)| *existing_msg == msg)
1631 {
1632 if order < *existing_order {
1633 *existing_order = order;
1634 }
1635 return;
1636 }
1637 msg_order.push((order, msg));
1638 };
1639
1640 for edge_id in edge_ids {
1641 if let Some(edge) = graph.edge(edge_id) {
1642 let order = if edge.order == usize::MAX {
1643 edge_id
1644 } else {
1645 edge.order
1646 };
1647 record_msg(edge.msg.clone(), order);
1648 }
1649 }
1650 if let Some(node) = graph.get_node(node_id) {
1651 for (msg, order) in node.nc_outputs_with_order() {
1652 record_msg(msg.clone(), order);
1653 }
1654 }
1655
1656 msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
1657 order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
1658 });
1659 msg_order.into_iter().map(|(_, msg)| msg).collect()
1660}
/// Plans every node yielded by `graph.bfs_nodes(starting_point)`, appending or
/// refreshing steps in `plan`.
///
/// For each visited node the function resolves its inputs against the output
/// packs already present in `plan`, assigns it the next output index and then
/// either pushes a new step or moves the node's existing step to the end of
/// `plan` with refreshed inputs.
///
/// The walk stops early (returning the current state) as soon as a sink or
/// regular node has an incoming edge whose producer has no output pack in
/// `plan` yet.
///
/// Returns `(next_culist_output_index, handled)`, where `handled` is `true`
/// when at least one node was added or refreshed during this call.
///
/// # Panics
///
/// Panics when a visited node is missing from the graph, when a source or
/// regular node has no output message type, when an edge or its source node
/// cannot be found, or when the producer's output pack does not contain the
/// edge's message type.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut handled = false;

    for id in graph.bfs_nodes(starting_point) {
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!(" Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
        // Assigned exactly once, in whichever match arm applies below.
        let output_msg_pack: Option<CuOutputPack>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Source node, assign output index {next_culist_output_index}");
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Source node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Sink with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        // The port is the position of the message type in the
                        // producer's output pack.
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        // The producer is not planned yet: give up on this
                        // branch; nothing is recorded for this node.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // A sink still gets an output slot, carrying the unit type.
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types: Vec::from(["()".to_string()]),
                });
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        // Same early exit as for sinks: the producer is not
                        // in the plan yet.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Regular node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            // The node was planned earlier: move its step to the end of the
            // plan and replace only its inputs. The existing output pack is
            // kept and the one computed above is dropped.
            // NOTE(review): the index counter has already advanced for this
            // visit, which looks like it leaves a gap in the index sequence —
            // confirm this is intended.
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_pack,
            };
            plan.push(CuExecutionUnit::Step(Box::new(step)));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
1833
1834pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1837 #[cfg(all(feature = "std", feature = "macro_debug"))]
1838 eprintln!("[runtime plan]");
1839 let mut plan = Vec::new();
1840 let mut next_culist_output_index = 0u32;
1841
1842 let mut queue: VecDeque<NodeId> = graph
1843 .node_ids()
1844 .into_iter()
1845 .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1846 .collect();
1847
1848 #[cfg(all(feature = "std", feature = "macro_debug"))]
1849 eprintln!("Initial source nodes: {queue:?}");
1850
1851 while let Some(start_node) = queue.pop_front() {
1852 #[cfg(all(feature = "std", feature = "macro_debug"))]
1853 eprintln!("→ Starting BFS from source {start_node}");
1854 for node_id in graph.bfs_nodes(start_node) {
1855 let already_in_plan = plan
1856 .iter()
1857 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1858 if already_in_plan {
1859 #[cfg(all(feature = "std", feature = "macro_debug"))]
1860 eprintln!(" → Node {node_id} already planned, skipping");
1861 continue;
1862 }
1863
1864 #[cfg(all(feature = "std", feature = "macro_debug"))]
1865 eprintln!(" Planning from node {node_id}");
1866 let (new_index, handled) =
1867 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1868 next_culist_output_index = new_index;
1869
1870 if !handled {
1871 #[cfg(all(feature = "std", feature = "macro_debug"))]
1872 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1873 continue;
1874 }
1875
1876 #[cfg(all(feature = "std", feature = "macro_debug"))]
1877 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
1878 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1879 #[cfg(all(feature = "std", feature = "macro_debug"))]
1880 eprintln!(" → Enqueueing neighbor {neighbor}");
1881 queue.push_back(neighbor);
1882 }
1883 }
1884 }
1885
1886 let mut planned_nodes = BTreeSet::new();
1887 for unit in &plan {
1888 if let CuExecutionUnit::Step(step) = unit {
1889 planned_nodes.insert(step.node_id);
1890 }
1891 }
1892
1893 let mut missing = Vec::new();
1894 for node_id in graph.node_ids() {
1895 if !planned_nodes.contains(&node_id) {
1896 if let Some(node) = graph.get_node(node_id) {
1897 missing.push(node.get_id().to_string());
1898 } else {
1899 missing.push(format!("node_id_{node_id}"));
1900 }
1901 }
1902 }
1903
1904 if !missing.is_empty() {
1905 missing.sort();
1906 return Err(CuError::from(format!(
1907 "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1908 missing.join(", ")
1909 )));
1910 }
1911
1912 Ok(CuExecutionLoop {
1913 steps: plan,
1914 loop_count: None,
1915 })
1916}
1917
1918#[cfg(test)]
1920mod tests {
1921 use super::*;
1922 use crate::config::Node;
1923 use crate::context::CuContext;
1924 use crate::cutask::CuSinkTask;
1925 use crate::cutask::{CuSrcTask, Freezable};
1926 use crate::monitoring::NoMonitor;
1927 use crate::reflect::Reflect;
1928 use bincode::Encode;
1929 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1930 use serde_derive::{Deserialize, Serialize};
1931
/// Stateless source task used as a test fixture; its output type is `()`.
#[derive(Reflect)]
pub struct TestSource {}

// Empty impl: only the trait's default behaviour is used.
impl Freezable for TestSource {}

impl CuSrcTask for TestSource {
    type Resources<'r> = ();
    type Output<'m> = ();
    /// Ignores the configuration and resources; never fails.
    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized,
    {
        Ok(Self {})
    }

    /// Does nothing and reports success.
    fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
        Ok(())
    }
}
1951
/// Stateless sink task used as a test fixture; its input type is `()`.
#[derive(Reflect)]
pub struct TestSink {}

// Empty impl: only the trait's default behaviour is used.
impl Freezable for TestSink {}

impl CuSinkTask for TestSink {
    type Resources<'r> = ();
    type Input<'m> = ();

    /// Ignores the configuration and resources; never fails.
    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
    where
        Self: Sized,
    {
        Ok(Self {})
    }

    /// Does nothing and reports success.
    fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
        Ok(())
    }
}
1972
1973 type Tasks = (TestSource, TestSink);
1975 type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
1976 const TEST_NBCL: usize = 2;
1977
/// Copper list payload used by the test runtime; it wraps the unit type.
#[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
struct Msgs(());

impl ErasedCuStampedDataSet for Msgs {
    /// Exposes no messages.
    fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
        Vec::new()
    }
}

impl MatchingTasks for Msgs {
    /// Reports no task ids.
    fn get_all_task_ids() -> &'static [&'static str] {
        &[]
    }
}

impl CuListZeroedInit for Msgs {
    /// Nothing to initialise.
    fn init_zeroed(&mut self) {}
}
1996
1997 #[cfg(feature = "std")]
1998 fn tasks_instanciator(
1999 all_instances_configs: Vec<Option<&ComponentConfig>>,
2000 _resources: &mut ResourceManager,
2001 ) -> CuResult<Tasks> {
2002 Ok((
2003 TestSource::new(all_instances_configs[0], ())?,
2004 TestSink::new(all_instances_configs[1], ())?,
2005 ))
2006 }
2007
2008 #[cfg(not(feature = "std"))]
2009 fn tasks_instanciator(
2010 all_instances_configs: Vec<Option<&ComponentConfig>>,
2011 _resources: &mut ResourceManager,
2012 ) -> CuResult<Tasks> {
2013 Ok((
2014 TestSource::new(all_instances_configs[0], ())?,
2015 TestSink::new(all_instances_configs[1], ())?,
2016 ))
2017 }
2018
/// Creates the `NoMonitor` used by the test runtime; the configuration is ignored.
///
/// Panics if `NoMonitor::new` returns an error.
fn monitor_instanciator(
    _config: &CuConfig,
    metadata: CuMonitoringMetadata,
    runtime: CuMonitoringRuntime,
) -> NoMonitor {
    NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
}
2026
/// The test runtime has no bridges: always succeeds with `()`.
fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
    Ok(())
}
2030
/// Builds a `ResourceManager` from an empty slice; the configuration is ignored.
fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
    Ok(ResourceManager::new(&[]))
}
2034
/// Log sink that accepts any encodable value and discards it.
#[derive(Debug)]
struct FakeWriter {}

impl<E: Encode> WriteStream<E> for FakeWriter {
    /// Drops `_obj` and reports success.
    fn log(&mut self, _obj: &E) -> CuResult<()> {
        Ok(())
    }
}
2043
/// A source connected to a sink must be enough to build a runtime.
#[test]
fn test_runtime_instantiation() {
    // Two-node graph: node 0 ("a") feeds node 1 ("b") with a `()` message.
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    graph.add_node(Node::new("a", "TestSource")).unwrap();
    graph.add_node(Node::new("b", "TestSink")).unwrap();
    graph.connect(0, 1, "()").unwrap();
    let runtime: CuResult<TestRuntime> =
        CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
            RobotClock::default(),
            &config,
            crate::config::DEFAULT_MISSION_ID,
            CuRuntimeParts::new(
                tasks_instanciator,
                &[],
                &[],
                #[cfg(all(feature = "std", feature = "parallel-rt"))]
                &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                monitor_instanciator,
                bridges_instanciator,
            ),
            FakeWriter {},
            FakeWriter {},
        )
        .try_with_resources_instantiator(resources_instanciator)
        .and_then(|builder| builder.build());
    assert!(runtime.is_ok());
}
2072
/// A rate target of 0 must be rejected with an explicit error message.
#[test]
fn test_rate_target_period_rejects_zero() {
    let err = rate_target_period(0).expect_err("zero rate target should fail");
    assert!(
        err.to_string()
            .contains("Runtime rate target cannot be zero"),
        "unexpected error: {err}"
    );
}
2082
/// Ticking exactly on the deadline moves the deadline forward by one period.
#[test]
fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
    let (clock, mock) = RobotClock::mock();
    // With a rate target of 100 the first deadline is at 10_000_000 clock units.
    let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
    assert_eq!(limiter.next_deadline(), CuDuration::from(10_000_000));

    mock.set_value(10_000_000);
    limiter.mark_tick(&clock);

    assert_eq!(limiter.next_deadline(), CuDuration::from(20_000_000));
}
2094
/// A late tick skips the missed deadlines but stays on the original period grid.
#[test]
fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
    let (clock, mock) = RobotClock::mock();
    let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();

    // Tick at 35_000_000, i.e. three and a half periods of 10_000_000 in.
    mock.set_value(35_000_000);
    limiter.mark_tick(&clock);

    // The next deadline is the next multiple of the period, not tick + period.
    assert_eq!(limiter.next_deadline(), CuDuration::from(40_000_000));
}
2105
/// The spin window is the same (200_000) whatever the rate target is.
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
#[test]
fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
    let (clock, _) = RobotClock::mock();
    let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
    assert_eq!(limiter.spin_window(), CuDuration::from(200_000));

    // A ten times faster target yields the same window.
    let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
    assert_eq!(fast.spin_window(), CuDuration::from(200_000));
}
2116
/// Walks the copper list manager of a `TEST_NBCL` (2) list runtime through
/// allocation, exhaustion and release. Only built without `async-cl-io`.
#[cfg(not(feature = "async-cl-io"))]
#[test]
fn test_copperlists_manager_lifecycle() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    graph.add_node(Node::new("a", "TestSource")).unwrap();
    graph.add_node(Node::new("b", "TestSink")).unwrap();
    graph.connect(0, 1, "()").unwrap();

    let mut runtime: TestRuntime =
        CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
            RobotClock::default(),
            &config,
            crate::config::DEFAULT_MISSION_ID,
            CuRuntimeParts::new(
                tasks_instanciator,
                &[],
                &[],
                #[cfg(all(feature = "std", feature = "parallel-rt"))]
                &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                monitor_instanciator,
                bridges_instanciator,
            ),
            FakeWriter {},
            FakeWriter {},
        )
        .try_with_resources_instantiator(resources_instanciator)
        .and_then(|builder| builder.build())
        .unwrap();

    // First list: id 0, one slot left afterwards.
    {
        let copperlists = &mut runtime.copperlists_manager;
        let culist0 = copperlists
            .create()
            .expect("Ran out of space for copper lists");
        let id = culist0.id;
        assert_eq!(id, 0);
        culist0.change_state(CopperListState::Processing);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
    }

    // Second list: id 1, the manager is now full.
    {
        let copperlists = &mut runtime.copperlists_manager;
        let culist1 = copperlists
            .create()
            .expect("Ran out of space for copper lists");
        let id = culist1.id;
        assert_eq!(id, 1);
        culist1.change_state(CopperListState::Processing);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
    }

    // A third allocation fails; finishing list 1 frees one slot.
    {
        let copperlists = &mut runtime.copperlists_manager;
        let culist2 = copperlists.create();
        assert!(culist2.is_err());
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        let _ = copperlists.end_of_processing(1);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
    }

    // Allocate again (ids keep increasing), then release out of order.
    {
        let copperlists = &mut runtime.copperlists_manager;
        let culist2 = copperlists
            .create()
            .expect("Ran out of space for copper lists");
        let id = culist2.id;
        assert_eq!(id, 2);
        culist2.change_state(CopperListState::Processing);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        // Finishing the older list 0 frees nothing while list 2 is still in
        // flight. NOTE(review): presumably a finished list is only reclaimed
        // once every newer list is done — confirm against the manager.
        let _ = copperlists.end_of_processing(0);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

        // Finishing list 2 releases both slots at once.
        let _ = copperlists.end_of_processing(2);
        assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
    }
}
2202
/// Copper list writer that records the id of every list it is asked to log.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[derive(Debug, Default)]
struct RecordingWriter {
    /// Ids in the order they were logged; shared with the test body.
    ids: Arc<Mutex<Vec<u64>>>,
}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl WriteStream<CopperList<Msgs>> for RecordingWriter {
    /// Appends the list id to `ids`, then sleeps for 2 ms so each write takes
    /// a noticeable amount of time.
    fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
        self.ids.lock().unwrap().push(culist.id);
        std::thread::sleep(std::time::Duration::from_millis(2));
        Ok(())
    }
}
2217
/// With `async-cl-io`, lists handed to a slow writer must be logged in id
/// order and every slot must be free again after `finish_pending`.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[test]
fn test_async_copperlists_manager_flushes_in_order() {
    let ids = Arc::new(Mutex::new(Vec::new()));
    let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
        ids: ids.clone(),
    })))
    .unwrap();

    // Create and immediately finish four lists; ids are handed out from 0.
    for expected_id in 0..4 {
        let culist = copperlists.create().unwrap();
        assert_eq!(culist.id, expected_id);
        culist.change_state(CopperListState::Processing);
        copperlists.end_of_processing(expected_id).unwrap();
    }

    copperlists.finish_pending().unwrap();
    assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
    assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
}
2238
/// Checks that the inputs of a sink step are ordered by the order in which the connections
/// were created, not by the ids of the source nodes.
#[test]
fn test_runtime_task_input_order() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let first_src = graph.add_node(Node::new("a", "Source1")).unwrap();
    let second_src = graph.add_node(Node::new("b", "Source2")).unwrap();
    let sink = graph.add_node(Node::new("c", "Sink")).unwrap();

    assert_eq!(first_src, 0);
    assert_eq!(second_src, 1);

    // Wire the second source before the first one on purpose.
    let first_msg = "src1_type";
    let second_msg = "src2_type";
    graph.connect(second_src, sink, second_msg).unwrap();
    graph.connect(first_src, sink, first_msg).unwrap();

    // The asserted edge ids follow the order of the `connect` calls, not the node ids.
    let first_src_edge = *graph.get_src_edges(first_src).unwrap().first().unwrap();
    let second_src_edge = *graph.get_src_edges(second_src).unwrap().first().unwrap();
    assert_eq!(first_src_edge, 1);
    assert_eq!(second_src_edge, 0);

    let plan = compute_runtime_plan(graph).unwrap();
    let sink_step = plan
        .steps
        .iter()
        .filter_map(|unit| {
            if let CuExecutionUnit::Step(step) = unit {
                Some(step)
            } else {
                None
            }
        })
        .find(|step| step.node_id == sink)
        .unwrap();

    // The connection made first (second source) must show up as the first input.
    let inputs = &sink_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, second_msg);
    assert_eq!(inputs[1].msg_type, first_msg);
}
2278
/// A source connected to four sinks with message types A, B, A, C must expose three output
/// ports (`msg::A`, `msg::B`, `msg::C`, in first-seen order), and both `msg::A` consumers
/// must read from port 0.
#[test]
fn test_runtime_output_ports_unique_ordered() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
    let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
    let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
    let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
    let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();

    graph.connect(src_id, dst_a_id, "msg::A").unwrap();
    graph.connect(src_id, dst_b_id, "msg::B").unwrap();
    graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
    graph.connect(src_id, dst_c_id, "msg::C").unwrap();

    let plan = compute_runtime_plan(graph).unwrap();

    // Returns the plain step planned for `node`; panics when the plan holds none.
    let step_of = |node| {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node => Some(step),
                _ => None,
            })
            .unwrap()
    };

    let src_step = step_of(src_id);
    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["msg::A", "msg::B", "msg::C"]);

    let dst_a_step = step_of(dst_a_id);
    let dst_b_step = step_of(dst_b_id);
    let dst_a2_step = step_of(dst_a2_id);
    let dst_c_step = step_of(dst_c_id);

    assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
    assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
    assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
    assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
}
2345
/// Two sinks consuming the same message type from one source must not duplicate the
/// output: the source's output pack lists `i32` exactly once.
#[test]
fn test_runtime_output_ports_fanout_single() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
    let sink_a = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
    let sink_b = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();

    // Same message type towards both sinks, in declaration order.
    for sink in [sink_a, sink_b] {
        graph.connect(src_id, sink, "i32").unwrap();
    }

    let plan = compute_runtime_plan(graph).unwrap();
    let src_step = plan
        .steps
        .iter()
        .find_map(|unit| {
            let CuExecutionUnit::Step(step) = unit else {
                return None;
            };
            (step.node_id == src_id).then_some(step)
        })
        .unwrap();

    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["i32"]);
}
2370
/// An output registered on the source through `add_nc_output` must appear in the source's
/// output pack next to the connected output, while the sink keeps reading from port 0.
#[test]
fn test_runtime_output_ports_include_nc_outputs() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
    let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
    graph.connect(src_id, dst_id, "msg::A").unwrap();

    // NOTE(review): the second argument looks like an ordering hint; `usize::MAX`
    // presumably sorts this output after the connected one — confirm.
    let source = graph.get_node_mut(src_id).expect("missing source node");
    source.add_nc_output("msg::B", usize::MAX);

    let plan = compute_runtime_plan(graph).unwrap();

    // Returns the plain step planned for `node`; panics when the plan holds none.
    let step_of = |node| {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node => Some(step),
                _ => None,
            })
            .unwrap()
    };
    let src_step = step_of(src_id);
    let dst_step = step_of(dst_id);

    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
}
2405
/// With a RON config whose first connection targets `__nc__` and whose second targets a
/// real sink, the source's output ports must follow the `cnx` order: `msg::A` is port 0
/// and the sink reads `msg::B` from port 1.
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc() {
    let txt = r#"(
            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
            cnx: [
                (src: "src", dst: "__nc__", msg: "msg::A"),
                (src: "src", dst: "sink", msg: "msg::B"),
            ]
        )"#;
    let config = CuConfig::deserialize_ron(txt).unwrap();
    let graph = config.get_graph(None).unwrap();
    let src_id = graph.get_node_id_by_name("src").unwrap();
    let dst_id = graph.get_node_id_by_name("sink").unwrap();

    let plan = compute_runtime_plan(graph).unwrap();

    // Returns the plain step planned for `node`; panics when the plan holds none.
    let step_of = |node| {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node => Some(step),
                _ => None,
            })
            .unwrap()
    };
    let src_step = step_of(src_id);
    let dst_step = step_of(dst_id);

    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
}
2442
/// Same scenario as the in-memory `__nc__` ordering test, but the RON text is written to a
/// temporary file and loaded through `crate::config::read_configuration`.
#[cfg(feature = "std")]
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
    let txt = r#"(
            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
            cnx: [
                (src: "src", dst: "__nc__", msg: "msg::A"),
                (src: "src", dst: "sink", msg: "msg::B"),
            ]
        )"#;
    // `tmp` must stay alive until the configuration has been read back.
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let config_path = tmp.path();
    std::fs::write(config_path, txt).unwrap();
    let config = crate::config::read_configuration(config_path.to_str().unwrap()).unwrap();
    let graph = config.get_graph(None).unwrap();
    let src_id = graph.get_node_id_by_name("src").unwrap();
    let dst_id = graph.get_node_id_by_name("sink").unwrap();

    let plan = compute_runtime_plan(graph).unwrap();

    // Returns the plain step planned for `node`; panics when the plan holds none.
    let step_of = |node| {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node => Some(step),
                _ => None,
            })
            .unwrap()
    };
    let src_step = step_of(src_id);
    let dst_step = step_of(dst_id);

    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["msg::A", "msg::B"]);
    assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
}
2482
/// Variant of the `__nc__` ordering test using primitive message type names: the `i32`
/// output declared first takes port 0 and the sink reads `bool` from port 1.
#[test]
fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
    let txt = r#"(
            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
            cnx: [
                (src: "src", dst: "__nc__", msg: "i32"),
                (src: "src", dst: "sink", msg: "bool"),
            ]
        )"#;
    let config = CuConfig::deserialize_ron(txt).unwrap();
    let graph = config.get_graph(None).unwrap();
    let src_id = graph.get_node_id_by_name("src").unwrap();
    let dst_id = graph.get_node_id_by_name("sink").unwrap();

    let plan = compute_runtime_plan(graph).unwrap();

    // Returns the plain step planned for `node`; panics when the plan holds none.
    let step_of = |node| {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node => Some(step),
                _ => None,
            })
            .unwrap()
    };
    let src_step = step_of(src_id);
    let dst_step = step_of(dst_id);

    let src_outputs = src_step.output_msg_pack.as_ref().unwrap();
    assert_eq!(src_outputs.msg_types, vec!["i32", "bool"]);
    assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
}
2519
/// Diamond topology, first wiring order: `cam0` feeds `broadcast` directly and also through
/// `inf0`, with the direct connection made first. The planned `broadcast` step must take
/// the `i32` input first and the `f32` input second.
#[test]
fn test_runtime_plan_diamond_case1() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let camera = graph
        .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
        .unwrap();
    let converter = graph
        .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
        .unwrap();
    let merger = graph
        .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
        .unwrap();

    // Wiring order for this case: direct edge first, then the detour through `inf0`.
    graph.connect(camera, merger, "i32").unwrap();
    graph.connect(camera, converter, "i32").unwrap();
    graph.connect(converter, merger, "f32").unwrap();

    // NOTE(review): the ids asserted below mean the edge listed first by `get_src_edges`
    // carries the higher id; presumably edges are listed newest-first — confirm.
    let camera_edges = graph.get_src_edges(camera).unwrap();
    let first_listed = *camera_edges.first().unwrap();
    let second_listed = camera_edges[1];

    assert_eq!(second_listed, 0);
    assert_eq!(first_listed, 1);

    let plan = compute_runtime_plan(graph).unwrap();
    let merger_step = plan
        .steps
        .iter()
        .filter_map(|unit| {
            if let CuExecutionUnit::Step(step) = unit {
                Some(step)
            } else {
                None
            }
        })
        .find(|step| step.node_id == merger)
        .unwrap();

    let inputs = &merger_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, "i32");
    assert_eq!(inputs[1].msg_type, "f32");
}
2559
/// Diamond topology, second wiring order: same nodes as case 1, but the `cam0 -> inf0`
/// connection is made before the direct `cam0 -> broadcast` one. The planned `broadcast`
/// step must still take the `i32` input first and the `f32` input second.
#[test]
fn test_runtime_plan_diamond_case2() {
    let mut config = CuConfig::default();
    let graph = config.get_graph_mut(None).unwrap();
    let camera = graph
        .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
        .unwrap();
    let converter = graph
        .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
        .unwrap();
    let merger = graph
        .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
        .unwrap();

    // Wiring order for this case: detour through `inf0` first, then the direct edge.
    graph.connect(camera, converter, "i32").unwrap();
    graph.connect(camera, merger, "i32").unwrap();
    graph.connect(converter, merger, "f32").unwrap();

    // NOTE(review): the ids asserted below mean the edge listed first by `get_src_edges`
    // carries the higher id; presumably edges are listed newest-first — confirm.
    let camera_edges = graph.get_src_edges(camera).unwrap();
    let first_listed = *camera_edges.first().unwrap();
    let second_listed = camera_edges[1];

    assert_eq!(second_listed, 0);
    assert_eq!(first_listed, 1);

    let plan = compute_runtime_plan(graph).unwrap();
    let merger_step = plan
        .steps
        .iter()
        .find_map(|unit| {
            let CuExecutionUnit::Step(step) = unit else {
                return None;
            };
            (step.node_id == merger).then_some(step)
        })
        .unwrap();

    let inputs = &merger_step.input_msg_indices_types;
    assert_eq!(inputs[0].msg_type, "i32");
    assert_eq!(inputs[1].msg_type, "f32");
}
2599}