use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{build_monitor_topology, CuMonitor};
use crate::resource::ResourceManager;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::enc::EncoderImpl;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}

#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}

use imp::*;

/// Shared context for a Copper application running with `std`: the unified
/// logger sink, the structured-log runtime and the robot clock.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the pool of copper lists and the optional stream used to serialize them.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (the messages exchanged between tasks), if enabled.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list `culistid` as done, then retires (serializing them
    /// first if a logger is set) every completed list at the top of the pool.
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // Only the consecutive run of done lists at the top can be retired;
            // anything behind a still-processing list has to wait.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available in the pool.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
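
// A minimal lifecycle sketch (hypothetical `MyPayload` type, no logger attached):
// a list is created from the pool, processed, then recycled by `end_of_processing`.
//
//     let mut mgr: CopperListsManager<MyPayload, 4> = CopperListsManager {
//         inner: CuListsManager::new(),
//         logger: None,
//     };
//     let cl = mgr.inner.create().expect("pool exhausted");
//     cl.change_state(CopperListState::Processing);
//     let id = cl.id;
//     mgr.end_of_processing(id)?; // serializes (if a logger is set) and frees it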

/// Tracks the keyframe being built and the stream used to serialize it.
pub struct KeyFramesManager {
    /// The keyframe currently being accumulated.
    inner: KeyFrame,

    /// Logger for the keyframes, if logging is enabled.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// A keyframe is captured every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}
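
// A sketch of how a runtime is expected to drive the keyframe manager for each
// copper list (task iteration order is the application's; `src` and `sink` are
// hypothetical tasks):
//
//     keyframes.reset(culist.id, &clock);          // start a snapshot if one is due
//     keyframes.freeze_task(culist.id, &src)?;     // append each task's frozen state
//     keyframes.freeze_task(culist.id, &sink)?;
//     keyframes.end_of_processing(culist.id)?;     // log the completed keyframe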

/// Core runtime structure holding the tasks, bridges, resources, monitor and the
/// copper-list / keyframe bookkeeping.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The clock the runtime uses for all timings.
    pub clock: RobotClock,

    /// The tuple of all the tasks of the application.
    pub tasks: CT,

    /// The tuple of all the bridges of the application.
    pub bridges: CB,

    /// Resources shared between tasks.
    pub resources: ResourceManager,

    /// Shared threadpool for background work.
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    /// The monitor attached to the runtime.
    pub monitor: M,

    /// Copper lists carry the messages exchanged between tasks during one iteration.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Keyframes periodically snapshot the serialized state of all tasks.
    pub keyframes_manager: KeyFramesManager,

    /// Runtime-level configuration.
    pub runtime_config: RuntimeConfig,
}

impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame is a snapshot of the state of all the tasks at a given point in time.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// Id of the copper list at which this keyframe was captured.
    pub culistid: u32,
    /// Time of the capture.
    pub timestamp: CuTime,
    /// Serialized states of all the tasks, concatenated in task order.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Prepares the keyframe to capture the task states for the given copper list.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Appends the serialized state of one task and returns the number of bytes written.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        // First pass: measure the encoded size without writing anything.
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        // Second pass: grow the buffer and encode directly into the new tail.
        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}
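
// `add_frozen_task` uses the usual two-pass bincode pattern: measure with a
// `SizeWriter`, then encode into a pre-sized slice. The same idea in isolation,
// for any `value: impl Encode` (a sketch, not part of the runtime):
//
//     let cfg = bincode::config::standard();
//     let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
//     value.encode(&mut sizer)?;
//     let need = sizer.into_writer().bytes_written as usize;
//     let mut buf = vec![0u8; need];
//     let mut enc = EncoderImpl::<_, _>::new(SliceWriter::new(&mut buf), cfg);
//     value.encode(&mut enc)?;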

impl<
        CT,
        CB,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, CB, P, M, NBCL>
{
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2)
                .build()
                .expect("Could not create the threadpool"),
        );

        let mut resources = resources_instanciator(config)?;

        let tasks = tasks_instanciator(all_instances_configs, &mut resources, threadpool.clone())?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            // Task logging explicitly enabled: use the configured keyframe interval.
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            // Logging configured with task logging disabled: no loggers at all.
            Some(_) => (None, None, 0),
            // No logging section: log with the default keyframe interval.
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let mut resources = resources_instanciator(config)?;

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            // Task logging explicitly enabled: use the configured keyframe interval.
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            // Logging configured with task logging disabled: no loggers at all.
            Some(_) => (None, None, 0),
            // No logging section: log with the default keyframe interval.
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            #[cfg(feature = "std")]
            threadpool,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// The type of a task, derived from its position in the graph: a `Source` has no
/// inputs, a `Sink` has no outputs, and everything else is `Regular`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// One step of the execution plan.
pub struct CuExecutionStep {
    /// Id of the task node in the config graph.
    pub node_id: NodeId,
    /// A copy of the configuration node.
    pub node: Node,
    /// The type of the task (source, regular or sink).
    pub task_type: CuTaskType,

    /// Indices and types of the input messages in the copper list.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// Index and type of the output message in the copper list, if any.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                " input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// A loop unit in an execution plan: a sequence of steps (or nested loops) with
/// an optional iteration count (`None` means unbounded).
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// A unit of execution in the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

/// Finds the output message (index, type) of the step planned for `node_id`,
/// searching nested loops recursively.
fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

/// Determines the type of a task from its connectivity in the graph.
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}
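
// For example, in a pipeline `cam -> filter -> log`, `cam` has no incoming edges
// (Source), `log` has no outgoing edges (Sink), and `filter` has both (Regular).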

/// Returns the id of the config-graph edge connecting the plan step at index
/// `plan_id` (the producer of an input) to `output_node_id` (its consumer).
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// Sorts a step's inputs by the id of their connection in the config graph, so a
/// task receives its inputs in the order the connections were declared.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Plans one branch of the graph with a BFS from `starting_point`, appending the
/// resulting steps to `plan`. Returns the next free copper-list slot index and
/// whether the branch could be handled (a node is deferred as long as some of
/// its inputs have not been planned yet).
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!(" Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// Computes the execution plan for the whole graph: a BFS is started from every
/// source node, and a task is deferred until all of its inputs have been planned.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!(" → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
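
// A sketch of driving the planner on an already-built graph and walking the
// resulting steps (plans produced by this function contain only `Step` units):
//
//     let plan = compute_runtime_plan(&graph)?;
//     for unit in &plan.steps {
//         if let CuExecutionUnit::Step(step) = unit {
//             println!("{} ({:?}) -> {:?}", step.node_id, step.task_type,
//                      step.output_msg_index_type);
//         }
//     }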

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(0))
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Create the first copper list and leave it processing.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Create the second copper list: the pool (NBCL = 2) is now exhausted.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Culist 1 is the most recent one, so finishing it frees it immediately.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Culist 0 is done but blocked behind culist 2, which is still processing.
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Finishing culist 2 unblocks culist 0: both get freed.
            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect src2 before src1, so the connection ids do not follow the node ids.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // The sink's inputs must come back ordered by connection id, not by node id.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}