1use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId};
7use crate::copperlist::{CopperList, CopperListState, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::CuMonitor;
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_log_runtime::LoggerRuntime;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15use cu29_unifiedlog::UnifiedLoggerWrite;
16use std::sync::{Arc, Mutex};
17
18use bincode::error::EncodeError;
19use bincode::{encode_into_std_write, Decode, Encode};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23use std::fmt::Debug;
24
25pub struct CopperContext {
27 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
28 pub logger_runtime: LoggerRuntime,
29 pub clock: RobotClock,
30}
31
32pub struct CopperListsManager<P: CopperListTuple, const NBCL: usize> {
34 pub inner: CuListsManager<P, NBCL>,
35 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
37}
38
39impl<P: CopperListTuple, const NBCL: usize> CopperListsManager<P, NBCL> {
40 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
41 let mut is_top = true;
42 let mut nb_done = 0;
43 for cl in self.inner.iter_mut() {
44 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
45 cl.change_state(CopperListState::DoneProcessing);
46 }
47 if is_top && cl.get_state() == CopperListState::DoneProcessing {
48 if let Some(logger) = &mut self.logger {
49 cl.change_state(CopperListState::BeingSerialized);
50 logger.log(cl)?;
51 }
52 cl.change_state(CopperListState::Free);
53 nb_done += 1;
54 } else {
55 is_top = false;
56 }
57 }
58 for _ in 0..nb_done {
59 let _ = self.inner.pop();
60 }
61 Ok(())
62 }
63
64 pub fn available_copper_lists(&self) -> usize {
65 NBCL - self.inner.len()
66 }
67}
68
69pub struct KeyFramesManager {
71 inner: KeyFrame,
73
74 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
76
77 keyframe_interval: u32,
79}
80
81impl KeyFramesManager {
82 fn is_keyframe(&self, culistid: u32) -> bool {
83 self.logger.is_some() && culistid % self.keyframe_interval == 0
84 }
85
86 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
87 if self.is_keyframe(culistid) {
88 self.inner.reset(culistid, clock.now());
89 }
90 }
91
92 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
93 if self.is_keyframe(culistid) {
94 if self.inner.culistid != culistid {
95 panic!("Freezing task for a different culistid");
96 }
97 self.inner
98 .add_frozen_task(task)
99 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
100 } else {
101 Ok(0)
102 }
103 }
104
105 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
106 if self.is_keyframe(culistid) {
107 let logger = self.logger.as_mut().unwrap();
108 logger.log(&self.inner)
109 } else {
110 Ok(())
111 }
112 }
113}
114
115pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
119 pub clock: RobotClock, pub tasks: CT,
124
125 pub monitor: M,
127
128 pub copperlists_manager: CopperListsManager<P, NBCL>,
130
131 pub keyframes_manager: KeyFramesManager,
133}
134
135impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
137 for CuRuntime<CT, P, M, NBCL>
138{
139 fn get_clock(&self) -> RobotClock {
140 self.clock.clone()
141 }
142}
143
144#[derive(Encode, Decode)]
148pub struct KeyFrame {
149 pub culistid: u32,
151 pub timestamp: CuTime,
153 pub serialized_tasks: Vec<u8>,
155}
156
157impl KeyFrame {
158 fn new() -> Self {
159 KeyFrame {
160 culistid: 0,
161 timestamp: CuTime::default(),
162 serialized_tasks: Vec::new(),
163 }
164 }
165
166 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
168 self.culistid = culistid;
169 self.timestamp = timestamp;
170 self.serialized_tasks.clear();
171 }
172
173 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
175 let config = bincode::config::standard();
176 encode_into_std_write(BincodeAdapter(task), &mut self.serialized_tasks, config)
177 }
178}
179
180impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
181 pub fn new(
182 clock: RobotClock,
183 config: &CuConfig,
184 mission: Option<&str>,
185 tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
186 monitor_instanciator: impl Fn(&CuConfig) -> M,
187 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
188 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
189 ) -> CuResult<Self> {
190 let graph = config.get_graph(mission)?;
191 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
192 .get_all_nodes()
193 .iter()
194 .map(|(_, node)| node.get_instance_config())
195 .collect();
196 let tasks = tasks_instanciator(all_instances_configs)?;
197
198 let monitor = monitor_instanciator(config);
199
200 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
201 Some(logging_config) if logging_config.enable_task_logging => (
202 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
203 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
204 logging_config.keyframe_interval.unwrap(), ),
206 Some(_) => (None, None, 0), None => (
208 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
210 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
211 DEFAULT_KEYFRAME_INTERVAL,
212 ),
213 };
214
215 let copperlists_manager = CopperListsManager {
216 inner: CuListsManager::new(),
217 logger: copperlists_logger,
218 };
219
220 let keyframes_manager = KeyFramesManager {
221 inner: KeyFrame::new(),
222 logger: keyframes_logger,
223 keyframe_interval,
224 };
225
226 let runtime = Self {
227 tasks,
228 monitor,
229 clock,
230 copperlists_manager,
231 keyframes_manager,
232 };
233
234 Ok(runtime)
235 }
236}
237
238#[derive(Debug, PartialEq, Eq, Clone, Copy)]
243pub enum CuTaskType {
244 Source,
245 Regular,
246 Sink,
247}
248
249pub struct CuExecutionStep {
251 pub node_id: NodeId,
253 pub node: Node,
255 pub task_type: CuTaskType,
257
258 pub input_msg_indices_types: Vec<(u32, String)>,
260
261 pub output_msg_index_type: Option<(u32, String)>,
263}
264
265impl Debug for CuExecutionStep {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
268 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
269 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
270 f.write_str(
271 format!(
272 " input_msg_types: {:?}\n",
273 self.input_msg_indices_types
274 )
275 .as_str(),
276 )?;
277 f.write_str(
278 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
279 )?;
280 Ok(())
281 }
282}
283
284pub struct CuExecutionLoop {
289 pub steps: Vec<CuExecutionUnit>,
290 pub loop_count: Option<u32>,
291}
292
293impl Debug for CuExecutionLoop {
294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 f.write_str("CuExecutionLoop:\n")?;
296 for step in &self.steps {
297 match step {
298 CuExecutionUnit::Step(step) => {
299 step.fmt(f)?;
300 }
301 CuExecutionUnit::Loop(l) => {
302 l.fmt(f)?;
303 }
304 }
305 }
306
307 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
308 Ok(())
309 }
310}
311
312#[derive(Debug)]
314pub enum CuExecutionUnit {
315 Step(CuExecutionStep),
316 Loop(CuExecutionLoop),
317}
318
319fn find_output_index_type_from_nodeid(
320 node_id: NodeId,
321 steps: &Vec<CuExecutionUnit>,
322) -> Option<(u32, String)> {
323 for step in steps {
324 match step {
325 CuExecutionUnit::Loop(loop_unit) => {
326 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
327 return Some(index);
328 }
329 }
330 CuExecutionUnit::Step(step) => {
331 if step.node_id == node_id {
332 return step.output_msg_index_type.clone();
333 }
334 }
335 }
336 }
337 None
338}
339
340pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
341 if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
342 CuTaskType::Source
343 } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
344 CuTaskType::Sink
345 } else {
346 CuTaskType::Regular
347 }
348}
349
350fn find_edge_with_plan_input_id(
353 plan: &[CuExecutionUnit],
354 graph: &CuGraph,
355 plan_id: u32,
356 output_node_id: NodeId,
357) -> usize {
358 let input_node = plan
359 .get(plan_id as usize)
360 .expect("Input step should've been added to plan before the step that receives the input");
361 let CuExecutionUnit::Step(input_step) = input_node else {
362 panic!("Expected input to be from a step, not a loop");
363 };
364 let input_node_id = input_step.node_id;
365
366 graph
367 .0
368 .edges_connecting(input_node_id.into(), output_node_id.into())
369 .map(|edge| edge.id().index())
370 .next()
371 .expect("An edge connecting the input to the output should exist")
372}
373
374fn sort_inputs_by_cnx_id(
377 input_msg_indices_types: &mut [(u32, String)],
378 plan: &[CuExecutionUnit],
379 graph: &CuGraph,
380 curr_node_id: NodeId,
381) {
382 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
383 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
384 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
385 a_edge_id.cmp(&b_edge_id)
386 });
387}
388fn plan_tasks_tree_branch(
390 graph: &CuGraph,
391 mut next_culist_output_index: u32,
392 starting_point: NodeId,
393 plan: &mut Vec<CuExecutionUnit>,
394) -> (u32, bool) {
395 #[cfg(feature = "macro_debug")]
396 eprintln!("-- starting branch from node {starting_point}");
397
398 let mut visitor = Bfs::new(&graph.0, starting_point.into());
399 let mut handled = false;
400
401 while let Some(node) = visitor.next(&graph.0) {
402 let id = node.index() as NodeId;
403 let node_ref = graph.get_node(id).unwrap();
404 #[cfg(feature = "macro_debug")]
405 eprintln!(" Visiting node: {node_ref:?}");
406
407 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
408 let output_msg_index_type: Option<(u32, String)>;
409 let task_type = find_task_type_for_id(graph, id);
410
411 match task_type {
412 CuTaskType::Source => {
413 #[cfg(feature = "macro_debug")]
414 eprintln!(" → Source node, assign output index {next_culist_output_index}");
415 output_msg_index_type = Some((
416 next_culist_output_index,
417 graph
418 .0
419 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
420 .unwrap() .msg
422 .clone(),
423 ));
424 next_culist_output_index += 1;
425 }
426 CuTaskType::Sink => {
427 let parents: Vec<NodeIndex> =
428 graph.0.neighbors_directed(id.into(), Incoming).collect();
429 #[cfg(feature = "macro_debug")]
430 eprintln!(" → Sink with parents: {parents:?}");
431 for parent in &parents {
432 let pid = parent.index() as NodeId;
433 let index_type = find_output_index_type_from_nodeid(pid, plan);
434 if let Some(index_type) = index_type {
435 #[cfg(feature = "macro_debug")]
436 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
437 input_msg_indices_types.push(index_type);
438 } else {
439 #[cfg(feature = "macro_debug")]
440 eprintln!(" ✗ Input from {pid} not ready, returning");
441 return (next_culist_output_index, handled);
442 }
443 }
444 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
445 next_culist_output_index += 1;
446 }
447 CuTaskType::Regular => {
448 let parents: Vec<NodeIndex> =
449 graph.0.neighbors_directed(id.into(), Incoming).collect();
450 #[cfg(feature = "macro_debug")]
451 eprintln!(" → Regular task with parents: {parents:?}");
452 for parent in &parents {
453 let pid = parent.index() as NodeId;
454 let index_type = find_output_index_type_from_nodeid(pid, plan);
455 if let Some(index_type) = index_type {
456 #[cfg(feature = "macro_debug")]
457 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
458 input_msg_indices_types.push(index_type);
459 } else {
460 #[cfg(feature = "macro_debug")]
461 eprintln!(" ✗ Input from {pid} not ready, returning");
462 return (next_culist_output_index, handled);
463 }
464 }
465 output_msg_index_type = Some((
466 next_culist_output_index,
467 graph
468 .0
469 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
471 .msg
472 .clone(),
473 ));
474 next_culist_output_index += 1;
475 }
476 }
477
478 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
479
480 if let Some(pos) = plan
481 .iter()
482 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
483 {
484 #[cfg(feature = "macro_debug")]
485 eprintln!(" → Already in plan, modifying existing step");
486 let mut step = plan.remove(pos);
487 if let CuExecutionUnit::Step(ref mut s) = step {
488 s.input_msg_indices_types = input_msg_indices_types;
489 }
490 plan.push(step);
491 } else {
492 #[cfg(feature = "macro_debug")]
493 eprintln!(" → New step added to plan");
494 let step = CuExecutionStep {
495 node_id: id,
496 node: node_ref.clone(),
497 task_type,
498 input_msg_indices_types,
499 output_msg_index_type,
500 };
501 plan.push(CuExecutionUnit::Step(step));
502 }
503
504 handled = true;
505 }
506
507 #[cfg(feature = "macro_debug")]
508 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
509 (next_culist_output_index, handled)
510}
511
512pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
515 #[cfg(feature = "macro_debug")]
516 eprintln!("[runtime plan]");
517 let visited = graph.0.visit_map();
518 let mut plan = Vec::new();
519 let mut next_culist_output_index = 0u32;
520
521 let mut queue: std::collections::VecDeque<NodeId> = graph
522 .node_indices()
523 .iter()
524 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
525 .map(|node| node.index() as NodeId)
526 .collect();
527
528 #[cfg(feature = "macro_debug")]
529 eprintln!("Initial source nodes: {queue:?}");
530
531 while let Some(start_node) = queue.pop_front() {
532 if visited.is_visited(&start_node) {
533 #[cfg(feature = "macro_debug")]
534 eprintln!("→ Skipping already visited source {start_node}");
535 continue;
536 }
537
538 #[cfg(feature = "macro_debug")]
539 eprintln!("→ Starting BFS from source {start_node}");
540 let mut bfs = Bfs::new(&graph.0, start_node.into());
541
542 while let Some(node_index) = bfs.next(&graph.0) {
543 let node_id = node_index.index() as NodeId;
544 let already_in_plan = plan
545 .iter()
546 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
547 if already_in_plan {
548 #[cfg(feature = "macro_debug")]
549 eprintln!(" → Node {node_id} already planned, skipping");
550 continue;
551 }
552
553 #[cfg(feature = "macro_debug")]
554 eprintln!(" Planning from node {node_id}");
555 let (new_index, handled) =
556 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
557 next_culist_output_index = new_index;
558
559 if !handled {
560 #[cfg(feature = "macro_debug")]
561 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
562 continue;
563 }
564
565 #[cfg(feature = "macro_debug")]
566 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
567 for neighbor in graph.0.neighbors(node_index) {
568 if !visited.is_visited(&neighbor) {
569 let nid = neighbor.index() as NodeId;
570 #[cfg(feature = "macro_debug")]
571 eprintln!(" → Enqueueing neighbor {nid}");
572 queue.push_back(nid);
573 }
574 }
575 }
576 }
577
578 Ok(CuExecutionLoop {
579 steps: plan,
580 loop_count: None,
581 })
582}
583
584#[cfg(test)]
586mod tests {
587 use super::*;
588 use crate::config::Node;
589 use crate::cutask::CuSinkTask;
590 use crate::cutask::{CuSrcTask, Freezable};
591 use crate::monitoring::NoMonitor;
592 use bincode::Encode;
593 use cu29_traits::{ErasedCuMsg, ErasedCuMsgs, MatchingTasks};
594 use serde_derive::Serialize;
595
596 pub struct TestSource {}
597
598 impl Freezable for TestSource {}
599
600 impl CuSrcTask<'_> for TestSource {
601 type Output = ();
602 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
603 where
604 Self: Sized,
605 {
606 Ok(Self {})
607 }
608
609 fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
610 Ok(())
611 }
612 }
613
614 pub struct TestSink {}
615
616 impl Freezable for TestSink {}
617
618 impl CuSinkTask<'_> for TestSink {
619 type Input = ();
620
621 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
622 where
623 Self: Sized,
624 {
625 Ok(Self {})
626 }
627
628 fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
629 Ok(())
630 }
631 }
632
633 type Tasks = (TestSource, TestSink);
635
636 #[derive(Debug, Encode, Decode, Serialize)]
637 struct Msgs(());
638
639 impl ErasedCuMsgs for Msgs {
640 fn cumsgs(&self) -> Vec<&dyn ErasedCuMsg> {
641 Vec::new()
642 }
643 }
644
645 impl MatchingTasks for Msgs {
646 fn get_all_task_ids() -> &'static [&'static str] {
647 &[]
648 }
649 }
650
651 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
652 Ok((
653 TestSource::new(all_instances_configs[0])?,
654 TestSink::new(all_instances_configs[1])?,
655 ))
656 }
657
658 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
659 NoMonitor {}
660 }
661
662 #[derive(Debug)]
663 struct FakeWriter {}
664
665 impl<E: Encode> WriteStream<E> for FakeWriter {
666 fn log(&mut self, _obj: &E) -> CuResult<()> {
667 Ok(())
668 }
669 }
670
671 #[test]
672 fn test_runtime_instantiation() {
673 let mut config = CuConfig::default();
674 let graph = config.get_graph_mut(None).unwrap();
675 graph.add_node(Node::new("a", "TestSource")).unwrap();
676 graph.add_node(Node::new("b", "TestSink")).unwrap();
677 graph.connect(0, 1, "()").unwrap();
678 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
679 RobotClock::default(),
680 &config,
681 None,
682 tasks_instanciator,
683 monitor_instanciator,
684 FakeWriter {},
685 FakeWriter {},
686 );
687 assert!(runtime.is_ok());
688 }
689
690 #[test]
691 fn test_copperlists_manager_lifecycle() {
692 let mut config = CuConfig::default();
693 let graph = config.get_graph_mut(None).unwrap();
694 graph.add_node(Node::new("a", "TestSource")).unwrap();
695 graph.add_node(Node::new("b", "TestSink")).unwrap();
696 graph.connect(0, 1, "()").unwrap();
697 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
698 RobotClock::default(),
699 &config,
700 None,
701 tasks_instanciator,
702 monitor_instanciator,
703 FakeWriter {},
704 FakeWriter {},
705 )
706 .unwrap();
707
708 {
710 let copperlists = &mut runtime.copperlists_manager;
711 let culist0 = copperlists
712 .inner
713 .create()
714 .expect("Ran out of space for copper lists");
715 let id = culist0.id;
717 assert_eq!(id, 0);
718 culist0.change_state(CopperListState::Processing);
719 assert_eq!(copperlists.available_copper_lists(), 1);
720 }
721
722 {
723 let copperlists = &mut runtime.copperlists_manager;
724 let culist1 = copperlists
725 .inner
726 .create()
727 .expect("Ran out of space for copper lists"); let id = culist1.id;
729 assert_eq!(id, 1);
730 culist1.change_state(CopperListState::Processing);
731 assert_eq!(copperlists.available_copper_lists(), 0);
732 }
733
734 {
735 let copperlists = &mut runtime.copperlists_manager;
736 let culist2 = copperlists.inner.create();
737 assert!(culist2.is_none());
738 assert_eq!(copperlists.available_copper_lists(), 0);
739 let _ = copperlists.end_of_processing(1);
741 assert_eq!(copperlists.available_copper_lists(), 1);
742 }
743
744 {
746 let copperlists = &mut runtime.copperlists_manager;
747 let culist2 = copperlists
748 .inner
749 .create()
750 .expect("Ran out of space for copper lists"); let id = culist2.id;
752 assert_eq!(id, 2);
753 culist2.change_state(CopperListState::Processing);
754 assert_eq!(copperlists.available_copper_lists(), 0);
755 let _ = copperlists.end_of_processing(0);
757 assert_eq!(copperlists.available_copper_lists(), 0);
759
760 let _ = copperlists.end_of_processing(2);
762 assert_eq!(copperlists.available_copper_lists(), 2);
765 }
766 }
767
768 #[test]
769 fn test_runtime_task_input_order() {
770 let mut config = CuConfig::default();
771 let graph = config.get_graph_mut(None).unwrap();
772 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
773 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
774 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
775
776 assert_eq!(src1_id, 0);
777 assert_eq!(src2_id, 1);
778
779 let src1_type = "src1_type";
781 let src2_type = "src2_type";
782 graph.connect(src2_id, sink_id, src2_type).unwrap();
783 graph.connect(src1_id, sink_id, src1_type).unwrap();
784
785 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
786 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
787 assert_eq!(src1_edge_id, 1);
790 assert_eq!(src2_edge_id, 0);
791
792 let runtime = compute_runtime_plan(graph).unwrap();
793 let sink_step = runtime
794 .steps
795 .iter()
796 .find_map(|step| match step {
797 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
798 _ => None,
799 })
800 .unwrap();
801
802 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
805 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
806 }
807
808 #[test]
809 fn test_runtime_plan_diamond_case1() {
810 let mut config = CuConfig::default();
812 let graph = config.get_graph_mut(None).unwrap();
813 let cam0_id = graph
814 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
815 .unwrap();
816 let inf0_id = graph
817 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
818 .unwrap();
819 let broadcast_id = graph
820 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
821 .unwrap();
822
823 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
825 graph.connect(cam0_id, inf0_id, "i32").unwrap();
826 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
827
828 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
829 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
830
831 assert_eq!(edge_cam0_to_inf0, 0);
832 assert_eq!(edge_cam0_to_broadcast, 1);
833
834 let runtime = compute_runtime_plan(graph).unwrap();
835 let broadcast_step = runtime
836 .steps
837 .iter()
838 .find_map(|step| match step {
839 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
840 _ => None,
841 })
842 .unwrap();
843
844 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
845 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
846 }
847
848 #[test]
849 fn test_runtime_plan_diamond_case2() {
850 let mut config = CuConfig::default();
852 let graph = config.get_graph_mut(None).unwrap();
853 let cam0_id = graph
854 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
855 .unwrap();
856 let inf0_id = graph
857 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
858 .unwrap();
859 let broadcast_id = graph
860 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
861 .unwrap();
862
863 graph.connect(cam0_id, inf0_id, "i32").unwrap();
865 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
866 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
867
868 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
869 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
870
871 assert_eq!(edge_cam0_to_broadcast, 0);
872 assert_eq!(edge_cam0_to_inf0, 1);
873
874 let runtime = compute_runtime_plan(graph).unwrap();
875 let broadcast_step = runtime
876 .steps
877 .iter()
878 .find_map(|step| match step {
879 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
880 _ => None,
881 })
882 .unwrap();
883
884 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
885 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
886 }
887}