1use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::CuMonitor;
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_traits::CuResult;
12use cu29_traits::WriteStream;
13use cu29_traits::{CopperListTuple, CuError};
14
15use bincode::enc::write::{SizeWriter, SliceWriter};
16use bincode::enc::EncoderImpl;
17use bincode::error::EncodeError;
18use bincode::{Decode, Encode};
19use core::fmt::{Debug, Formatter};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23
24#[cfg(not(feature = "std"))]
25mod imp {
26 pub use alloc::boxed::Box;
27 pub use alloc::collections::VecDeque;
28 pub use alloc::format;
29 pub use alloc::string::String;
30 pub use alloc::string::ToString;
31 pub use alloc::vec::Vec;
32 pub use core::fmt::Result as FmtResult;
33}
34
35#[cfg(feature = "std")]
36mod imp {
37 pub use cu29_log_runtime::LoggerRuntime;
38 pub use cu29_unifiedlog::UnifiedLoggerWrite;
39 pub use rayon::ThreadPool;
40 pub use std::collections::VecDeque;
41 pub use std::fmt::Result as FmtResult;
42 pub use std::sync::{Arc, Mutex};
43}
44
45use imp::*;
46
47#[cfg(feature = "std")]
49pub struct CopperContext {
50 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
51 pub logger_runtime: LoggerRuntime,
52 pub clock: RobotClock,
53}
54
55pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
57 pub inner: CuListsManager<P, NBCL>,
58 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
60}
61
62impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
63 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
64 let mut is_top = true;
65 let mut nb_done = 0;
66 for cl in self.inner.iter_mut() {
67 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
68 cl.change_state(CopperListState::DoneProcessing);
69 }
70 if is_top && cl.get_state() == CopperListState::DoneProcessing {
71 if let Some(logger) = &mut self.logger {
72 cl.change_state(CopperListState::BeingSerialized);
73 logger.log(cl)?;
74 }
75 cl.change_state(CopperListState::Free);
76 nb_done += 1;
77 } else {
78 is_top = false;
79 }
80 }
81 for _ in 0..nb_done {
82 let _ = self.inner.pop();
83 }
84 Ok(())
85 }
86
87 pub fn available_copper_lists(&self) -> usize {
88 NBCL - self.inner.len()
89 }
90}
91
92pub struct KeyFramesManager {
94 inner: KeyFrame,
96
97 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
99
100 keyframe_interval: u32,
102}
103
104impl KeyFramesManager {
105 fn is_keyframe(&self, culistid: u32) -> bool {
106 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
107 }
108
109 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
110 if self.is_keyframe(culistid) {
111 self.inner.reset(culistid, clock.now());
112 }
113 }
114
115 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
116 if self.is_keyframe(culistid) {
117 if self.inner.culistid != culistid {
118 panic!("Freezing task for a different culistid");
119 }
120 self.inner
121 .add_frozen_task(task)
122 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
123 } else {
124 Ok(0)
125 }
126 }
127
128 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
129 if self.is_keyframe(culistid) {
130 let logger = self.logger.as_mut().unwrap();
131 logger.log(&self.inner)
132 } else {
133 Ok(())
134 }
135 }
136}
137
138pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
142 pub clock: RobotClock, pub tasks: CT,
147
148 #[cfg(feature = "std")]
150 pub threadpool: Arc<ThreadPool>,
151
152 pub monitor: M,
154
155 pub copperlists_manager: CopperListsManager<P, NBCL>,
157
158 pub keyframes_manager: KeyFramesManager,
160
161 pub runtime_config: RuntimeConfig,
163}
164
165impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
167 ClockProvider for CuRuntime<CT, P, M, NBCL>
168{
169 fn get_clock(&self) -> RobotClock {
170 self.clock.clone()
171 }
172}
173
174#[derive(Encode, Decode)]
178pub struct KeyFrame {
179 pub culistid: u32,
181 pub timestamp: CuTime,
183 pub serialized_tasks: Vec<u8>,
185}
186
187impl KeyFrame {
188 fn new() -> Self {
189 KeyFrame {
190 culistid: 0,
191 timestamp: CuTime::default(),
192 serialized_tasks: Vec::new(),
193 }
194 }
195
196 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
198 self.culistid = culistid;
199 self.timestamp = timestamp;
200 self.serialized_tasks.clear();
201 }
202
203 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
205 let cfg = bincode::config::standard();
206 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
207 BincodeAdapter(task).encode(&mut sizer)?;
208 let need = sizer.into_writer().bytes_written as usize;
209
210 let start = self.serialized_tasks.len();
211 self.serialized_tasks.resize(start + need, 0);
212 let mut enc =
213 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
214 BincodeAdapter(task).encode(&mut enc)?;
215 Ok(need)
216 }
217}
218
219impl<
220 CT,
221 P: CopperListTuple + CuListZeroedInit + Default + 'static,
222 M: CuMonitor,
223 const NBCL: usize,
224 > CuRuntime<CT, P, M, NBCL>
225{
226 #[cfg(feature = "std")]
228 pub fn new(
229 clock: RobotClock,
230 config: &CuConfig,
231 mission: Option<&str>,
232 tasks_instanciator: impl for<'c> Fn(
233 Vec<Option<&'c ComponentConfig>>,
234 Arc<ThreadPool>,
235 ) -> CuResult<CT>,
236 monitor_instanciator: impl Fn(&CuConfig) -> M,
237 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
238 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
239 ) -> CuResult<Self> {
240 let graph = config.get_graph(mission)?;
241 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
242 .get_all_nodes()
243 .iter()
244 .map(|(_, node)| node.get_instance_config())
245 .collect();
246
247 let threadpool = Arc::new(
250 rayon::ThreadPoolBuilder::new()
251 .num_threads(2) .build()
253 .expect("Could not create the threadpool"),
254 );
255
256 let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
257 let monitor = monitor_instanciator(config);
258
259 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
260 Some(logging_config) if logging_config.enable_task_logging => (
261 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
262 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
263 logging_config.keyframe_interval.unwrap(), ),
265 Some(_) => (None, None, 0), None => (
267 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
269 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
270 DEFAULT_KEYFRAME_INTERVAL,
271 ),
272 };
273
274 let copperlists_manager = CopperListsManager {
275 inner: CuListsManager::new(),
276 logger: copperlists_logger,
277 };
278
279 let keyframes_manager = KeyFramesManager {
280 inner: KeyFrame::new(),
281 logger: keyframes_logger,
282 keyframe_interval,
283 };
284
285 let runtime_config = config.runtime.clone().unwrap_or_default();
286
287 let runtime = Self {
288 tasks,
289 threadpool,
290 monitor,
291 clock,
292 copperlists_manager,
293 keyframes_manager,
294 runtime_config,
295 };
296
297 Ok(runtime)
298 }
299
300 #[cfg(not(feature = "std"))]
301 pub fn new(
302 clock: RobotClock,
303 config: &CuConfig,
304 mission: Option<&str>,
305 tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
306 monitor_instanciator: impl Fn(&CuConfig) -> M,
307 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
308 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
309 ) -> CuResult<Self> {
310 let graph = config.get_graph(mission)?;
311 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
312 .get_all_nodes()
313 .iter()
314 .map(|(_, node)| node.get_instance_config())
315 .collect();
316
317 let tasks = tasks_instanciator(all_instances_configs)?;
318
319 let monitor = monitor_instanciator(config);
320
321 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
322 Some(logging_config) if logging_config.enable_task_logging => (
323 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
324 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
325 logging_config.keyframe_interval.unwrap(), ),
327 Some(_) => (None, None, 0), None => (
329 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
331 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
332 DEFAULT_KEYFRAME_INTERVAL,
333 ),
334 };
335
336 let copperlists_manager = CopperListsManager {
337 inner: CuListsManager::new(),
338 logger: copperlists_logger,
339 };
340
341 let keyframes_manager = KeyFramesManager {
342 inner: KeyFrame::new(),
343 logger: keyframes_logger,
344 keyframe_interval,
345 };
346
347 let runtime_config = config.runtime.clone().unwrap_or_default();
348
349 let runtime = Self {
350 tasks,
351 #[cfg(feature = "std")]
352 threadpool,
353 monitor,
354 clock,
355 copperlists_manager,
356 keyframes_manager,
357 runtime_config,
358 };
359
360 Ok(runtime)
361 }
362}
363
364#[derive(Debug, PartialEq, Eq, Clone, Copy)]
369pub enum CuTaskType {
370 Source,
371 Regular,
372 Sink,
373}
374
375pub struct CuExecutionStep {
377 pub node_id: NodeId,
379 pub node: Node,
381 pub task_type: CuTaskType,
383
384 pub input_msg_indices_types: Vec<(u32, String)>,
386
387 pub output_msg_index_type: Option<(u32, String)>,
389}
390
391impl Debug for CuExecutionStep {
392 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
393 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
394 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
395 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
396 f.write_str(
397 format!(
398 " input_msg_types: {:?}\n",
399 self.input_msg_indices_types
400 )
401 .as_str(),
402 )?;
403 f.write_str(
404 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
405 )?;
406 Ok(())
407 }
408}
409
410pub struct CuExecutionLoop {
415 pub steps: Vec<CuExecutionUnit>,
416 pub loop_count: Option<u32>,
417}
418
419impl Debug for CuExecutionLoop {
420 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
421 f.write_str("CuExecutionLoop:\n")?;
422 for step in &self.steps {
423 match step {
424 CuExecutionUnit::Step(step) => {
425 step.fmt(f)?;
426 }
427 CuExecutionUnit::Loop(l) => {
428 l.fmt(f)?;
429 }
430 }
431 }
432
433 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
434 Ok(())
435 }
436}
437
438#[derive(Debug)]
440pub enum CuExecutionUnit {
441 Step(CuExecutionStep),
442 Loop(CuExecutionLoop),
443}
444
445fn find_output_index_type_from_nodeid(
446 node_id: NodeId,
447 steps: &Vec<CuExecutionUnit>,
448) -> Option<(u32, String)> {
449 for step in steps {
450 match step {
451 CuExecutionUnit::Loop(loop_unit) => {
452 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
453 return Some(index);
454 }
455 }
456 CuExecutionUnit::Step(step) => {
457 if step.node_id == node_id {
458 return step.output_msg_index_type.clone();
459 }
460 }
461 }
462 }
463 None
464}
465
466pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
467 if graph.incoming_neighbor_count(node_id) == 0 {
468 CuTaskType::Source
469 } else if graph.outgoing_neighbor_count(node_id) == 0 {
470 CuTaskType::Sink
471 } else {
472 CuTaskType::Regular
473 }
474}
475
476fn find_edge_with_plan_input_id(
479 plan: &[CuExecutionUnit],
480 graph: &CuGraph,
481 plan_id: u32,
482 output_node_id: NodeId,
483) -> usize {
484 let input_node = plan
485 .get(plan_id as usize)
486 .expect("Input step should've been added to plan before the step that receives the input");
487 let CuExecutionUnit::Step(input_step) = input_node else {
488 panic!("Expected input to be from a step, not a loop");
489 };
490 let input_node_id = input_step.node_id;
491
492 graph
493 .0
494 .edges_connecting(input_node_id.into(), output_node_id.into())
495 .map(|edge| edge.id().index())
496 .next()
497 .expect("An edge connecting the input to the output should exist")
498}
499
500fn sort_inputs_by_cnx_id(
503 input_msg_indices_types: &mut [(u32, String)],
504 plan: &[CuExecutionUnit],
505 graph: &CuGraph,
506 curr_node_id: NodeId,
507) {
508 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
509 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
510 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
511 a_edge_id.cmp(&b_edge_id)
512 });
513}
514fn plan_tasks_tree_branch(
516 graph: &CuGraph,
517 mut next_culist_output_index: u32,
518 starting_point: NodeId,
519 plan: &mut Vec<CuExecutionUnit>,
520) -> (u32, bool) {
521 #[cfg(all(feature = "std", feature = "macro_debug"))]
522 eprintln!("-- starting branch from node {starting_point}");
523
524 let mut visitor = Bfs::new(&graph.0, starting_point.into());
525 let mut handled = false;
526
527 while let Some(node) = visitor.next(&graph.0) {
528 let id = node.index() as NodeId;
529 let node_ref = graph.get_node(id).unwrap();
530 #[cfg(all(feature = "std", feature = "macro_debug"))]
531 eprintln!(" Visiting node: {node_ref:?}");
532
533 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
534 let output_msg_index_type: Option<(u32, String)>;
535 let task_type = find_task_type_for_id(graph, id);
536
537 match task_type {
538 CuTaskType::Source => {
539 #[cfg(all(feature = "std", feature = "macro_debug"))]
540 eprintln!(" → Source node, assign output index {next_culist_output_index}");
541 output_msg_index_type = Some((
542 next_culist_output_index,
543 graph
544 .0
545 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
546 .unwrap() .msg
548 .clone(),
549 ));
550 next_culist_output_index += 1;
551 }
552 CuTaskType::Sink => {
553 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
554 #[cfg(all(feature = "std", feature = "macro_debug"))]
555 eprintln!(" → Sink with parents: {parents:?}");
556 for parent in parents {
557 let pid = parent;
558 let index_type = find_output_index_type_from_nodeid(pid, plan);
559 if let Some(index_type) = index_type {
560 #[cfg(all(feature = "std", feature = "macro_debug"))]
561 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
562 input_msg_indices_types.push(index_type);
563 } else {
564 #[cfg(all(feature = "std", feature = "macro_debug"))]
565 eprintln!(" ✗ Input from {pid} not ready, returning");
566 return (next_culist_output_index, handled);
567 }
568 }
569 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
570 next_culist_output_index += 1;
571 }
572 CuTaskType::Regular => {
573 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
574 #[cfg(all(feature = "std", feature = "macro_debug"))]
575 eprintln!(" → Regular task with parents: {parents:?}");
576 for parent in parents {
577 let pid = parent;
578 let index_type = find_output_index_type_from_nodeid(pid, plan);
579 if let Some(index_type) = index_type {
580 #[cfg(all(feature = "std", feature = "macro_debug"))]
581 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
582 input_msg_indices_types.push(index_type);
583 } else {
584 #[cfg(all(feature = "std", feature = "macro_debug"))]
585 eprintln!(" ✗ Input from {pid} not ready, returning");
586 return (next_culist_output_index, handled);
587 }
588 }
589 output_msg_index_type = Some((
590 next_culist_output_index,
591 graph
592 .0
593 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
595 .msg
596 .clone(),
597 ));
598 next_culist_output_index += 1;
599 }
600 }
601
602 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
603
604 if let Some(pos) = plan
605 .iter()
606 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
607 {
608 #[cfg(all(feature = "std", feature = "macro_debug"))]
609 eprintln!(" → Already in plan, modifying existing step");
610 let mut step = plan.remove(pos);
611 if let CuExecutionUnit::Step(ref mut s) = step {
612 s.input_msg_indices_types = input_msg_indices_types;
613 }
614 plan.push(step);
615 } else {
616 #[cfg(all(feature = "std", feature = "macro_debug"))]
617 eprintln!(" → New step added to plan");
618 let step = CuExecutionStep {
619 node_id: id,
620 node: node_ref.clone(),
621 task_type,
622 input_msg_indices_types,
623 output_msg_index_type,
624 };
625 plan.push(CuExecutionUnit::Step(step));
626 }
627
628 handled = true;
629 }
630
631 #[cfg(all(feature = "std", feature = "macro_debug"))]
632 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
633 (next_culist_output_index, handled)
634}
635
636pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
639 #[cfg(all(feature = "std", feature = "macro_debug"))]
640 eprintln!("[runtime plan]");
641 let visited = graph.0.visit_map();
642 let mut plan = Vec::new();
643 let mut next_culist_output_index = 0u32;
644
645 let mut queue: VecDeque<NodeId> = graph
646 .node_indices()
647 .iter()
648 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
649 .map(|node| node.index() as NodeId)
650 .collect();
651
652 #[cfg(all(feature = "std", feature = "macro_debug"))]
653 eprintln!("Initial source nodes: {queue:?}");
654
655 while let Some(start_node) = queue.pop_front() {
656 if visited.is_visited(&start_node) {
657 #[cfg(all(feature = "std", feature = "macro_debug"))]
658 eprintln!("→ Skipping already visited source {start_node}");
659 continue;
660 }
661
662 #[cfg(all(feature = "std", feature = "macro_debug"))]
663 eprintln!("→ Starting BFS from source {start_node}");
664 let mut bfs = Bfs::new(&graph.0, start_node.into());
665
666 while let Some(node_index) = bfs.next(&graph.0) {
667 let node_id = node_index.index() as NodeId;
668 let already_in_plan = plan
669 .iter()
670 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
671 if already_in_plan {
672 #[cfg(all(feature = "std", feature = "macro_debug"))]
673 eprintln!(" → Node {node_id} already planned, skipping");
674 continue;
675 }
676
677 #[cfg(all(feature = "std", feature = "macro_debug"))]
678 eprintln!(" Planning from node {node_id}");
679 let (new_index, handled) =
680 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
681 next_culist_output_index = new_index;
682
683 if !handled {
684 #[cfg(all(feature = "std", feature = "macro_debug"))]
685 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
686 continue;
687 }
688
689 #[cfg(all(feature = "std", feature = "macro_debug"))]
690 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
691 for neighbor in graph.0.neighbors(node_index) {
692 if !visited.is_visited(&neighbor) {
693 let nid = neighbor.index() as NodeId;
694 #[cfg(all(feature = "std", feature = "macro_debug"))]
695 eprintln!(" → Enqueueing neighbor {nid}");
696 queue.push_back(nid);
697 }
698 }
699 }
700 }
701
702 Ok(CuExecutionLoop {
703 steps: plan,
704 loop_count: None,
705 })
706}
707
708#[cfg(test)]
710mod tests {
711 use super::*;
712 use crate::config::Node;
713 use crate::cutask::CuSinkTask;
714 use crate::cutask::{CuSrcTask, Freezable};
715 use crate::monitoring::NoMonitor;
716 use bincode::Encode;
717 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
718 use serde_derive::Serialize;
719
720 pub struct TestSource {}
721
722 impl Freezable for TestSource {}
723
724 impl CuSrcTask for TestSource {
725 type Output<'m> = ();
726 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
727 where
728 Self: Sized,
729 {
730 Ok(Self {})
731 }
732
733 fn process(
734 &mut self,
735 _clock: &RobotClock,
736 _empty_msg: &mut Self::Output<'_>,
737 ) -> CuResult<()> {
738 Ok(())
739 }
740 }
741
742 pub struct TestSink {}
743
744 impl Freezable for TestSink {}
745
746 impl CuSinkTask for TestSink {
747 type Input<'m> = ();
748
749 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
750 where
751 Self: Sized,
752 {
753 Ok(Self {})
754 }
755
756 fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
757 Ok(())
758 }
759 }
760
761 type Tasks = (TestSource, TestSink);
763
764 #[derive(Debug, Encode, Decode, Serialize, Default)]
765 struct Msgs(());
766
767 impl ErasedCuStampedDataSet for Msgs {
768 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
769 Vec::new()
770 }
771 }
772
773 impl MatchingTasks for Msgs {
774 fn get_all_task_ids() -> &'static [&'static str] {
775 &[]
776 }
777 }
778
779 impl CuListZeroedInit for Msgs {
780 fn init_zeroed(&mut self) {}
781 }
782
783 #[cfg(feature = "std")]
784 fn tasks_instanciator(
785 all_instances_configs: Vec<Option<&ComponentConfig>>,
786 _threadpool: Arc<ThreadPool>,
787 ) -> CuResult<Tasks> {
788 Ok((
789 TestSource::new(all_instances_configs[0])?,
790 TestSink::new(all_instances_configs[1])?,
791 ))
792 }
793
794 #[cfg(not(feature = "std"))]
795 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
796 Ok((
797 TestSource::new(all_instances_configs[0])?,
798 TestSink::new(all_instances_configs[1])?,
799 ))
800 }
801
802 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
803 NoMonitor {}
804 }
805
806 #[derive(Debug)]
807 struct FakeWriter {}
808
809 impl<E: Encode> WriteStream<E> for FakeWriter {
810 fn log(&mut self, _obj: &E) -> CuResult<()> {
811 Ok(())
812 }
813 }
814
815 #[test]
816 fn test_runtime_instantiation() {
817 let mut config = CuConfig::default();
818 let graph = config.get_graph_mut(None).unwrap();
819 graph.add_node(Node::new("a", "TestSource")).unwrap();
820 graph.add_node(Node::new("b", "TestSink")).unwrap();
821 graph.connect(0, 1, "()").unwrap();
822 let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
823 RobotClock::default(),
824 &config,
825 None,
826 tasks_instanciator,
827 monitor_instanciator,
828 FakeWriter {},
829 FakeWriter {},
830 );
831 assert!(runtime.is_ok());
832 }
833
834 #[test]
835 fn test_copperlists_manager_lifecycle() {
836 let mut config = CuConfig::default();
837 let graph = config.get_graph_mut(None).unwrap();
838 graph.add_node(Node::new("a", "TestSource")).unwrap();
839 graph.add_node(Node::new("b", "TestSink")).unwrap();
840 graph.connect(0, 1, "()").unwrap();
841
842 let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
843 RobotClock::default(),
844 &config,
845 None,
846 tasks_instanciator,
847 monitor_instanciator,
848 FakeWriter {},
849 FakeWriter {},
850 )
851 .unwrap();
852
853 {
855 let copperlists = &mut runtime.copperlists_manager;
856 let culist0 = copperlists
857 .inner
858 .create()
859 .expect("Ran out of space for copper lists");
860 let id = culist0.id;
862 assert_eq!(id, 0);
863 culist0.change_state(CopperListState::Processing);
864 assert_eq!(copperlists.available_copper_lists(), 1);
865 }
866
867 {
868 let copperlists = &mut runtime.copperlists_manager;
869 let culist1 = copperlists
870 .inner
871 .create()
872 .expect("Ran out of space for copper lists"); let id = culist1.id;
874 assert_eq!(id, 1);
875 culist1.change_state(CopperListState::Processing);
876 assert_eq!(copperlists.available_copper_lists(), 0);
877 }
878
879 {
880 let copperlists = &mut runtime.copperlists_manager;
881 let culist2 = copperlists.inner.create();
882 assert!(culist2.is_none());
883 assert_eq!(copperlists.available_copper_lists(), 0);
884 let _ = copperlists.end_of_processing(1);
886 assert_eq!(copperlists.available_copper_lists(), 1);
887 }
888
889 {
891 let copperlists = &mut runtime.copperlists_manager;
892 let culist2 = copperlists
893 .inner
894 .create()
895 .expect("Ran out of space for copper lists"); let id = culist2.id;
897 assert_eq!(id, 2);
898 culist2.change_state(CopperListState::Processing);
899 assert_eq!(copperlists.available_copper_lists(), 0);
900 let _ = copperlists.end_of_processing(0);
902 assert_eq!(copperlists.available_copper_lists(), 0);
904
905 let _ = copperlists.end_of_processing(2);
907 assert_eq!(copperlists.available_copper_lists(), 2);
910 }
911 }
912
913 #[test]
914 fn test_runtime_task_input_order() {
915 let mut config = CuConfig::default();
916 let graph = config.get_graph_mut(None).unwrap();
917 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
918 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
919 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
920
921 assert_eq!(src1_id, 0);
922 assert_eq!(src2_id, 1);
923
924 let src1_type = "src1_type";
926 let src2_type = "src2_type";
927 graph.connect(src2_id, sink_id, src2_type).unwrap();
928 graph.connect(src1_id, sink_id, src1_type).unwrap();
929
930 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
931 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
932 assert_eq!(src1_edge_id, 1);
935 assert_eq!(src2_edge_id, 0);
936
937 let runtime = compute_runtime_plan(graph).unwrap();
938 let sink_step = runtime
939 .steps
940 .iter()
941 .find_map(|step| match step {
942 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
943 _ => None,
944 })
945 .unwrap();
946
947 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
950 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
951 }
952
953 #[test]
954 fn test_runtime_plan_diamond_case1() {
955 let mut config = CuConfig::default();
957 let graph = config.get_graph_mut(None).unwrap();
958 let cam0_id = graph
959 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
960 .unwrap();
961 let inf0_id = graph
962 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
963 .unwrap();
964 let broadcast_id = graph
965 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
966 .unwrap();
967
968 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
970 graph.connect(cam0_id, inf0_id, "i32").unwrap();
971 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
972
973 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
974 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
975
976 assert_eq!(edge_cam0_to_inf0, 0);
977 assert_eq!(edge_cam0_to_broadcast, 1);
978
979 let runtime = compute_runtime_plan(graph).unwrap();
980 let broadcast_step = runtime
981 .steps
982 .iter()
983 .find_map(|step| match step {
984 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
985 _ => None,
986 })
987 .unwrap();
988
989 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
990 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
991 }
992
993 #[test]
994 fn test_runtime_plan_diamond_case2() {
995 let mut config = CuConfig::default();
997 let graph = config.get_graph_mut(None).unwrap();
998 let cam0_id = graph
999 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1000 .unwrap();
1001 let inf0_id = graph
1002 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1003 .unwrap();
1004 let broadcast_id = graph
1005 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1006 .unwrap();
1007
1008 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1010 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1011 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1012
1013 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1014 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1015
1016 assert_eq!(edge_cam0_to_broadcast, 0);
1017 assert_eq!(edge_cam0_to_inf0, 1);
1018
1019 let runtime = compute_runtime_plan(graph).unwrap();
1020 let broadcast_step = runtime
1021 .steps
1022 .iter()
1023 .find_map(|step| match step {
1024 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1025 _ => None,
1026 })
1027 .unwrap();
1028
1029 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1030 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1031 }
1032}