1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9#[cfg(feature = "std")]
10use crate::monitoring::ExecutionProbeHandle;
11use crate::monitoring::{
12 CuMonitor, ExecutionMarker, RuntimeExecutionProbe, build_monitor_topology,
13};
14use crate::resource::ResourceManager;
15use cu29_clock::{ClockProvider, CuTime, RobotClock};
16use cu29_traits::CuResult;
17use cu29_traits::WriteStream;
18use cu29_traits::{CopperListTuple, CuError};
19
20#[cfg(target_os = "none")]
21#[allow(unused_imports)]
22use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
23#[cfg(target_os = "none")]
24#[allow(unused_imports)]
25use cu29_log_derive::info;
26#[cfg(target_os = "none")]
27#[allow(unused_imports)]
28use cu29_log_runtime::log;
29#[cfg(all(target_os = "none", debug_assertions))]
30#[allow(unused_imports)]
31use cu29_log_runtime::log_debug_mode;
32#[cfg(target_os = "none")]
33#[allow(unused_imports)]
34use cu29_value::to_value;
35
36use alloc::boxed::Box;
37use alloc::collections::{BTreeSet, VecDeque};
38use alloc::format;
39use alloc::string::{String, ToString};
40use alloc::vec::Vec;
41use bincode::enc::EncoderImpl;
42use bincode::enc::write::{SizeWriter, SliceWriter};
43use bincode::error::EncodeError;
44use bincode::{Decode, Encode};
45use core::fmt::Result as FmtResult;
46use core::fmt::{Debug, Formatter};
47
48#[cfg(feature = "std")]
49use cu29_log_runtime::LoggerRuntime;
50#[cfg(feature = "std")]
51use cu29_unifiedlog::UnifiedLoggerWrite;
52#[cfg(feature = "std")]
53use std::sync::{Arc, Mutex};
54
55#[cfg(feature = "std")]
/// Shared infrastructure handed to a Copper application when built with the
/// standard library: the unified logger sink, the structured-log runtime, and
/// the clock driving all timestamps.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Shared write handle to the unified data logger.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime backing the structured text-logging macros.
    pub logger_runtime: LoggerRuntime,
    /// Clock used to timestamp everything produced by this runtime.
    pub clock: RobotClock,
}
62
/// Pool of pre-allocated copper lists plus the optional serializer that
/// persists them once they finish processing.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Fixed-capacity (`NBCL`) pool of copper lists.
    pub inner: CuListsManager<P, NBCL>,
    /// Destination stream for completed lists; `None` disables persistence.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Size in bytes of the last copper list written by the logger.
    pub last_encoded_bytes: u64,
}
71
impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list identified by `culistid` as done, then serializes
    /// and frees every list in the leading run of `DoneProcessing` lists.
    ///
    /// Lists are retired strictly in the manager's iteration order: a done list
    /// sitting behind a still-processing one stays queued until that one also
    /// completes (see the lifecycle test below for the expected sequencing).
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        // `is_top` stays true only while we are still inside the leading run of
        // DoneProcessing lists; the first list in any other state stops retiring.
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    // Record the size of the last serialized list (0 if unknown).
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        // Pop the freed lists so their slots become available again.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper-list slots currently free, out of the `NBCL` capacity.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
102
/// Tracks the keyframe (serialized task-state snapshot) currently being built
/// and writes completed keyframes to the keyframe logger.
pub struct KeyFramesManager {
    // Keyframe currently being assembled.
    inner: KeyFrame,

    // Timestamp to use for the next keyframe instead of sampling the clock, if set.
    forced_timestamp: Option<CuTime>,

    // When true, the current keyframe was externally restored (see
    // `lock_keyframe`) and must not be overwritten by `reset`/`freeze_task`.
    locked: bool,

    // Destination for completed keyframes; `None` disables keyframing entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    // A keyframe is taken whenever the copper-list id is a multiple of this.
    keyframe_interval: u32,

    /// Size in bytes of the last keyframe written (0 when none was due).
    pub last_encoded_bytes: u64,
}
123
impl KeyFramesManager {
    /// A keyframe is due for `culistid` when keyframe logging is enabled and
    /// the id falls on the configured interval.
    fn is_keyframe(&self, culistid: u64) -> bool {
        // The logger check also guards `is_multiple_of(0)`: interval is only 0
        // when no logger was configured.
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
    }

    /// Starts a fresh keyframe for `culistid` if one is due.
    ///
    /// A locked keyframe for the same copper list is kept as-is (it was
    /// restored via `lock_keyframe`). The timestamp is the forced one when set,
    /// otherwise the current clock time.
    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            if self.locked && self.inner.culistid == culistid {
                // Preserve the externally-restored keyframe content.
                return;
            }
            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
            self.inner.reset(culistid, ts);
            self.locked = false;
        }
    }

    /// Forces the timestamp used by the next `reset` instead of sampling the clock.
    #[cfg(feature = "std")]
    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
        self.forced_timestamp = Some(ts);
    }

    /// Appends `task`'s frozen state to the current keyframe.
    ///
    /// Returns the number of bytes written, or 0 when no keyframe is due for
    /// this copper list or the keyframe is locked. Errors when the manager was
    /// not `reset` for `culistid` first, or when serialization fails.
    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.locked {
                // A restored keyframe is pinned; do not overwrite its content.
                return Ok(0);
            }
            if self.inner.culistid != culistid {
                return Err(CuError::from(format!(
                    "Freezing task for culistid {} but current keyframe is {}",
                    culistid, self.inner.culistid
                )));
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    /// Same as `freeze_task`, for any `Freezable` item.
    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
        self.freeze_task(culistid, item)
    }

    /// Finishes the keyframe for `culistid`: writes it out when one was due and
    /// records how many bytes were encoded (0 otherwise).
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            // is_keyframe() implies the logger exists, so this unwrap cannot fail.
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            // Allow the next reset() to start a fresh keyframe.
            self.locked = false;
            Ok(())
        } else {
            self.last_encoded_bytes = 0;
            Ok(())
        }
    }

    /// Replaces the in-progress keyframe with an externally restored one and
    /// pins it: a following `reset` for the same copper-list id keeps it, and
    /// `freeze_task` becomes a no-op until it is written out.
    #[cfg(feature = "std")]
    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
        self.inner = keyframe.clone();
        self.forced_timestamp = Some(keyframe.timestamp);
        self.locked = true;
    }
}
195
196pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
200 pub clock: RobotClock, pub tasks: CT,
205
206 pub bridges: CB,
208
209 pub resources: ResourceManager,
211
212 pub monitor: M,
214
215 #[cfg(feature = "std")]
221 pub execution_probe: ExecutionProbeHandle,
222 #[cfg(not(feature = "std"))]
223 pub execution_probe: RuntimeExecutionProbe,
224
225 pub copperlists_manager: CopperListsManager<P, NBCL>,
227
228 pub keyframes_manager: KeyFramesManager,
230
231 pub runtime_config: RuntimeConfig,
233}
234
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    /// Returns a handle to the runtime's clock.
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
243
244#[derive(Clone, Encode, Decode)]
248pub struct KeyFrame {
249 pub culistid: u64,
251 pub timestamp: CuTime,
253 pub serialized_tasks: Vec<u8>,
255}
256
257impl KeyFrame {
258 fn new() -> Self {
259 KeyFrame {
260 culistid: 0,
261 timestamp: CuTime::default(),
262 serialized_tasks: Vec::new(),
263 }
264 }
265
266 fn reset(&mut self, culistid: u64, timestamp: CuTime) {
268 self.culistid = culistid;
269 self.timestamp = timestamp;
270 self.serialized_tasks.clear();
271 }
272
273 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
275 let cfg = bincode::config::standard();
276 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
277 BincodeAdapter(task).encode(&mut sizer)?;
278 let need = sizer.into_writer().bytes_written as usize;
279
280 let start = self.serialized_tasks.len();
281 self.serialized_tasks.resize(start + need, 0);
282 let mut enc =
283 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
284 BincodeAdapter(task).encode(&mut enc)?;
285 Ok(need)
286 }
287}
288
/// Where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration supplied programmatically, overriding any file.
    ProgrammaticOverride,
    /// Configuration loaded from an external file.
    ExternalFile,
    /// Default configuration bundled with the application.
    BundledDefault,
}
296
/// Identification of the application build emitting lifecycle events.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Application name.
    pub app_name: String,
    /// Application version string.
    pub app_version: String,
    /// Git commit hash of the build, when known.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
}
305
/// A runtime lifecycle transition worth recording.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was constructed.
    Instantiated {
        /// Where the effective configuration came from.
        config_source: RuntimeLifecycleConfigSource,
        /// The effective configuration, serialized as RON.
        effective_config_ron: String,
        /// Build identification of the running application.
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission began executing.
    MissionStarted {
        mission: String,
    },
    /// A mission stopped executing.
    MissionStopped {
        mission: String,
        /// Free-form explanation of why the mission stopped.
        reason: String,
    },
    /// The process panicked; captures the panic message and location if available.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime shut down cleanly.
    ShutdownCompleted,
}
332
/// A timestamped lifecycle event, as written to the log.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// When the event occurred.
    pub timestamp: CuTime,
    /// What happened.
    pub event: RuntimeLifecycleEvent,
}
339
340impl<
341 CT,
342 CB,
343 P: CopperListTuple + CuListZeroedInit + Default + 'static,
344 M: CuMonitor,
345 const NBCL: usize,
346> CuRuntime<CT, CB, P, M, NBCL>
347{
348 #[inline]
352 pub fn record_execution_marker(&self, marker: ExecutionMarker) {
353 self.execution_probe.record(marker);
354 }
355
356 #[allow(clippy::too_many_arguments)]
358 #[cfg(feature = "std")]
359 pub fn new(
360 clock: RobotClock,
361 config: &CuConfig,
362 mission: Option<&str>,
363 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
364 tasks_instanciator: impl for<'c> Fn(
365 Vec<Option<&'c ComponentConfig>>,
366 &mut ResourceManager,
367 ) -> CuResult<CT>,
368 monitor_instanciator: impl Fn(&CuConfig) -> M,
369 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
370 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
371 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
372 ) -> CuResult<Self> {
373 let resources = resources_instanciator(config)?;
374 Self::new_with_resources(
375 clock,
376 config,
377 mission,
378 resources,
379 tasks_instanciator,
380 monitor_instanciator,
381 bridges_instanciator,
382 copperlists_logger,
383 keyframes_logger,
384 )
385 }
386
387 #[allow(clippy::too_many_arguments)]
388 #[cfg(feature = "std")]
389 pub fn new_with_resources(
390 clock: RobotClock,
391 config: &CuConfig,
392 mission: Option<&str>,
393 mut resources: ResourceManager,
394 tasks_instanciator: impl for<'c> Fn(
395 Vec<Option<&'c ComponentConfig>>,
396 &mut ResourceManager,
397 ) -> CuResult<CT>,
398 monitor_instanciator: impl Fn(&CuConfig) -> M,
399 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
400 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
401 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
402 ) -> CuResult<Self> {
403 let graph = config.get_graph(mission)?;
404 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
405 .get_all_nodes()
406 .iter()
407 .map(|(_, node)| node.get_instance_config())
408 .collect();
409
410 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
411 let mut monitor = monitor_instanciator(config);
412 let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
413 monitor.set_execution_probe(execution_probe.clone());
414 if let Ok(topology) = build_monitor_topology(config, mission) {
415 monitor.set_topology(topology);
416 }
417 let bridges = bridges_instanciator(config, &mut resources)?;
418
419 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
420 Some(logging_config) if logging_config.enable_task_logging => (
421 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
422 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
423 logging_config.keyframe_interval.unwrap(), ),
425 Some(_) => (None, None, 0), None => (
427 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
429 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
430 DEFAULT_KEYFRAME_INTERVAL,
431 ),
432 };
433
434 let copperlists_manager = CopperListsManager {
435 inner: CuListsManager::new(),
436 logger: copperlists_logger,
437 last_encoded_bytes: 0,
438 };
439 #[cfg(target_os = "none")]
440 {
441 let cl_size = core::mem::size_of::<CopperList<P>>();
442 let total_bytes = cl_size.saturating_mul(NBCL);
443 info!(
444 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
445 NBCL, cl_size, total_bytes
446 );
447 }
448
449 let keyframes_manager = KeyFramesManager {
450 inner: KeyFrame::new(),
451 logger: keyframes_logger,
452 keyframe_interval,
453 last_encoded_bytes: 0,
454 forced_timestamp: None,
455 locked: false,
456 };
457
458 let runtime_config = config.runtime.clone().unwrap_or_default();
459
460 let runtime = Self {
461 tasks,
462 bridges,
463 resources,
464 monitor,
465 execution_probe,
466 clock,
467 copperlists_manager,
468 keyframes_manager,
469 runtime_config,
470 };
471
472 Ok(runtime)
473 }
474
475 #[allow(clippy::too_many_arguments)]
476 #[cfg(not(feature = "std"))]
477 pub fn new(
478 clock: RobotClock,
479 config: &CuConfig,
480 mission: Option<&str>,
481 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
482 tasks_instanciator: impl for<'c> Fn(
483 Vec<Option<&'c ComponentConfig>>,
484 &mut ResourceManager,
485 ) -> CuResult<CT>,
486 monitor_instanciator: impl Fn(&CuConfig) -> M,
487 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
488 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
489 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
490 ) -> CuResult<Self> {
491 #[cfg(target_os = "none")]
492 info!("CuRuntime::new: resources instanciator");
493 let resources = resources_instanciator(config)?;
494 Self::new_with_resources(
495 clock,
496 config,
497 mission,
498 resources,
499 tasks_instanciator,
500 monitor_instanciator,
501 bridges_instanciator,
502 copperlists_logger,
503 keyframes_logger,
504 )
505 }
506
507 #[allow(clippy::too_many_arguments)]
508 #[cfg(not(feature = "std"))]
509 pub fn new_with_resources(
510 clock: RobotClock,
511 config: &CuConfig,
512 mission: Option<&str>,
513 mut resources: ResourceManager,
514 tasks_instanciator: impl for<'c> Fn(
515 Vec<Option<&'c ComponentConfig>>,
516 &mut ResourceManager,
517 ) -> CuResult<CT>,
518 monitor_instanciator: impl Fn(&CuConfig) -> M,
519 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
520 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
521 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
522 ) -> CuResult<Self> {
523 #[cfg(target_os = "none")]
524 info!("CuRuntime::new: get graph");
525 let graph = config.get_graph(mission)?;
526 #[cfg(target_os = "none")]
527 info!("CuRuntime::new: graph ok");
528 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
529 .get_all_nodes()
530 .iter()
531 .map(|(_, node)| node.get_instance_config())
532 .collect();
533
534 #[cfg(target_os = "none")]
535 info!("CuRuntime::new: tasks instanciator");
536 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
537
538 #[cfg(target_os = "none")]
539 info!("CuRuntime::new: monitor instanciator");
540 let mut monitor = monitor_instanciator(config);
541 let execution_probe = RuntimeExecutionProbe::default();
542 #[cfg(target_os = "none")]
543 info!("CuRuntime::new: monitor instanciator ok");
544 #[cfg(target_os = "none")]
545 info!("CuRuntime::new: build monitor topology");
546 if let Ok(topology) = build_monitor_topology(config, mission) {
547 #[cfg(target_os = "none")]
548 info!("CuRuntime::new: monitor topology ok");
549 monitor.set_topology(topology);
550 #[cfg(target_os = "none")]
551 info!("CuRuntime::new: monitor topology set");
552 }
553 #[cfg(target_os = "none")]
554 info!("CuRuntime::new: bridges instanciator");
555 let bridges = bridges_instanciator(config, &mut resources)?;
556
557 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
558 Some(logging_config) if logging_config.enable_task_logging => (
559 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
560 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
561 logging_config.keyframe_interval.unwrap(), ),
563 Some(_) => (None, None, 0), None => (
565 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
567 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
568 DEFAULT_KEYFRAME_INTERVAL,
569 ),
570 };
571
572 let copperlists_manager = CopperListsManager {
573 inner: CuListsManager::new(),
574 logger: copperlists_logger,
575 last_encoded_bytes: 0,
576 };
577 #[cfg(target_os = "none")]
578 {
579 let cl_size = core::mem::size_of::<CopperList<P>>();
580 let total_bytes = cl_size.saturating_mul(NBCL);
581 info!(
582 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
583 NBCL, cl_size, total_bytes
584 );
585 }
586
587 let keyframes_manager = KeyFramesManager {
588 inner: KeyFrame::new(),
589 logger: keyframes_logger,
590 keyframe_interval,
591 last_encoded_bytes: 0,
592 forced_timestamp: None,
593 locked: false,
594 };
595
596 let runtime_config = config.runtime.clone().unwrap_or_default();
597
598 let runtime = Self {
599 tasks,
600 bridges,
601 resources,
602 monitor,
603 execution_probe,
604 clock,
605 copperlists_manager,
606 keyframes_manager,
607 runtime_config,
608 };
609
610 Ok(runtime)
611 }
612}
613
/// Role of a task in the data-flow graph, derived from its connectivity
/// (see `find_task_type_for_id`).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// No incoming edges: produces data.
    Source,
    /// Both incoming and outgoing edges.
    Regular,
    /// No outgoing edges: consumes data.
    Sink,
}
624
/// Where a task writes its outputs: one copper-list slot holding one message
/// per distinct outgoing message type.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of the slot in the copper list.
    pub culist_index: u32,
    /// Distinct message type names, in output-port order.
    pub msg_types: Vec<String>,
}
630
/// One input of a task: which copper-list slot and port to read, and the
/// graph edge it corresponds to.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper-list slot written by the producing task.
    pub culist_index: u32,
    /// Message type carried by the connection.
    pub msg_type: String,
    /// Index into the producer's `msg_types` (its output port).
    pub src_port: usize,
    /// Id of the graph edge this input comes from.
    pub edge_id: usize,
}
638
/// One planned task execution: the node, its role, and the copper-list wiring
/// of its inputs and outputs.
pub struct CuExecutionStep {
    /// Graph id of the node to execute.
    pub node_id: NodeId,
    /// Copy of the node's configuration entry.
    pub node: Node,
    /// Source / Regular / Sink.
    pub task_type: CuTaskType,

    /// Inputs, sorted by edge id (see `sort_inputs_by_cnx_id`).
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// Output slot assignment, when the step produces one.
    pub output_msg_pack: Option<CuOutputPack>,
}
654
655impl Debug for CuExecutionStep {
656 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
657 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
658 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
659 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
660 f.write_str(
661 format!(
662 " input_msg_types: {:?}\n",
663 self.input_msg_indices_types
664 )
665 .as_str(),
666 )?;
667 f.write_str(format!(" output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
668 Ok(())
669 }
670}
671
/// A (possibly nested) sequence of execution units, optionally repeated.
pub struct CuExecutionLoop {
    /// Units executed in order on each iteration.
    pub steps: Vec<CuExecutionUnit>,
    // NOTE(review): `None` appears to mean an unbounded loop — confirm against
    // the executor that consumes this plan.
    pub loop_count: Option<u32>,
}
680
681impl Debug for CuExecutionLoop {
682 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
683 f.write_str("CuExecutionLoop:\n")?;
684 for step in &self.steps {
685 match step {
686 CuExecutionUnit::Step(step) => {
687 step.fmt(f)?;
688 }
689 CuExecutionUnit::Loop(l) => {
690 l.fmt(f)?;
691 }
692 }
693 }
694
695 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
696 Ok(())
697 }
698}
699
/// A node of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}
706
707fn find_output_pack_from_nodeid(
708 node_id: NodeId,
709 steps: &Vec<CuExecutionUnit>,
710) -> Option<CuOutputPack> {
711 for step in steps {
712 match step {
713 CuExecutionUnit::Loop(loop_unit) => {
714 if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
715 return Some(output_pack);
716 }
717 }
718 CuExecutionUnit::Step(step) => {
719 if step.node_id == node_id {
720 return step.output_msg_pack.clone();
721 }
722 }
723 }
724 }
725 None
726}
727
728pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
729 if graph.incoming_neighbor_count(node_id) == 0 {
730 CuTaskType::Source
731 } else if graph.outgoing_neighbor_count(node_id) == 0 {
732 CuTaskType::Sink
733 } else {
734 CuTaskType::Regular
735 }
736}
737
738fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
741 input_msg_indices_types.sort_by_key(|input| input.edge_id);
742}
743
744fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
745 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
746 edge_ids.sort();
747
748 let mut msg_types = Vec::new();
749 let mut seen = Vec::new();
750 for edge_id in edge_ids {
751 if let Some(edge) = graph.edge(edge_id) {
752 if seen.iter().any(|msg| msg == &edge.msg) {
753 continue;
754 }
755 seen.push(edge.msg.clone());
756 msg_types.push(edge.msg.clone());
757 }
758 }
759 msg_types
760}
/// Plans one BFS branch of the task graph starting at `starting_point`.
///
/// Walks the nodes reachable from `starting_point`, assigning each a
/// copper-list output slot and wiring its inputs to the output packs of
/// previously planned producers. When a node's producer has not been planned
/// yet, the branch is abandoned early (it will be retried from another start
/// node by `compute_runtime_plan`).
///
/// Returns the next free copper-list output index and whether at least one
/// node was planned before the branch ended.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut handled = false;

    for id in graph.bfs_nodes(starting_point) {
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!(" Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
        let output_msg_pack: Option<CuOutputPack>;
        let task_type = find_task_type_for_id(graph, id);

        // Resolve inputs and allocate the output slot depending on the role.
        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Source node, assign output index {next_culist_output_index}");
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Source node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                // Incoming edges sorted by id for deterministic input ordering.
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Sink with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        // The producer's port carrying this message type.
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        // Producer not planned yet: abandon this branch.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // Sinks still get a unit "()" output slot.
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types: Vec::from(["()".to_string()]),
                });
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                // Same input resolution as sinks...
                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
                edge_ids.sort();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Regular task with incoming edges: {edge_ids:?}");
                for edge_id in edge_ids {
                    let edge = graph
                        .edge(edge_id)
                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
                    let pid = graph
                        .get_node_id_by_name(edge.src.as_str())
                        .unwrap_or_else(|| {
                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
                        });
                    let output_pack = find_output_pack_from_nodeid(pid, plan);
                    if let Some(output_pack) = output_pack {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {output_pack:?}");
                        let msg_type = edge.msg.as_str();
                        let src_port = output_pack
                            .msg_types
                            .iter()
                            .position(|msg| msg == msg_type)
                            .unwrap_or_else(|| {
                                panic!(
                                    "Missing output port for message type '{msg_type}' on node {pid}"
                                )
                            });
                        input_msg_indices_types.push(CuInputMsg {
                            culist_index: output_pack.culist_index,
                            msg_type: msg_type.to_string(),
                            src_port,
                            edge_id,
                        });
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // ...but with a real output pack like sources.
                let msg_types = collect_output_msg_types(graph, id);
                if msg_types.is_empty() {
                    panic!(
                        "Regular node '{}' has no outgoing connections",
                        node_ref.get_id()
                    );
                }
                output_msg_pack = Some(CuOutputPack {
                    culist_index: next_culist_output_index,
                    msg_types,
                });
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types);

        // If the node was planned on an earlier pass, refresh its inputs and
        // move it to the end so it runs after its (now planned) producers.
        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_pack,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
933
/// Computes a linear execution plan covering every node of `graph`.
///
/// Seeds a work queue with all source nodes, then repeatedly plans BFS
/// branches (`plan_tasks_tree_branch`), re-enqueueing downstream neighbors of
/// successfully planned nodes so join points get revisited once all their
/// producers exist.
///
/// # Errors
/// Fails when some nodes could never be planned — e.g. nodes involved in a
/// loopback or with no path from any source.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    // Every source node is a starting point; other nodes are reached via BFS.
    let mut queue: VecDeque<NodeId> = graph
        .node_ids()
        .into_iter()
        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        for node_id in graph.bfs_nodes(start_node) {
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            // An unhandled node is waiting for producers planned from another
            // start point; its neighbors are not worth enqueueing yet.
            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Enqueueing neighbor {neighbor}");
                queue.push_back(neighbor);
            }
        }
    }

    // Coverage check: every graph node must have ended up in the plan.
    let mut planned_nodes = BTreeSet::new();
    for unit in &plan {
        if let CuExecutionUnit::Step(step) = unit {
            planned_nodes.insert(step.node_id);
        }
    }

    let mut missing = Vec::new();
    for node_id in graph.node_ids() {
        if !planned_nodes.contains(&node_id) {
            if let Some(node) = graph.get_node(node_id) {
                missing.push(node.get_id().to_string());
            } else {
                missing.push(format!("node_id_{node_id}"));
            }
        }
    }

    if !missing.is_empty() {
        missing.sort();
        return Err(CuError::from(format!(
            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
            missing.join(", ")
        )));
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
1017
1018#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use crate::config::Node;
1023 use crate::context::CuContext;
1024 use crate::cutask::CuSinkTask;
1025 use crate::cutask::{CuSrcTask, Freezable};
1026 use crate::monitoring::NoMonitor;
1027 use crate::reflect::Reflect;
1028 use bincode::Encode;
1029 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1030 use serde_derive::{Deserialize, Serialize};
1031
    /// Minimal source task fixture: no state, produces a unit output.
    #[derive(Reflect)]
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: the unit output message is left untouched.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1051
    /// Minimal sink task fixture: no state, ignores its unit input.
    #[derive(Reflect)]
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: the input is discarded.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1072
    /// Task tuple used by the runtime under test.
    type Tasks = (TestSource, TestSink);

    /// Copper-list payload for the tests: a single unit message.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // Empty payload: nothing to expose as stamped data.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids are associated with the empty payload.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero for a unit payload.
        fn init_zeroed(&mut self) {}
    }
1094
    /// Builds the (source, sink) tuple from the first two per-node configs.
    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }

    // NOTE(review): byte-identical to the std variant above — the cfg split
    // looks unnecessary here; confirm whether the two can be merged.
    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0], ())?,
            TestSink::new(all_instances_configs[1], ())?,
        ))
    }

    /// Always returns the no-op monitor.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    /// No bridges in these tests.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    /// Empty resource manager for these tests.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }

    /// Write stream that discards everything it is given.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1137
    /// A runtime can be constructed for a minimal source→sink graph.
    #[test]
    fn test_runtime_instantiation() {
        // Two-node graph: source "a" feeding sink "b" with a unit message.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }
1158
1159 #[test]
1160 fn test_copperlists_manager_lifecycle() {
1161 let mut config = CuConfig::default();
1162 let graph = config.get_graph_mut(None).unwrap();
1163 graph.add_node(Node::new("a", "TestSource")).unwrap();
1164 graph.add_node(Node::new("b", "TestSink")).unwrap();
1165 graph.connect(0, 1, "()").unwrap();
1166
1167 let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
1168 RobotClock::default(),
1169 &config,
1170 None,
1171 resources_instanciator,
1172 tasks_instanciator,
1173 monitor_instanciator,
1174 bridges_instanciator,
1175 FakeWriter {},
1176 FakeWriter {},
1177 )
1178 .unwrap();
1179
1180 {
1182 let copperlists = &mut runtime.copperlists_manager;
1183 let culist0 = copperlists
1184 .inner
1185 .create()
1186 .expect("Ran out of space for copper lists");
1187 let id = culist0.id;
1189 assert_eq!(id, 0);
1190 culist0.change_state(CopperListState::Processing);
1191 assert_eq!(copperlists.available_copper_lists(), 1);
1192 }
1193
1194 {
1195 let copperlists = &mut runtime.copperlists_manager;
1196 let culist1 = copperlists
1197 .inner
1198 .create()
1199 .expect("Ran out of space for copper lists"); let id = culist1.id;
1201 assert_eq!(id, 1);
1202 culist1.change_state(CopperListState::Processing);
1203 assert_eq!(copperlists.available_copper_lists(), 0);
1204 }
1205
1206 {
1207 let copperlists = &mut runtime.copperlists_manager;
1208 let culist2 = copperlists.inner.create();
1209 assert!(culist2.is_none());
1210 assert_eq!(copperlists.available_copper_lists(), 0);
1211 let _ = copperlists.end_of_processing(1);
1213 assert_eq!(copperlists.available_copper_lists(), 1);
1214 }
1215
1216 {
1218 let copperlists = &mut runtime.copperlists_manager;
1219 let culist2 = copperlists
1220 .inner
1221 .create()
1222 .expect("Ran out of space for copper lists"); let id = culist2.id;
1224 assert_eq!(id, 2);
1225 culist2.change_state(CopperListState::Processing);
1226 assert_eq!(copperlists.available_copper_lists(), 0);
1227 let _ = copperlists.end_of_processing(0);
1229 assert_eq!(copperlists.available_copper_lists(), 0);
1231
1232 let _ = copperlists.end_of_processing(2);
1234 assert_eq!(copperlists.available_copper_lists(), 2);
1237 }
1238 }
1239
1240 #[test]
1241 fn test_runtime_task_input_order() {
1242 let mut config = CuConfig::default();
1243 let graph = config.get_graph_mut(None).unwrap();
1244 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
1245 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
1246 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
1247
1248 assert_eq!(src1_id, 0);
1249 assert_eq!(src2_id, 1);
1250
1251 let src1_type = "src1_type";
1253 let src2_type = "src2_type";
1254 graph.connect(src2_id, sink_id, src2_type).unwrap();
1255 graph.connect(src1_id, sink_id, src1_type).unwrap();
1256
1257 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
1258 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
1259 assert_eq!(src1_edge_id, 1);
1262 assert_eq!(src2_edge_id, 0);
1263
1264 let runtime = compute_runtime_plan(graph).unwrap();
1265 let sink_step = runtime
1266 .steps
1267 .iter()
1268 .find_map(|step| match step {
1269 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1270 _ => None,
1271 })
1272 .unwrap();
1273
1274 assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
1277 assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
1278 }
1279
1280 #[test]
1281 fn test_runtime_output_ports_unique_ordered() {
1282 let mut config = CuConfig::default();
1283 let graph = config.get_graph_mut(None).unwrap();
1284 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1285 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1286 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1287 let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1288 let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1289
1290 graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1291 graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1292 graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1293 graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1294
1295 let runtime = compute_runtime_plan(graph).unwrap();
1296 let src_step = runtime
1297 .steps
1298 .iter()
1299 .find_map(|step| match step {
1300 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1301 _ => None,
1302 })
1303 .unwrap();
1304
1305 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1306 assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1307
1308 let dst_a_step = runtime
1309 .steps
1310 .iter()
1311 .find_map(|step| match step {
1312 CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1313 _ => None,
1314 })
1315 .unwrap();
1316 let dst_b_step = runtime
1317 .steps
1318 .iter()
1319 .find_map(|step| match step {
1320 CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1321 _ => None,
1322 })
1323 .unwrap();
1324 let dst_a2_step = runtime
1325 .steps
1326 .iter()
1327 .find_map(|step| match step {
1328 CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1329 _ => None,
1330 })
1331 .unwrap();
1332 let dst_c_step = runtime
1333 .steps
1334 .iter()
1335 .find_map(|step| match step {
1336 CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1337 _ => None,
1338 })
1339 .unwrap();
1340
1341 assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1342 assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1343 assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1344 assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1345 }
1346
1347 #[test]
1348 fn test_runtime_output_ports_fanout_single() {
1349 let mut config = CuConfig::default();
1350 let graph = config.get_graph_mut(None).unwrap();
1351 let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1352 let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1353 let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1354
1355 graph.connect(src_id, dst_a_id, "i32").unwrap();
1356 graph.connect(src_id, dst_b_id, "i32").unwrap();
1357
1358 let runtime = compute_runtime_plan(graph).unwrap();
1359 let src_step = runtime
1360 .steps
1361 .iter()
1362 .find_map(|step| match step {
1363 CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1364 _ => None,
1365 })
1366 .unwrap();
1367
1368 let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1369 assert_eq!(output_pack.msg_types, vec!["i32"]);
1370 }
1371
1372 #[test]
1373 fn test_runtime_plan_diamond_case1() {
1374 let mut config = CuConfig::default();
1376 let graph = config.get_graph_mut(None).unwrap();
1377 let cam0_id = graph
1378 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1379 .unwrap();
1380 let inf0_id = graph
1381 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1382 .unwrap();
1383 let broadcast_id = graph
1384 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1385 .unwrap();
1386
1387 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1389 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1390 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1391
1392 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1393 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1394
1395 assert_eq!(edge_cam0_to_inf0, 0);
1396 assert_eq!(edge_cam0_to_broadcast, 1);
1397
1398 let runtime = compute_runtime_plan(graph).unwrap();
1399 let broadcast_step = runtime
1400 .steps
1401 .iter()
1402 .find_map(|step| match step {
1403 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1404 _ => None,
1405 })
1406 .unwrap();
1407
1408 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1409 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1410 }
1411
1412 #[test]
1413 fn test_runtime_plan_diamond_case2() {
1414 let mut config = CuConfig::default();
1416 let graph = config.get_graph_mut(None).unwrap();
1417 let cam0_id = graph
1418 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1419 .unwrap();
1420 let inf0_id = graph
1421 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1422 .unwrap();
1423 let broadcast_id = graph
1424 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1425 .unwrap();
1426
1427 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1429 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1430 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1431
1432 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1433 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1434
1435 assert_eq!(edge_cam0_to_broadcast, 0);
1436 assert_eq!(edge_cam0_to_inf0, 1);
1437
1438 let runtime = compute_runtime_plan(graph).unwrap();
1439 let broadcast_step = runtime
1440 .steps
1441 .iter()
1442 .find_map(|step| match step {
1443 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1444 _ => None,
1445 })
1446 .unwrap();
1447
1448 assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
1449 assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
1450 }
1451}