1use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::CuMonitor;
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_log_runtime::LoggerRuntime;
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15use cu29_unifiedlog::UnifiedLoggerWrite;
16use std::sync::{Arc, Mutex};
17
18use bincode::error::EncodeError;
19use bincode::{encode_into_std_write, Decode, Encode};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23use rayon::ThreadPool;
24use std::fmt::Debug;
25
26pub struct CopperContext {
28 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
29 pub logger_runtime: LoggerRuntime,
30 pub clock: RobotClock,
31}
32
33pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
35 pub inner: CuListsManager<P, NBCL>,
36 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
38}
39
40impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
41 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
42 let mut is_top = true;
43 let mut nb_done = 0;
44 for cl in self.inner.iter_mut() {
45 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
46 cl.change_state(CopperListState::DoneProcessing);
47 }
48 if is_top && cl.get_state() == CopperListState::DoneProcessing {
49 if let Some(logger) = &mut self.logger {
50 cl.change_state(CopperListState::BeingSerialized);
51 logger.log(cl)?;
52 }
53 cl.change_state(CopperListState::Free);
54 nb_done += 1;
55 } else {
56 is_top = false;
57 }
58 }
59 for _ in 0..nb_done {
60 let _ = self.inner.pop();
61 }
62 Ok(())
63 }
64
65 pub fn available_copper_lists(&self) -> usize {
66 NBCL - self.inner.len()
67 }
68}
69
70pub struct KeyFramesManager {
72 inner: KeyFrame,
74
75 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
77
78 keyframe_interval: u32,
80}
81
82impl KeyFramesManager {
83 fn is_keyframe(&self, culistid: u32) -> bool {
84 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
85 }
86
87 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
88 if self.is_keyframe(culistid) {
89 self.inner.reset(culistid, clock.now());
90 }
91 }
92
93 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
94 if self.is_keyframe(culistid) {
95 if self.inner.culistid != culistid {
96 panic!("Freezing task for a different culistid");
97 }
98 self.inner
99 .add_frozen_task(task)
100 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
101 } else {
102 Ok(0)
103 }
104 }
105
106 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
107 if self.is_keyframe(culistid) {
108 let logger = self.logger.as_mut().unwrap();
109 logger.log(&self.inner)
110 } else {
111 Ok(())
112 }
113 }
114}
115
116pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
120 pub clock: RobotClock, pub tasks: CT,
125
126 pub threadpool: Arc<ThreadPool>,
128
129 pub monitor: M,
131
132 pub copperlists_manager: CopperListsManager<P, NBCL>,
134
135 pub keyframes_manager: KeyFramesManager,
137
138 pub runtime_config: RuntimeConfig,
140}
141
142impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
144 ClockProvider for CuRuntime<CT, P, M, NBCL>
145{
146 fn get_clock(&self) -> RobotClock {
147 self.clock.clone()
148 }
149}
150
151#[derive(Encode, Decode)]
155pub struct KeyFrame {
156 pub culistid: u32,
158 pub timestamp: CuTime,
160 pub serialized_tasks: Vec<u8>,
162}
163
164impl KeyFrame {
165 fn new() -> Self {
166 KeyFrame {
167 culistid: 0,
168 timestamp: CuTime::default(),
169 serialized_tasks: Vec::new(),
170 }
171 }
172
173 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
175 self.culistid = culistid;
176 self.timestamp = timestamp;
177 self.serialized_tasks.clear();
178 }
179
180 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
182 let config = bincode::config::standard();
183 encode_into_std_write(BincodeAdapter(task), &mut self.serialized_tasks, config)
184 }
185}
186
187impl<
188 CT,
189 P: CopperListTuple + CuListZeroedInit + Default + 'static,
190 M: CuMonitor,
191 const NBCL: usize,
192 > CuRuntime<CT, P, M, NBCL>
193{
194 pub fn new(
195 clock: RobotClock,
196 config: &CuConfig,
197 mission: Option<&str>,
198 tasks_instanciator: impl for<'c> Fn(
199 Vec<Option<&'c ComponentConfig>>,
200 Arc<ThreadPool>,
201 ) -> CuResult<CT>,
202 monitor_instanciator: impl Fn(&CuConfig) -> M,
203 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
204 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
205 ) -> CuResult<Self> {
206 let graph = config.get_graph(mission)?;
207 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
208 .get_all_nodes()
209 .iter()
210 .map(|(_, node)| node.get_instance_config())
211 .collect();
212
213 let threadpool = Arc::new(
215 rayon::ThreadPoolBuilder::new()
216 .num_threads(2) .build()
218 .expect("Could not create the threadpool"),
219 );
220
221 let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
222
223 let monitor = monitor_instanciator(config);
224
225 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
226 Some(logging_config) if logging_config.enable_task_logging => (
227 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
228 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
229 logging_config.keyframe_interval.unwrap(), ),
231 Some(_) => (None, None, 0), None => (
233 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
235 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
236 DEFAULT_KEYFRAME_INTERVAL,
237 ),
238 };
239
240 let copperlists_manager = CopperListsManager {
241 inner: CuListsManager::new(),
242 logger: copperlists_logger,
243 };
244
245 let keyframes_manager = KeyFramesManager {
246 inner: KeyFrame::new(),
247 logger: keyframes_logger,
248 keyframe_interval,
249 };
250
251 let runtime_config = config.runtime.clone().unwrap_or_default();
252
253 let runtime = Self {
254 tasks,
255 threadpool,
256 monitor,
257 clock,
258 copperlists_manager,
259 keyframes_manager,
260 runtime_config,
261 };
262
263 Ok(runtime)
264 }
265}
266
267#[derive(Debug, PartialEq, Eq, Clone, Copy)]
272pub enum CuTaskType {
273 Source,
274 Regular,
275 Sink,
276}
277
278pub struct CuExecutionStep {
280 pub node_id: NodeId,
282 pub node: Node,
284 pub task_type: CuTaskType,
286
287 pub input_msg_indices_types: Vec<(u32, String)>,
289
290 pub output_msg_index_type: Option<(u32, String)>,
292}
293
294impl Debug for CuExecutionStep {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
297 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
298 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
299 f.write_str(
300 format!(
301 " input_msg_types: {:?}\n",
302 self.input_msg_indices_types
303 )
304 .as_str(),
305 )?;
306 f.write_str(
307 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
308 )?;
309 Ok(())
310 }
311}
312
313pub struct CuExecutionLoop {
318 pub steps: Vec<CuExecutionUnit>,
319 pub loop_count: Option<u32>,
320}
321
322impl Debug for CuExecutionLoop {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 f.write_str("CuExecutionLoop:\n")?;
325 for step in &self.steps {
326 match step {
327 CuExecutionUnit::Step(step) => {
328 step.fmt(f)?;
329 }
330 CuExecutionUnit::Loop(l) => {
331 l.fmt(f)?;
332 }
333 }
334 }
335
336 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
337 Ok(())
338 }
339}
340
341#[derive(Debug)]
343pub enum CuExecutionUnit {
344 Step(CuExecutionStep),
345 Loop(CuExecutionLoop),
346}
347
348fn find_output_index_type_from_nodeid(
349 node_id: NodeId,
350 steps: &Vec<CuExecutionUnit>,
351) -> Option<(u32, String)> {
352 for step in steps {
353 match step {
354 CuExecutionUnit::Loop(loop_unit) => {
355 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
356 return Some(index);
357 }
358 }
359 CuExecutionUnit::Step(step) => {
360 if step.node_id == node_id {
361 return step.output_msg_index_type.clone();
362 }
363 }
364 }
365 }
366 None
367}
368
369pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
370 if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
371 CuTaskType::Source
372 } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
373 CuTaskType::Sink
374 } else {
375 CuTaskType::Regular
376 }
377}
378
379fn find_edge_with_plan_input_id(
382 plan: &[CuExecutionUnit],
383 graph: &CuGraph,
384 plan_id: u32,
385 output_node_id: NodeId,
386) -> usize {
387 let input_node = plan
388 .get(plan_id as usize)
389 .expect("Input step should've been added to plan before the step that receives the input");
390 let CuExecutionUnit::Step(input_step) = input_node else {
391 panic!("Expected input to be from a step, not a loop");
392 };
393 let input_node_id = input_step.node_id;
394
395 graph
396 .0
397 .edges_connecting(input_node_id.into(), output_node_id.into())
398 .map(|edge| edge.id().index())
399 .next()
400 .expect("An edge connecting the input to the output should exist")
401}
402
403fn sort_inputs_by_cnx_id(
406 input_msg_indices_types: &mut [(u32, String)],
407 plan: &[CuExecutionUnit],
408 graph: &CuGraph,
409 curr_node_id: NodeId,
410) {
411 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
412 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
413 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
414 a_edge_id.cmp(&b_edge_id)
415 });
416}
417fn plan_tasks_tree_branch(
419 graph: &CuGraph,
420 mut next_culist_output_index: u32,
421 starting_point: NodeId,
422 plan: &mut Vec<CuExecutionUnit>,
423) -> (u32, bool) {
424 #[cfg(feature = "macro_debug")]
425 eprintln!("-- starting branch from node {starting_point}");
426
427 let mut visitor = Bfs::new(&graph.0, starting_point.into());
428 let mut handled = false;
429
430 while let Some(node) = visitor.next(&graph.0) {
431 let id = node.index() as NodeId;
432 let node_ref = graph.get_node(id).unwrap();
433 #[cfg(feature = "macro_debug")]
434 eprintln!(" Visiting node: {node_ref:?}");
435
436 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
437 let output_msg_index_type: Option<(u32, String)>;
438 let task_type = find_task_type_for_id(graph, id);
439
440 match task_type {
441 CuTaskType::Source => {
442 #[cfg(feature = "macro_debug")]
443 eprintln!(" → Source node, assign output index {next_culist_output_index}");
444 output_msg_index_type = Some((
445 next_culist_output_index,
446 graph
447 .0
448 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
449 .unwrap() .msg
451 .clone(),
452 ));
453 next_culist_output_index += 1;
454 }
455 CuTaskType::Sink => {
456 let parents: Vec<NodeIndex> =
457 graph.0.neighbors_directed(id.into(), Incoming).collect();
458 #[cfg(feature = "macro_debug")]
459 eprintln!(" → Sink with parents: {parents:?}");
460 for parent in &parents {
461 let pid = parent.index() as NodeId;
462 let index_type = find_output_index_type_from_nodeid(pid, plan);
463 if let Some(index_type) = index_type {
464 #[cfg(feature = "macro_debug")]
465 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
466 input_msg_indices_types.push(index_type);
467 } else {
468 #[cfg(feature = "macro_debug")]
469 eprintln!(" ✗ Input from {pid} not ready, returning");
470 return (next_culist_output_index, handled);
471 }
472 }
473 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
474 next_culist_output_index += 1;
475 }
476 CuTaskType::Regular => {
477 let parents: Vec<NodeIndex> =
478 graph.0.neighbors_directed(id.into(), Incoming).collect();
479 #[cfg(feature = "macro_debug")]
480 eprintln!(" → Regular task with parents: {parents:?}");
481 for parent in &parents {
482 let pid = parent.index() as NodeId;
483 let index_type = find_output_index_type_from_nodeid(pid, plan);
484 if let Some(index_type) = index_type {
485 #[cfg(feature = "macro_debug")]
486 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
487 input_msg_indices_types.push(index_type);
488 } else {
489 #[cfg(feature = "macro_debug")]
490 eprintln!(" ✗ Input from {pid} not ready, returning");
491 return (next_culist_output_index, handled);
492 }
493 }
494 output_msg_index_type = Some((
495 next_culist_output_index,
496 graph
497 .0
498 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
500 .msg
501 .clone(),
502 ));
503 next_culist_output_index += 1;
504 }
505 }
506
507 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
508
509 if let Some(pos) = plan
510 .iter()
511 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
512 {
513 #[cfg(feature = "macro_debug")]
514 eprintln!(" → Already in plan, modifying existing step");
515 let mut step = plan.remove(pos);
516 if let CuExecutionUnit::Step(ref mut s) = step {
517 s.input_msg_indices_types = input_msg_indices_types;
518 }
519 plan.push(step);
520 } else {
521 #[cfg(feature = "macro_debug")]
522 eprintln!(" → New step added to plan");
523 let step = CuExecutionStep {
524 node_id: id,
525 node: node_ref.clone(),
526 task_type,
527 input_msg_indices_types,
528 output_msg_index_type,
529 };
530 plan.push(CuExecutionUnit::Step(step));
531 }
532
533 handled = true;
534 }
535
536 #[cfg(feature = "macro_debug")]
537 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
538 (next_culist_output_index, handled)
539}
540
541pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
544 #[cfg(feature = "macro_debug")]
545 eprintln!("[runtime plan]");
546 let visited = graph.0.visit_map();
547 let mut plan = Vec::new();
548 let mut next_culist_output_index = 0u32;
549
550 let mut queue: std::collections::VecDeque<NodeId> = graph
551 .node_indices()
552 .iter()
553 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
554 .map(|node| node.index() as NodeId)
555 .collect();
556
557 #[cfg(feature = "macro_debug")]
558 eprintln!("Initial source nodes: {queue:?}");
559
560 while let Some(start_node) = queue.pop_front() {
561 if visited.is_visited(&start_node) {
562 #[cfg(feature = "macro_debug")]
563 eprintln!("→ Skipping already visited source {start_node}");
564 continue;
565 }
566
567 #[cfg(feature = "macro_debug")]
568 eprintln!("→ Starting BFS from source {start_node}");
569 let mut bfs = Bfs::new(&graph.0, start_node.into());
570
571 while let Some(node_index) = bfs.next(&graph.0) {
572 let node_id = node_index.index() as NodeId;
573 let already_in_plan = plan
574 .iter()
575 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
576 if already_in_plan {
577 #[cfg(feature = "macro_debug")]
578 eprintln!(" → Node {node_id} already planned, skipping");
579 continue;
580 }
581
582 #[cfg(feature = "macro_debug")]
583 eprintln!(" Planning from node {node_id}");
584 let (new_index, handled) =
585 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
586 next_culist_output_index = new_index;
587
588 if !handled {
589 #[cfg(feature = "macro_debug")]
590 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
591 continue;
592 }
593
594 #[cfg(feature = "macro_debug")]
595 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
596 for neighbor in graph.0.neighbors(node_index) {
597 if !visited.is_visited(&neighbor) {
598 let nid = neighbor.index() as NodeId;
599 #[cfg(feature = "macro_debug")]
600 eprintln!(" → Enqueueing neighbor {nid}");
601 queue.push_back(nid);
602 }
603 }
604 }
605 }
606
607 Ok(CuExecutionLoop {
608 steps: plan,
609 loop_count: None,
610 })
611}
612
613#[cfg(test)]
615mod tests {
616 use super::*;
617 use crate::config::Node;
618 use crate::cutask::CuSinkTask;
619 use crate::cutask::{CuSrcTask, Freezable};
620 use crate::monitoring::NoMonitor;
621 use bincode::Encode;
622 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
623 use serde_derive::Serialize;
624
625 pub struct TestSource {}
626
627 impl Freezable for TestSource {}
628
629 impl CuSrcTask for TestSource {
630 type Output<'m> = ();
631 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
632 where
633 Self: Sized,
634 {
635 Ok(Self {})
636 }
637
638 fn process(
639 &mut self,
640 _clock: &RobotClock,
641 _empty_msg: &mut Self::Output<'_>,
642 ) -> CuResult<()> {
643 Ok(())
644 }
645 }
646
647 pub struct TestSink {}
648
649 impl Freezable for TestSink {}
650
651 impl CuSinkTask for TestSink {
652 type Input<'m> = ();
653
654 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
655 where
656 Self: Sized,
657 {
658 Ok(Self {})
659 }
660
661 fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
662 Ok(())
663 }
664 }
665
666 type Tasks = (TestSource, TestSink);
668
669 #[derive(Debug, Encode, Decode, Serialize, Default)]
670 struct Msgs(());
671
672 impl ErasedCuStampedDataSet for Msgs {
673 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
674 Vec::new()
675 }
676 }
677
678 impl MatchingTasks for Msgs {
679 fn get_all_task_ids() -> &'static [&'static str] {
680 &[]
681 }
682 }
683
684 impl CuListZeroedInit for Msgs {
685 fn init_zeroed(&mut self) {}
686 }
687
688 fn tasks_instanciator(
689 all_instances_configs: Vec<Option<&ComponentConfig>>,
690 _threadpool: Arc<ThreadPool>,
691 ) -> CuResult<Tasks> {
692 Ok((
693 TestSource::new(all_instances_configs[0])?,
694 TestSink::new(all_instances_configs[1])?,
695 ))
696 }
697
698 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
699 NoMonitor {}
700 }
701
702 #[derive(Debug)]
703 struct FakeWriter {}
704
705 impl<E: Encode> WriteStream<E> for FakeWriter {
706 fn log(&mut self, _obj: &E) -> CuResult<()> {
707 Ok(())
708 }
709 }
710
711 #[test]
712 fn test_runtime_instantiation() {
713 let mut config = CuConfig::default();
714 let graph = config.get_graph_mut(None).unwrap();
715 graph.add_node(Node::new("a", "TestSource")).unwrap();
716 graph.add_node(Node::new("b", "TestSink")).unwrap();
717 graph.connect(0, 1, "()").unwrap();
718 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
719 RobotClock::default(),
720 &config,
721 None,
722 tasks_instanciator,
723 monitor_instanciator,
724 FakeWriter {},
725 FakeWriter {},
726 );
727 assert!(runtime.is_ok());
728 }
729
730 #[test]
731 fn test_copperlists_manager_lifecycle() {
732 let mut config = CuConfig::default();
733 let graph = config.get_graph_mut(None).unwrap();
734 graph.add_node(Node::new("a", "TestSource")).unwrap();
735 graph.add_node(Node::new("b", "TestSink")).unwrap();
736 graph.connect(0, 1, "()").unwrap();
737
738 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
739 RobotClock::default(),
740 &config,
741 None,
742 tasks_instanciator,
743 monitor_instanciator,
744 FakeWriter {},
745 FakeWriter {},
746 )
747 .unwrap();
748
749 {
751 let copperlists = &mut runtime.copperlists_manager;
752 let culist0 = copperlists
753 .inner
754 .create()
755 .expect("Ran out of space for copper lists");
756 let id = culist0.id;
758 assert_eq!(id, 0);
759 culist0.change_state(CopperListState::Processing);
760 assert_eq!(copperlists.available_copper_lists(), 1);
761 }
762
763 {
764 let copperlists = &mut runtime.copperlists_manager;
765 let culist1 = copperlists
766 .inner
767 .create()
768 .expect("Ran out of space for copper lists"); let id = culist1.id;
770 assert_eq!(id, 1);
771 culist1.change_state(CopperListState::Processing);
772 assert_eq!(copperlists.available_copper_lists(), 0);
773 }
774
775 {
776 let copperlists = &mut runtime.copperlists_manager;
777 let culist2 = copperlists.inner.create();
778 assert!(culist2.is_none());
779 assert_eq!(copperlists.available_copper_lists(), 0);
780 let _ = copperlists.end_of_processing(1);
782 assert_eq!(copperlists.available_copper_lists(), 1);
783 }
784
785 {
787 let copperlists = &mut runtime.copperlists_manager;
788 let culist2 = copperlists
789 .inner
790 .create()
791 .expect("Ran out of space for copper lists"); let id = culist2.id;
793 assert_eq!(id, 2);
794 culist2.change_state(CopperListState::Processing);
795 assert_eq!(copperlists.available_copper_lists(), 0);
796 let _ = copperlists.end_of_processing(0);
798 assert_eq!(copperlists.available_copper_lists(), 0);
800
801 let _ = copperlists.end_of_processing(2);
803 assert_eq!(copperlists.available_copper_lists(), 2);
806 }
807 }
808
809 #[test]
810 fn test_runtime_task_input_order() {
811 let mut config = CuConfig::default();
812 let graph = config.get_graph_mut(None).unwrap();
813 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
814 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
815 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
816
817 assert_eq!(src1_id, 0);
818 assert_eq!(src2_id, 1);
819
820 let src1_type = "src1_type";
822 let src2_type = "src2_type";
823 graph.connect(src2_id, sink_id, src2_type).unwrap();
824 graph.connect(src1_id, sink_id, src1_type).unwrap();
825
826 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
827 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
828 assert_eq!(src1_edge_id, 1);
831 assert_eq!(src2_edge_id, 0);
832
833 let runtime = compute_runtime_plan(graph).unwrap();
834 let sink_step = runtime
835 .steps
836 .iter()
837 .find_map(|step| match step {
838 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
839 _ => None,
840 })
841 .unwrap();
842
843 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
846 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
847 }
848
849 #[test]
850 fn test_runtime_plan_diamond_case1() {
851 let mut config = CuConfig::default();
853 let graph = config.get_graph_mut(None).unwrap();
854 let cam0_id = graph
855 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
856 .unwrap();
857 let inf0_id = graph
858 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
859 .unwrap();
860 let broadcast_id = graph
861 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
862 .unwrap();
863
864 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
866 graph.connect(cam0_id, inf0_id, "i32").unwrap();
867 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
868
869 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
870 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
871
872 assert_eq!(edge_cam0_to_inf0, 0);
873 assert_eq!(edge_cam0_to_broadcast, 1);
874
875 let runtime = compute_runtime_plan(graph).unwrap();
876 let broadcast_step = runtime
877 .steps
878 .iter()
879 .find_map(|step| match step {
880 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
881 _ => None,
882 })
883 .unwrap();
884
885 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
886 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
887 }
888
889 #[test]
890 fn test_runtime_plan_diamond_case2() {
891 let mut config = CuConfig::default();
893 let graph = config.get_graph_mut(None).unwrap();
894 let cam0_id = graph
895 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
896 .unwrap();
897 let inf0_id = graph
898 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
899 .unwrap();
900 let broadcast_id = graph
901 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
902 .unwrap();
903
904 graph.connect(cam0_id, inf0_id, "i32").unwrap();
906 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
907 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
908
909 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
910 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
911
912 assert_eq!(edge_cam0_to_broadcast, 0);
913 assert_eq!(edge_cam0_to_inf0, 1);
914
915 let runtime = compute_runtime_plan(graph).unwrap();
916 let broadcast_step = runtime
917 .steps
918 .iter()
919 .find_map(|step| match step {
920 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
921 _ => None,
922 })
923 .unwrap();
924
925 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
926 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
927 }
928}