1use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{build_monitor_topology, CuMonitor};
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_traits::CuResult;
12use cu29_traits::WriteStream;
13use cu29_traits::{CopperListTuple, CuError};
14
15use bincode::enc::write::{SizeWriter, SliceWriter};
16use bincode::enc::EncoderImpl;
17use bincode::error::EncodeError;
18use bincode::{Decode, Encode};
19use core::fmt::{Debug, Formatter};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23
24#[cfg(not(feature = "std"))]
25mod imp {
26 pub use alloc::boxed::Box;
27 pub use alloc::collections::VecDeque;
28 pub use alloc::format;
29 pub use alloc::string::String;
30 pub use alloc::string::ToString;
31 pub use alloc::vec::Vec;
32 pub use core::fmt::Result as FmtResult;
33}
34
35#[cfg(feature = "std")]
36mod imp {
37 pub use cu29_log_runtime::LoggerRuntime;
38 pub use cu29_unifiedlog::UnifiedLoggerWrite;
39 pub use rayon::ThreadPool;
40 pub use std::collections::VecDeque;
41 pub use std::fmt::Result as FmtResult;
42 pub use std::sync::{Arc, Mutex};
43}
44
45use imp::*;
46
47#[cfg(feature = "std")]
49pub struct CopperContext {
50 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
51 pub logger_runtime: LoggerRuntime,
52 pub clock: RobotClock,
53}
54
55pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
57 pub inner: CuListsManager<P, NBCL>,
58 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
60}
61
62impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
63 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
64 let mut is_top = true;
65 let mut nb_done = 0;
66 for cl in self.inner.iter_mut() {
67 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
68 cl.change_state(CopperListState::DoneProcessing);
69 }
70 if is_top && cl.get_state() == CopperListState::DoneProcessing {
71 if let Some(logger) = &mut self.logger {
72 cl.change_state(CopperListState::BeingSerialized);
73 logger.log(cl)?;
74 }
75 cl.change_state(CopperListState::Free);
76 nb_done += 1;
77 } else {
78 is_top = false;
79 }
80 }
81 for _ in 0..nb_done {
82 let _ = self.inner.pop();
83 }
84 Ok(())
85 }
86
87 pub fn available_copper_lists(&self) -> usize {
88 NBCL - self.inner.len()
89 }
90}
91
92pub struct KeyFramesManager {
94 inner: KeyFrame,
96
97 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
99
100 keyframe_interval: u32,
102}
103
104impl KeyFramesManager {
105 fn is_keyframe(&self, culistid: u32) -> bool {
106 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
107 }
108
109 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
110 if self.is_keyframe(culistid) {
111 self.inner.reset(culistid, clock.now());
112 }
113 }
114
115 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
116 if self.is_keyframe(culistid) {
117 if self.inner.culistid != culistid {
118 panic!("Freezing task for a different culistid");
119 }
120 self.inner
121 .add_frozen_task(task)
122 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
123 } else {
124 Ok(0)
125 }
126 }
127
128 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
129 if self.is_keyframe(culistid) {
130 let logger = self.logger.as_mut().unwrap();
131 logger.log(&self.inner)
132 } else {
133 Ok(())
134 }
135 }
136}
137
138pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
142 pub clock: RobotClock, pub tasks: CT,
147
148 pub bridges: CB,
150
151 #[cfg(feature = "std")]
153 pub threadpool: Arc<ThreadPool>,
154
155 pub monitor: M,
157
158 pub copperlists_manager: CopperListsManager<P, NBCL>,
160
161 pub keyframes_manager: KeyFramesManager,
163
164 pub runtime_config: RuntimeConfig,
166}
167
168impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
170 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
171{
172 fn get_clock(&self) -> RobotClock {
173 self.clock.clone()
174 }
175}
176
177#[derive(Encode, Decode)]
181pub struct KeyFrame {
182 pub culistid: u32,
184 pub timestamp: CuTime,
186 pub serialized_tasks: Vec<u8>,
188}
189
190impl KeyFrame {
191 fn new() -> Self {
192 KeyFrame {
193 culistid: 0,
194 timestamp: CuTime::default(),
195 serialized_tasks: Vec::new(),
196 }
197 }
198
199 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
201 self.culistid = culistid;
202 self.timestamp = timestamp;
203 self.serialized_tasks.clear();
204 }
205
206 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
208 let cfg = bincode::config::standard();
209 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
210 BincodeAdapter(task).encode(&mut sizer)?;
211 let need = sizer.into_writer().bytes_written as usize;
212
213 let start = self.serialized_tasks.len();
214 self.serialized_tasks.resize(start + need, 0);
215 let mut enc =
216 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
217 BincodeAdapter(task).encode(&mut enc)?;
218 Ok(need)
219 }
220}
221
222impl<
223 CT,
224 CB,
225 P: CopperListTuple + CuListZeroedInit + Default + 'static,
226 M: CuMonitor,
227 const NBCL: usize,
228 > CuRuntime<CT, CB, P, M, NBCL>
229{
230 #[allow(clippy::too_many_arguments)]
232 #[cfg(feature = "std")]
233 pub fn new(
234 clock: RobotClock,
235 config: &CuConfig,
236 mission: Option<&str>,
237 tasks_instanciator: impl for<'c> Fn(
238 Vec<Option<&'c ComponentConfig>>,
239 Arc<ThreadPool>,
240 ) -> CuResult<CT>,
241 monitor_instanciator: impl Fn(&CuConfig) -> M,
242 bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
243 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
244 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
245 ) -> CuResult<Self> {
246 let graph = config.get_graph(mission)?;
247 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
248 .get_all_nodes()
249 .iter()
250 .map(|(_, node)| node.get_instance_config())
251 .collect();
252
253 let threadpool = Arc::new(
256 rayon::ThreadPoolBuilder::new()
257 .num_threads(2) .build()
259 .expect("Could not create the threadpool"),
260 );
261
262 let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
263 let mut monitor = monitor_instanciator(config);
264 if let Ok(topology) = build_monitor_topology(config, mission) {
265 monitor.set_topology(topology);
266 }
267 let bridges = bridges_instanciator(config)?;
268
269 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
270 Some(logging_config) if logging_config.enable_task_logging => (
271 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
272 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
273 logging_config.keyframe_interval.unwrap(), ),
275 Some(_) => (None, None, 0), None => (
277 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
279 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
280 DEFAULT_KEYFRAME_INTERVAL,
281 ),
282 };
283
284 let copperlists_manager = CopperListsManager {
285 inner: CuListsManager::new(),
286 logger: copperlists_logger,
287 };
288
289 let keyframes_manager = KeyFramesManager {
290 inner: KeyFrame::new(),
291 logger: keyframes_logger,
292 keyframe_interval,
293 };
294
295 let runtime_config = config.runtime.clone().unwrap_or_default();
296
297 let runtime = Self {
298 tasks,
299 bridges,
300 threadpool,
301 monitor,
302 clock,
303 copperlists_manager,
304 keyframes_manager,
305 runtime_config,
306 };
307
308 Ok(runtime)
309 }
310
311 #[allow(clippy::too_many_arguments)]
312 #[cfg(not(feature = "std"))]
313 pub fn new(
314 clock: RobotClock,
315 config: &CuConfig,
316 mission: Option<&str>,
317 tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
318 monitor_instanciator: impl Fn(&CuConfig) -> M,
319 bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
320 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
321 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
322 ) -> CuResult<Self> {
323 let graph = config.get_graph(mission)?;
324 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
325 .get_all_nodes()
326 .iter()
327 .map(|(_, node)| node.get_instance_config())
328 .collect();
329
330 let tasks = tasks_instanciator(all_instances_configs)?;
331
332 let mut monitor = monitor_instanciator(config);
333 if let Ok(topology) = build_monitor_topology(config, mission) {
334 monitor.set_topology(topology);
335 }
336 let bridges = bridges_instanciator(config)?;
337
338 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
339 Some(logging_config) if logging_config.enable_task_logging => (
340 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
341 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
342 logging_config.keyframe_interval.unwrap(), ),
344 Some(_) => (None, None, 0), None => (
346 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
348 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
349 DEFAULT_KEYFRAME_INTERVAL,
350 ),
351 };
352
353 let copperlists_manager = CopperListsManager {
354 inner: CuListsManager::new(),
355 logger: copperlists_logger,
356 };
357
358 let keyframes_manager = KeyFramesManager {
359 inner: KeyFrame::new(),
360 logger: keyframes_logger,
361 keyframe_interval,
362 };
363
364 let runtime_config = config.runtime.clone().unwrap_or_default();
365
366 let runtime = Self {
367 tasks,
368 bridges,
369 #[cfg(feature = "std")]
370 threadpool,
371 monitor,
372 clock,
373 copperlists_manager,
374 keyframes_manager,
375 runtime_config,
376 };
377
378 Ok(runtime)
379 }
380}
381
382#[derive(Debug, PartialEq, Eq, Clone, Copy)]
387pub enum CuTaskType {
388 Source,
389 Regular,
390 Sink,
391}
392
393pub struct CuExecutionStep {
395 pub node_id: NodeId,
397 pub node: Node,
399 pub task_type: CuTaskType,
401
402 pub input_msg_indices_types: Vec<(u32, String)>,
404
405 pub output_msg_index_type: Option<(u32, String)>,
407}
408
409impl Debug for CuExecutionStep {
410 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
411 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
412 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
413 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
414 f.write_str(
415 format!(
416 " input_msg_types: {:?}\n",
417 self.input_msg_indices_types
418 )
419 .as_str(),
420 )?;
421 f.write_str(
422 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
423 )?;
424 Ok(())
425 }
426}
427
428pub struct CuExecutionLoop {
433 pub steps: Vec<CuExecutionUnit>,
434 pub loop_count: Option<u32>,
435}
436
437impl Debug for CuExecutionLoop {
438 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
439 f.write_str("CuExecutionLoop:\n")?;
440 for step in &self.steps {
441 match step {
442 CuExecutionUnit::Step(step) => {
443 step.fmt(f)?;
444 }
445 CuExecutionUnit::Loop(l) => {
446 l.fmt(f)?;
447 }
448 }
449 }
450
451 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
452 Ok(())
453 }
454}
455
456#[derive(Debug)]
458pub enum CuExecutionUnit {
459 Step(CuExecutionStep),
460 Loop(CuExecutionLoop),
461}
462
463fn find_output_index_type_from_nodeid(
464 node_id: NodeId,
465 steps: &Vec<CuExecutionUnit>,
466) -> Option<(u32, String)> {
467 for step in steps {
468 match step {
469 CuExecutionUnit::Loop(loop_unit) => {
470 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
471 return Some(index);
472 }
473 }
474 CuExecutionUnit::Step(step) => {
475 if step.node_id == node_id {
476 return step.output_msg_index_type.clone();
477 }
478 }
479 }
480 }
481 None
482}
483
484pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
485 if graph.incoming_neighbor_count(node_id) == 0 {
486 CuTaskType::Source
487 } else if graph.outgoing_neighbor_count(node_id) == 0 {
488 CuTaskType::Sink
489 } else {
490 CuTaskType::Regular
491 }
492}
493
494fn find_edge_with_plan_input_id(
497 plan: &[CuExecutionUnit],
498 graph: &CuGraph,
499 plan_id: u32,
500 output_node_id: NodeId,
501) -> usize {
502 let input_node = plan
503 .get(plan_id as usize)
504 .expect("Input step should've been added to plan before the step that receives the input");
505 let CuExecutionUnit::Step(input_step) = input_node else {
506 panic!("Expected input to be from a step, not a loop");
507 };
508 let input_node_id = input_step.node_id;
509
510 graph
511 .0
512 .edges_connecting(input_node_id.into(), output_node_id.into())
513 .map(|edge| edge.id().index())
514 .next()
515 .expect("An edge connecting the input to the output should exist")
516}
517
518fn sort_inputs_by_cnx_id(
521 input_msg_indices_types: &mut [(u32, String)],
522 plan: &[CuExecutionUnit],
523 graph: &CuGraph,
524 curr_node_id: NodeId,
525) {
526 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
527 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
528 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
529 a_edge_id.cmp(&b_edge_id)
530 });
531}
532fn plan_tasks_tree_branch(
534 graph: &CuGraph,
535 mut next_culist_output_index: u32,
536 starting_point: NodeId,
537 plan: &mut Vec<CuExecutionUnit>,
538) -> (u32, bool) {
539 #[cfg(all(feature = "std", feature = "macro_debug"))]
540 eprintln!("-- starting branch from node {starting_point}");
541
542 let mut visitor = Bfs::new(&graph.0, starting_point.into());
543 let mut handled = false;
544
545 while let Some(node) = visitor.next(&graph.0) {
546 let id = node.index() as NodeId;
547 let node_ref = graph.get_node(id).unwrap();
548 #[cfg(all(feature = "std", feature = "macro_debug"))]
549 eprintln!(" Visiting node: {node_ref:?}");
550
551 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
552 let output_msg_index_type: Option<(u32, String)>;
553 let task_type = find_task_type_for_id(graph, id);
554
555 match task_type {
556 CuTaskType::Source => {
557 #[cfg(all(feature = "std", feature = "macro_debug"))]
558 eprintln!(" → Source node, assign output index {next_culist_output_index}");
559 output_msg_index_type = Some((
560 next_culist_output_index,
561 graph
562 .0
563 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
564 .unwrap() .msg
566 .clone(),
567 ));
568 next_culist_output_index += 1;
569 }
570 CuTaskType::Sink => {
571 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
572 #[cfg(all(feature = "std", feature = "macro_debug"))]
573 eprintln!(" → Sink with parents: {parents:?}");
574 for parent in parents {
575 let pid = parent;
576 let index_type = find_output_index_type_from_nodeid(pid, plan);
577 if let Some(index_type) = index_type {
578 #[cfg(all(feature = "std", feature = "macro_debug"))]
579 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
580 input_msg_indices_types.push(index_type);
581 } else {
582 #[cfg(all(feature = "std", feature = "macro_debug"))]
583 eprintln!(" ✗ Input from {pid} not ready, returning");
584 return (next_culist_output_index, handled);
585 }
586 }
587 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
588 next_culist_output_index += 1;
589 }
590 CuTaskType::Regular => {
591 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
592 #[cfg(all(feature = "std", feature = "macro_debug"))]
593 eprintln!(" → Regular task with parents: {parents:?}");
594 for parent in parents {
595 let pid = parent;
596 let index_type = find_output_index_type_from_nodeid(pid, plan);
597 if let Some(index_type) = index_type {
598 #[cfg(all(feature = "std", feature = "macro_debug"))]
599 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
600 input_msg_indices_types.push(index_type);
601 } else {
602 #[cfg(all(feature = "std", feature = "macro_debug"))]
603 eprintln!(" ✗ Input from {pid} not ready, returning");
604 return (next_culist_output_index, handled);
605 }
606 }
607 output_msg_index_type = Some((
608 next_culist_output_index,
609 graph
610 .0
611 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
613 .msg
614 .clone(),
615 ));
616 next_culist_output_index += 1;
617 }
618 }
619
620 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
621
622 if let Some(pos) = plan
623 .iter()
624 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
625 {
626 #[cfg(all(feature = "std", feature = "macro_debug"))]
627 eprintln!(" → Already in plan, modifying existing step");
628 let mut step = plan.remove(pos);
629 if let CuExecutionUnit::Step(ref mut s) = step {
630 s.input_msg_indices_types = input_msg_indices_types;
631 }
632 plan.push(step);
633 } else {
634 #[cfg(all(feature = "std", feature = "macro_debug"))]
635 eprintln!(" → New step added to plan");
636 let step = CuExecutionStep {
637 node_id: id,
638 node: node_ref.clone(),
639 task_type,
640 input_msg_indices_types,
641 output_msg_index_type,
642 };
643 plan.push(CuExecutionUnit::Step(step));
644 }
645
646 handled = true;
647 }
648
649 #[cfg(all(feature = "std", feature = "macro_debug"))]
650 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
651 (next_culist_output_index, handled)
652}
653
654pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
657 #[cfg(all(feature = "std", feature = "macro_debug"))]
658 eprintln!("[runtime plan]");
659 let visited = graph.0.visit_map();
660 let mut plan = Vec::new();
661 let mut next_culist_output_index = 0u32;
662
663 let mut queue: VecDeque<NodeId> = graph
664 .node_indices()
665 .iter()
666 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
667 .map(|node| node.index() as NodeId)
668 .collect();
669
670 #[cfg(all(feature = "std", feature = "macro_debug"))]
671 eprintln!("Initial source nodes: {queue:?}");
672
673 while let Some(start_node) = queue.pop_front() {
674 if visited.is_visited(&start_node) {
675 #[cfg(all(feature = "std", feature = "macro_debug"))]
676 eprintln!("→ Skipping already visited source {start_node}");
677 continue;
678 }
679
680 #[cfg(all(feature = "std", feature = "macro_debug"))]
681 eprintln!("→ Starting BFS from source {start_node}");
682 let mut bfs = Bfs::new(&graph.0, start_node.into());
683
684 while let Some(node_index) = bfs.next(&graph.0) {
685 let node_id = node_index.index() as NodeId;
686 let already_in_plan = plan
687 .iter()
688 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
689 if already_in_plan {
690 #[cfg(all(feature = "std", feature = "macro_debug"))]
691 eprintln!(" → Node {node_id} already planned, skipping");
692 continue;
693 }
694
695 #[cfg(all(feature = "std", feature = "macro_debug"))]
696 eprintln!(" Planning from node {node_id}");
697 let (new_index, handled) =
698 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
699 next_culist_output_index = new_index;
700
701 if !handled {
702 #[cfg(all(feature = "std", feature = "macro_debug"))]
703 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
704 continue;
705 }
706
707 #[cfg(all(feature = "std", feature = "macro_debug"))]
708 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
709 for neighbor in graph.0.neighbors(node_index) {
710 if !visited.is_visited(&neighbor) {
711 let nid = neighbor.index() as NodeId;
712 #[cfg(all(feature = "std", feature = "macro_debug"))]
713 eprintln!(" → Enqueueing neighbor {nid}");
714 queue.push_back(nid);
715 }
716 }
717 }
718 }
719
720 Ok(CuExecutionLoop {
721 steps: plan,
722 loop_count: None,
723 })
724}
725
726#[cfg(test)]
728mod tests {
729 use super::*;
730 use crate::config::Node;
731 use crate::cutask::CuSinkTask;
732 use crate::cutask::{CuSrcTask, Freezable};
733 use crate::monitoring::NoMonitor;
734 use bincode::Encode;
735 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
736 use serde_derive::Serialize;
737
738 pub struct TestSource {}
739
740 impl Freezable for TestSource {}
741
742 impl CuSrcTask for TestSource {
743 type Output<'m> = ();
744 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
745 where
746 Self: Sized,
747 {
748 Ok(Self {})
749 }
750
751 fn process(
752 &mut self,
753 _clock: &RobotClock,
754 _empty_msg: &mut Self::Output<'_>,
755 ) -> CuResult<()> {
756 Ok(())
757 }
758 }
759
760 pub struct TestSink {}
761
762 impl Freezable for TestSink {}
763
764 impl CuSinkTask for TestSink {
765 type Input<'m> = ();
766
767 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
768 where
769 Self: Sized,
770 {
771 Ok(Self {})
772 }
773
774 fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
775 Ok(())
776 }
777 }
778
779 type Tasks = (TestSource, TestSink);
781
782 #[derive(Debug, Encode, Decode, Serialize, Default)]
783 struct Msgs(());
784
785 impl ErasedCuStampedDataSet for Msgs {
786 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
787 Vec::new()
788 }
789 }
790
791 impl MatchingTasks for Msgs {
792 fn get_all_task_ids() -> &'static [&'static str] {
793 &[]
794 }
795 }
796
797 impl CuListZeroedInit for Msgs {
798 fn init_zeroed(&mut self) {}
799 }
800
801 #[cfg(feature = "std")]
802 fn tasks_instanciator(
803 all_instances_configs: Vec<Option<&ComponentConfig>>,
804 _threadpool: Arc<ThreadPool>,
805 ) -> CuResult<Tasks> {
806 Ok((
807 TestSource::new(all_instances_configs[0])?,
808 TestSink::new(all_instances_configs[1])?,
809 ))
810 }
811
812 #[cfg(not(feature = "std"))]
813 fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
814 Ok((
815 TestSource::new(all_instances_configs[0])?,
816 TestSink::new(all_instances_configs[1])?,
817 ))
818 }
819
820 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
821 NoMonitor {}
822 }
823
824 fn bridges_instanciator(_config: &CuConfig) -> CuResult<()> {
825 Ok(())
826 }
827
828 #[derive(Debug)]
829 struct FakeWriter {}
830
831 impl<E: Encode> WriteStream<E> for FakeWriter {
832 fn log(&mut self, _obj: &E) -> CuResult<()> {
833 Ok(())
834 }
835 }
836
837 #[test]
838 fn test_runtime_instantiation() {
839 let mut config = CuConfig::default();
840 let graph = config.get_graph_mut(None).unwrap();
841 graph.add_node(Node::new("a", "TestSource")).unwrap();
842 graph.add_node(Node::new("b", "TestSink")).unwrap();
843 graph.connect(0, 1, "()").unwrap();
844 let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
845 RobotClock::default(),
846 &config,
847 None,
848 tasks_instanciator,
849 monitor_instanciator,
850 bridges_instanciator,
851 FakeWriter {},
852 FakeWriter {},
853 );
854 assert!(runtime.is_ok());
855 }
856
857 #[test]
858 fn test_copperlists_manager_lifecycle() {
859 let mut config = CuConfig::default();
860 let graph = config.get_graph_mut(None).unwrap();
861 graph.add_node(Node::new("a", "TestSource")).unwrap();
862 graph.add_node(Node::new("b", "TestSink")).unwrap();
863 graph.connect(0, 1, "()").unwrap();
864
865 let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
866 RobotClock::default(),
867 &config,
868 None,
869 tasks_instanciator,
870 monitor_instanciator,
871 bridges_instanciator,
872 FakeWriter {},
873 FakeWriter {},
874 )
875 .unwrap();
876
877 {
879 let copperlists = &mut runtime.copperlists_manager;
880 let culist0 = copperlists
881 .inner
882 .create()
883 .expect("Ran out of space for copper lists");
884 let id = culist0.id;
886 assert_eq!(id, 0);
887 culist0.change_state(CopperListState::Processing);
888 assert_eq!(copperlists.available_copper_lists(), 1);
889 }
890
891 {
892 let copperlists = &mut runtime.copperlists_manager;
893 let culist1 = copperlists
894 .inner
895 .create()
896 .expect("Ran out of space for copper lists"); let id = culist1.id;
898 assert_eq!(id, 1);
899 culist1.change_state(CopperListState::Processing);
900 assert_eq!(copperlists.available_copper_lists(), 0);
901 }
902
903 {
904 let copperlists = &mut runtime.copperlists_manager;
905 let culist2 = copperlists.inner.create();
906 assert!(culist2.is_none());
907 assert_eq!(copperlists.available_copper_lists(), 0);
908 let _ = copperlists.end_of_processing(1);
910 assert_eq!(copperlists.available_copper_lists(), 1);
911 }
912
913 {
915 let copperlists = &mut runtime.copperlists_manager;
916 let culist2 = copperlists
917 .inner
918 .create()
919 .expect("Ran out of space for copper lists"); let id = culist2.id;
921 assert_eq!(id, 2);
922 culist2.change_state(CopperListState::Processing);
923 assert_eq!(copperlists.available_copper_lists(), 0);
924 let _ = copperlists.end_of_processing(0);
926 assert_eq!(copperlists.available_copper_lists(), 0);
928
929 let _ = copperlists.end_of_processing(2);
931 assert_eq!(copperlists.available_copper_lists(), 2);
934 }
935 }
936
937 #[test]
938 fn test_runtime_task_input_order() {
939 let mut config = CuConfig::default();
940 let graph = config.get_graph_mut(None).unwrap();
941 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
942 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
943 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
944
945 assert_eq!(src1_id, 0);
946 assert_eq!(src2_id, 1);
947
948 let src1_type = "src1_type";
950 let src2_type = "src2_type";
951 graph.connect(src2_id, sink_id, src2_type).unwrap();
952 graph.connect(src1_id, sink_id, src1_type).unwrap();
953
954 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
955 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
956 assert_eq!(src1_edge_id, 1);
959 assert_eq!(src2_edge_id, 0);
960
961 let runtime = compute_runtime_plan(graph).unwrap();
962 let sink_step = runtime
963 .steps
964 .iter()
965 .find_map(|step| match step {
966 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
967 _ => None,
968 })
969 .unwrap();
970
971 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
974 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
975 }
976
977 #[test]
978 fn test_runtime_plan_diamond_case1() {
979 let mut config = CuConfig::default();
981 let graph = config.get_graph_mut(None).unwrap();
982 let cam0_id = graph
983 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
984 .unwrap();
985 let inf0_id = graph
986 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
987 .unwrap();
988 let broadcast_id = graph
989 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
990 .unwrap();
991
992 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
994 graph.connect(cam0_id, inf0_id, "i32").unwrap();
995 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
996
997 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
998 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
999
1000 assert_eq!(edge_cam0_to_inf0, 0);
1001 assert_eq!(edge_cam0_to_broadcast, 1);
1002
1003 let runtime = compute_runtime_plan(graph).unwrap();
1004 let broadcast_step = runtime
1005 .steps
1006 .iter()
1007 .find_map(|step| match step {
1008 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1009 _ => None,
1010 })
1011 .unwrap();
1012
1013 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1014 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1015 }
1016
1017 #[test]
1018 fn test_runtime_plan_diamond_case2() {
1019 let mut config = CuConfig::default();
1021 let graph = config.get_graph_mut(None).unwrap();
1022 let cam0_id = graph
1023 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1024 .unwrap();
1025 let inf0_id = graph
1026 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1027 .unwrap();
1028 let broadcast_id = graph
1029 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1030 .unwrap();
1031
1032 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1034 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1035 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1036
1037 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1038 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1039
1040 assert_eq!(edge_cam0_to_broadcast, 0);
1041 assert_eq!(edge_cam0_to_inf0, 1);
1042
1043 let runtime = compute_runtime_plan(graph).unwrap();
1044 let broadcast_step = runtime
1045 .steps
1046 .iter()
1047 .find_map(|step| match step {
1048 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1049 _ => None,
1050 })
1051 .unwrap();
1052
1053 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1054 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1055 }
1056}