use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::enc::EncoderImpl;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}

#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}

use imp::*;

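/// Everything a `std` Copper application carries around at runtime: the unified
/// logger sink, the structured log runtime, and the robot clock.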
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

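/// Owns the preallocated pool of copper lists and the optional stream they are
/// serialized to once fully processed.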
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Preallocated pool of copper lists.
    pub inner: CuListsManager<P, NBCL>,
    /// Stream the copper lists are serialized to, if task logging is enabled.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list `culistid` as done, then releases the contiguous
    /// run of done lists starting from the most recently created one; a list
    /// still being processed stops the scan.
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                // Serialize the list before freeing it, if we have a logger.
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available in the pool.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

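/// Builds keyframes (frozen task state) and writes them out at the configured
/// interval, when keyframe logging is enabled.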
pub struct KeyFramesManager {
    /// The keyframe currently being assembled.
    inner: KeyFrame,
    /// Stream the keyframes are serialized to, if enabled.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,
    /// A keyframe is captured every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    /// Starts a fresh keyframe for `culistid` if it falls on the keyframe interval.
    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    /// Serializes the task's frozen state into the current keyframe, returning
    /// the number of bytes appended (0 if this copper list is not a keyframe).
    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

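/// The main runtime structure: owns the instantiated tasks and bridges, the
/// monitor, the copper list pool, and the keyframe state.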
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// Clock shared by the whole runtime.
    pub clock: RobotClock,
    /// The tuple of instantiated tasks.
    pub tasks: CT,
    /// The tuple of instantiated bridges.
    pub bridges: CB,
    /// Thread pool shared by tasks that want to offload work (std only).
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,
    /// The monitor implementation.
    pub monitor: M,
    /// Copper list pool and its logger.
    pub copperlists_manager: CopperListsManager<P, NBCL>,
    /// Keyframe state and its logger.
    pub keyframes_manager: KeyFramesManager,
    /// Runtime section of the configuration.
    pub runtime_config: RuntimeConfig,
}

impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

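/// A snapshot of the frozen state of every task, recorded every
/// `keyframe_interval` copper lists.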
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// Id of the copper list this keyframe was taken at.
    pub culistid: u32,
    /// Time the keyframe was captured.
    pub timestamp: CuTime,
    /// Bincode-serialized frozen state of all tasks, concatenated.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Rearms the keyframe for a new copper list, dropping previous task state.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Appends the bincode-encoded state of one task and returns the number of
    /// bytes written. Encoding is done in two passes: a sizing pass, then an
    /// in-place encode into the reserved region of the buffer.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        // Pass 1: measure how many bytes the encoded task needs.
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        // Pass 2: grow the buffer and encode directly into the new tail.
        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
        CT,
        CB,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, CB, P, M, NBCL>
{
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        // A small shared pool (2 threads) for tasks that want to offload work.
        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2)
                .build()
                .expect("Could not create the threadpool"),
        );

        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
        let monitor = monitor_instanciator(config);
        let bridges = bridges_instanciator(config)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            Some(_) => (None, None, 0),
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);
        let bridges = bridges_instanciator(config)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            Some(_) => (None, None, 0),
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            #[cfg(feature = "std")]
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

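/// The role of a task in the graph: sources have no incoming connections,
/// sinks have no outgoing ones, and regular tasks have both.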
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

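/// One step of the execution plan: which node runs, its role, and where its
/// input and output messages live in the copper list (index and type name).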
pub struct CuExecutionStep {
    /// Id of the node in the config graph.
    pub node_id: NodeId,
    /// The config node itself.
    pub node: Node,
    /// Role of the task (source, regular, or sink).
    pub task_type: CuTaskType,
    /// Copper list slots and type names of the inputs, ordered by connection id.
    pub input_msg_indices_types: Vec<(u32, String)>,
    /// Copper list slot and type name of the output, if any.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                " input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

pub struct CuExecutionLoop {
    /// Execution units (steps or nested loops) run by this loop, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations, or `None` to run indefinitely.
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// A unit of execution in the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

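/// Walks the plan (recursing into nested loops) and returns the copper list
/// output slot (index, type name) produced by `node_id`, if it has been planned.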
fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

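/// Derives a task's role purely from the graph topology: no inputs makes it a
/// source, no outputs makes it a sink, anything else is a regular task.
///
/// A minimal sketch (using the same two-node graph as the tests below):
/// ```ignore
/// // With a single connection a -> b:
/// assert_eq!(find_task_type_for_id(graph, 0), CuTaskType::Source);
/// assert_eq!(find_task_type_for_id(graph, 1), CuTaskType::Sink);
/// ```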
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

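/// Finds the graph edge that connects the plan step at `plan_id` to
/// `output_node_id`. Panics if that input has not been planned yet, or if it
/// comes from a loop rather than a step.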
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

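/// Sorts a step's inputs by the id of the connection that produces them, so the
/// generated input tuple order is deterministic and follows the connection ids
/// rather than BFS discovery order.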
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}
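
/// Plans one BFS branch starting at `starting_point`. Returns the next free
/// copper list slot index and whether at least one node could be planned; a
/// node whose inputs are not all available yet is deferred to a later branch.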
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!(" Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // Sinks still get a (unit-typed) output slot in the copper list.
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → Already in plan, modifying existing step");
            // Re-plan: update the step's inputs and move it to the end of the plan.
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

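/// Computes the execution plan for the whole graph: BFS from every source node,
/// deferring any node until all of its inputs have been planned.
///
/// A minimal sketch of how a plan is derived (mirroring
/// `test_runtime_instantiation` below):
/// ```ignore
/// let mut config = CuConfig::default();
/// let graph = config.get_graph_mut(None).unwrap();
/// graph.add_node(Node::new("a", "TestSource")).unwrap();
/// graph.add_node(Node::new("b", "TestSink")).unwrap();
/// graph.connect(0, 1, "()").unwrap();
/// let plan = compute_runtime_plan(graph).unwrap();
/// assert_eq!(plan.steps.len(), 2);
/// ```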
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    // Seed the queue with all the source nodes.
    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!(" → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig) -> CuResult<()> {
        Ok(())
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Take the first copper list (NBCL = 2, so one is left).
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Take the second one: the pool is now exhausted.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Finishing the most recent list (1) frees it immediately.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // List 0 is done, but list 2 (more recent) is still processing,
            // so nothing can be released yet.
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Once list 2 is done, both 2 and 0 are released.
            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect src2 before src1 so the connection ids do not follow the
        // node creation order.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // The sink's inputs must be ordered by connection id, not by the
        // order in which the plan discovered them.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}
1050}