1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16use alloc::boxed::Box;
17use alloc::collections::VecDeque;
18use alloc::format;
19use alloc::string::{String, ToString};
20use alloc::vec::Vec;
21use bincode::enc::EncoderImpl;
22use bincode::enc::write::{SizeWriter, SliceWriter};
23use bincode::error::EncodeError;
24use bincode::{Decode, Encode};
25use core::fmt::Result as FmtResult;
26use core::fmt::{Debug, Formatter};
27use petgraph::prelude::*;
28use petgraph::visit::VisitMap;
29use petgraph::visit::Visitable;
30
31#[cfg(feature = "std")]
32use cu29_log_runtime::LoggerRuntime;
33#[cfg(feature = "std")]
34use cu29_unifiedlog::UnifiedLoggerWrite;
35#[cfg(feature = "std")]
36use std::sync::{Arc, Mutex};
37
/// Groups together a shared unified-log writer, a logger runtime and a robot clock.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Unified-log writer, shared behind an `Arc<Mutex<..>>`.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Logger runtime handle.
    pub logger_runtime: LoggerRuntime,
    /// Robot clock.
    pub clock: RobotClock,
}
45
/// Pairs a [`CuListsManager`] sized for `NBCL` copper lists with an optional stream
/// that receives each copper list once its processing is finished.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// The underlying copper-list container.
    pub inner: CuListsManager<P, NBCL>,
    /// Destination for finished copper lists; with `None` nothing is logged.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
52
53impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
54 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
55 let mut is_top = true;
56 let mut nb_done = 0;
57 for cl in self.inner.iter_mut() {
58 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
59 cl.change_state(CopperListState::DoneProcessing);
60 }
61 if is_top && cl.get_state() == CopperListState::DoneProcessing {
62 if let Some(logger) = &mut self.logger {
63 cl.change_state(CopperListState::BeingSerialized);
64 logger.log(cl)?;
65 }
66 cl.change_state(CopperListState::Free);
67 nb_done += 1;
68 } else {
69 is_top = false;
70 }
71 }
72 for _ in 0..nb_done {
73 let _ = self.inner.pop();
74 }
75 Ok(())
76 }
77
78 pub fn available_copper_lists(&self) -> usize {
79 NBCL - self.inner.len()
80 }
81}
82
/// Builds and logs keyframes, i.e. serialized snapshots of the tasks' frozen state.
///
/// A copper list produces a keyframe only when a logger is configured and its id is
/// a multiple of `keyframe_interval`.
pub struct KeyFramesManager {
    /// The keyframe currently being assembled; reused from one keyframe to the next.
    inner: KeyFrame,

    /// Destination for completed keyframes; with `None` no keyframe is ever produced.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Distance, in copper-list ids, between two keyframes.
    keyframe_interval: u32,
}
94
95impl KeyFramesManager {
96 fn is_keyframe(&self, culistid: u32) -> bool {
97 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
98 }
99
100 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
101 if self.is_keyframe(culistid) {
102 self.inner.reset(culistid, clock.now());
103 }
104 }
105
106 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
107 if self.is_keyframe(culistid) {
108 if self.inner.culistid != culistid {
109 panic!("Freezing task for a different culistid");
110 }
111 self.inner
112 .add_frozen_task(task)
113 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
114 } else {
115 Ok(0)
116 }
117 }
118
119 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
120 if self.is_keyframe(culistid) {
121 let logger = self.logger.as_mut().unwrap();
122 logger.log(&self.inner)
123 } else {
124 Ok(())
125 }
126 }
127}
128
/// Top-level runtime state: the clock, the instantiated tasks and bridges, the
/// resources, the monitor, and the managers for copper lists and keyframes.
///
/// * `CT` - type holding the task instances.
/// * `CB` - type holding the bridge instances.
/// * `P` - payload tuple stored in each copper list.
/// * `M` - monitor implementation.
/// * `NBCL` - number of copper-list slots.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// Clock of the runtime.
    pub clock: RobotClock,
    /// Task instances produced by the tasks instanciator.
    pub tasks: CT,

    /// Bridge instances produced by the bridges instanciator.
    pub bridges: CB,

    /// Resource manager made available to the task and bridge instanciators.
    pub resources: ResourceManager,

    /// Monitor instance.
    pub monitor: M,

    /// Copper-list storage and logging.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Keyframe assembly and logging.
    pub keyframes_manager: KeyFramesManager,

    /// Runtime section of the configuration (defaults apply when it is absent).
    pub runtime_config: RuntimeConfig,
}
157
158impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
160 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
161{
162 fn get_clock(&self) -> RobotClock {
163 self.clock.clone()
164 }
165}
166
/// Snapshot of the tasks' frozen state, tagged with the copper list it belongs to.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// Id of the copper list this keyframe was taken for.
    pub culistid: u32,
    /// Time at which the keyframe was started.
    pub timestamp: CuTime,
    /// Concatenation of the bincode-encoded frozen state of each task.
    pub serialized_tasks: Vec<u8>,
}
179
180impl KeyFrame {
181 fn new() -> Self {
182 KeyFrame {
183 culistid: 0,
184 timestamp: CuTime::default(),
185 serialized_tasks: Vec::new(),
186 }
187 }
188
189 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
191 self.culistid = culistid;
192 self.timestamp = timestamp;
193 self.serialized_tasks.clear();
194 }
195
196 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
198 let cfg = bincode::config::standard();
199 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
200 BincodeAdapter(task).encode(&mut sizer)?;
201 let need = sizer.into_writer().bytes_written as usize;
202
203 let start = self.serialized_tasks.len();
204 self.serialized_tasks.resize(start + need, 0);
205 let mut enc =
206 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
207 BincodeAdapter(task).encode(&mut enc)?;
208 Ok(need)
209 }
210}
211
212impl<
213 CT,
214 CB,
215 P: CopperListTuple + CuListZeroedInit + Default + 'static,
216 M: CuMonitor,
217 const NBCL: usize,
218> CuRuntime<CT, CB, P, M, NBCL>
219{
220 #[allow(clippy::too_many_arguments)]
222 #[cfg(feature = "std")]
223 pub fn new(
224 clock: RobotClock,
225 config: &CuConfig,
226 mission: Option<&str>,
227 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
228 tasks_instanciator: impl for<'c> Fn(
229 Vec<Option<&'c ComponentConfig>>,
230 &mut ResourceManager,
231 ) -> CuResult<CT>,
232 monitor_instanciator: impl Fn(&CuConfig) -> M,
233 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
234 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
235 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
236 ) -> CuResult<Self> {
237 let graph = config.get_graph(mission)?;
238 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
239 .get_all_nodes()
240 .iter()
241 .map(|(_, node)| node.get_instance_config())
242 .collect();
243
244 let mut resources = resources_instanciator(config)?;
245
246 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
247 let mut monitor = monitor_instanciator(config);
248 if let Ok(topology) = build_monitor_topology(config, mission) {
249 monitor.set_topology(topology);
250 }
251 let bridges = bridges_instanciator(config, &mut resources)?;
252
253 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
254 Some(logging_config) if logging_config.enable_task_logging => (
255 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
256 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
257 logging_config.keyframe_interval.unwrap(), ),
259 Some(_) => (None, None, 0), None => (
261 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
263 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
264 DEFAULT_KEYFRAME_INTERVAL,
265 ),
266 };
267
268 let copperlists_manager = CopperListsManager {
269 inner: CuListsManager::new(),
270 logger: copperlists_logger,
271 };
272
273 let keyframes_manager = KeyFramesManager {
274 inner: KeyFrame::new(),
275 logger: keyframes_logger,
276 keyframe_interval,
277 };
278
279 let runtime_config = config.runtime.clone().unwrap_or_default();
280
281 let runtime = Self {
282 tasks,
283 bridges,
284 resources,
285 monitor,
286 clock,
287 copperlists_manager,
288 keyframes_manager,
289 runtime_config,
290 };
291
292 Ok(runtime)
293 }
294
295 #[allow(clippy::too_many_arguments)]
296 #[cfg(not(feature = "std"))]
297 pub fn new(
298 clock: RobotClock,
299 config: &CuConfig,
300 mission: Option<&str>,
301 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
302 tasks_instanciator: impl for<'c> Fn(
303 Vec<Option<&'c ComponentConfig>>,
304 &mut ResourceManager,
305 ) -> CuResult<CT>,
306 monitor_instanciator: impl Fn(&CuConfig) -> M,
307 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
308 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
309 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
310 ) -> CuResult<Self> {
311 let graph = config.get_graph(mission)?;
312 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
313 .get_all_nodes()
314 .iter()
315 .map(|(_, node)| node.get_instance_config())
316 .collect();
317
318 let mut resources = resources_instanciator(config)?;
319
320 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
321
322 let mut monitor = monitor_instanciator(config);
323 if let Ok(topology) = build_monitor_topology(config, mission) {
324 monitor.set_topology(topology);
325 }
326 let bridges = bridges_instanciator(config, &mut resources)?;
327
328 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
329 Some(logging_config) if logging_config.enable_task_logging => (
330 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
331 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
332 logging_config.keyframe_interval.unwrap(), ),
334 Some(_) => (None, None, 0), None => (
336 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
338 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
339 DEFAULT_KEYFRAME_INTERVAL,
340 ),
341 };
342
343 let copperlists_manager = CopperListsManager {
344 inner: CuListsManager::new(),
345 logger: copperlists_logger,
346 };
347
348 let keyframes_manager = KeyFramesManager {
349 inner: KeyFrame::new(),
350 logger: keyframes_logger,
351 keyframe_interval,
352 };
353
354 let runtime_config = config.runtime.clone().unwrap_or_default();
355
356 let runtime = Self {
357 tasks,
358 bridges,
359 resources,
360 monitor,
361 clock,
362 copperlists_manager,
363 keyframes_manager,
364 runtime_config,
365 };
366
367 Ok(runtime)
368 }
369}
370
/// Role of a node in the task graph, derived from its connections.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Node without any incoming connection.
    Source,
    /// Node with both incoming and outgoing connections.
    Regular,
    /// Node with incoming connections but no outgoing one.
    Sink,
}
381
/// One planned task execution: which node runs, and where its inputs and output
/// live in the copper list.
pub struct CuExecutionStep {
    /// Id of the graph node this step executes.
    pub node_id: NodeId,
    /// Copy of the node's configuration.
    pub node: Node,
    /// Role of the node in the graph.
    pub task_type: CuTaskType,

    /// For each input: the copper-list output index of the producing step and the
    /// message type name.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// Copper-list output index assigned to this step and the message type name.
    pub output_msg_index_type: Option<(u32, String)>,
}
397
398impl Debug for CuExecutionStep {
399 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
400 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
401 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
402 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
403 f.write_str(
404 format!(
405 " input_msg_types: {:?}\n",
406 self.input_msg_indices_types
407 )
408 .as_str(),
409 )?;
410 f.write_str(
411 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
412 )?;
413 Ok(())
414 }
415}
416
/// An ordered group of execution units (steps or nested loops).
pub struct CuExecutionLoop {
    /// Units to run, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations. NOTE(review): `None` presumably means "no fixed
    /// count"; confirm against the code consuming the plan.
    pub loop_count: Option<u32>,
}
425
426impl Debug for CuExecutionLoop {
427 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
428 f.write_str("CuExecutionLoop:\n")?;
429 for step in &self.steps {
430 match step {
431 CuExecutionUnit::Step(step) => {
432 step.fmt(f)?;
433 }
434 CuExecutionUnit::Loop(l) => {
435 l.fmt(f)?;
436 }
437 }
438 }
439
440 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
441 Ok(())
442 }
443}
444
/// An element of an execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution.
    Step(CuExecutionStep),
    /// A nested group of units.
    Loop(CuExecutionLoop),
}
451
452fn find_output_index_type_from_nodeid(
453 node_id: NodeId,
454 steps: &Vec<CuExecutionUnit>,
455) -> Option<(u32, String)> {
456 for step in steps {
457 match step {
458 CuExecutionUnit::Loop(loop_unit) => {
459 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
460 return Some(index);
461 }
462 }
463 CuExecutionUnit::Step(step) => {
464 if step.node_id == node_id {
465 return step.output_msg_index_type.clone();
466 }
467 }
468 }
469 }
470 None
471}
472
473pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
474 if graph.incoming_neighbor_count(node_id) == 0 {
475 CuTaskType::Source
476 } else if graph.outgoing_neighbor_count(node_id) == 0 {
477 CuTaskType::Sink
478 } else {
479 CuTaskType::Regular
480 }
481}
482
483fn find_edge_with_plan_input_id(
486 plan: &[CuExecutionUnit],
487 graph: &CuGraph,
488 plan_id: u32,
489 output_node_id: NodeId,
490) -> usize {
491 let input_node = plan
492 .get(plan_id as usize)
493 .expect("Input step should've been added to plan before the step that receives the input");
494 let CuExecutionUnit::Step(input_step) = input_node else {
495 panic!("Expected input to be from a step, not a loop");
496 };
497 let input_node_id = input_step.node_id;
498
499 graph
500 .0
501 .edges_connecting(input_node_id.into(), output_node_id.into())
502 .map(|edge| edge.id().index())
503 .next()
504 .expect("An edge connecting the input to the output should exist")
505}
506
507fn sort_inputs_by_cnx_id(
510 input_msg_indices_types: &mut [(u32, String)],
511 plan: &[CuExecutionUnit],
512 graph: &CuGraph,
513 curr_node_id: NodeId,
514) {
515 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
516 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
517 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
518 a_edge_id.cmp(&b_edge_id)
519 });
520}
/// Plans the nodes reachable from `starting_point`, breadth-first, appending one
/// execution step to `plan` per node.
///
/// A sink or regular node can only be planned once every one of its parents already
/// has a step in `plan`; the walk stops and returns as soon as it meets a node for
/// which this is not the case.
///
/// Returns the next unused copper-list output index and whether at least one node
/// was planned during this call.
///
/// # Panics
///
/// Panics if a visited node is missing from the graph, or if the first outgoing edge
/// of a source or regular node cannot be resolved.
///
/// NOTE(review): a node with neither incoming nor outgoing edges is classified as a
/// source by `find_task_type_for_id`; the `[0]` indexing below presumably panics for
/// such a node. Confirm what `get_src_edges` returns in that case.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!(" Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Source node, assign output index {next_culist_output_index}");
                // The output type is the message type of the node's first outgoing edge.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Sink with parents: {parents:?}");
                // Collect each parent's output entry; give up on this branch if one
                // parent has not been planned yet.
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // A sink still gets an output slot, typed as the unit type.
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Regular task with parents: {parents:?}");
                // Same parent collection as for a sink.
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!(" ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // The output type is the message type of the node's first outgoing edge.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        // Inputs were gathered in parent order; reorder them by connecting edge index.
        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → Already in plan, modifying existing step");
            // Move the existing step to the end of the plan with refreshed inputs.
            // NOTE(review): its output entry is kept, so the index assigned above is
            // not stored anywhere in this case.
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
642
/// Computes the execution plan for `graph`: an ordered list of steps in which a
/// node is only planned after all of its parents.
///
/// Planning starts from every source node. From each start node the graph is walked
/// breadth-first; every node that is not yet in the plan is handed to
/// `plan_tasks_tree_branch`, and when that call planned something, the node's
/// neighbors are queued as further start nodes.
///
/// The returned loop has no loop count. The visible code always returns `Ok`.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    // NOTE(review): this map is only ever read in this function; nothing marks a
    // node as visited, so both `is_visited` checks below look like they are always
    // false. Confirm before relying on them.
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    // Seed the work queue with every source node of the graph.
    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            // Only top-level steps are checked here, not steps nested in loops.
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            // Nothing was planned from this node (a parent is still missing): its
            // neighbors are not queued from here.
            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!(" → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
714
715#[cfg(test)]
717mod tests {
718 use super::*;
719 use crate::config::Node;
720 use crate::cutask::CuSinkTask;
721 use crate::cutask::{CuSrcTask, Freezable};
722 use crate::monitoring::NoMonitor;
723 use bincode::Encode;
724 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
725 use serde_derive::Serialize;
726
    /// Stateless source task used as a test fixture.
    pub struct TestSource {}

    // Empty impl: nothing of `Freezable` is overridden for this fixture.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        /// Builds the fixture; the configuration and the resources are ignored.
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Does nothing and always succeeds.
        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }
752
    /// Stateless sink task used as a test fixture.
    pub struct TestSink {}

    // Empty impl: nothing of `Freezable` is overridden for this fixture.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        /// Builds the fixture; the configuration and the resources are ignored.
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Ignores its input and always succeeds.
        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
775
    /// Task tuple used to instantiate the runtime in the tests: one source, one sink.
    type Tasks = (TestSource, TestSink);

    /// Stand-in copper-list payload wrapping a unit value.
    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        /// Exposes no message at all.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        /// Reports an empty list of task ids.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        /// Nothing to initialize for a unit payload.
        fn init_zeroed(&mut self) {}
    }
797
798 #[cfg(feature = "std")]
799 fn tasks_instanciator(
800 all_instances_configs: Vec<Option<&ComponentConfig>>,
801 _resources: &mut ResourceManager,
802 ) -> CuResult<Tasks> {
803 Ok((
804 TestSource::new(all_instances_configs[0])?,
805 TestSink::new(all_instances_configs[1])?,
806 ))
807 }
808
809 #[cfg(not(feature = "std"))]
810 fn tasks_instanciator(
811 all_instances_configs: Vec<Option<&ComponentConfig>>,
812 _resources: &mut ResourceManager,
813 ) -> CuResult<Tasks> {
814 Ok((
815 TestSource::new(all_instances_configs[0])?,
816 TestSink::new(all_instances_configs[1])?,
817 ))
818 }
819
    /// Monitor factory for the tests: always a `NoMonitor`, whatever the config.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    /// Bridge factory for the tests: there are no bridges, so it yields `()`.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    /// Resource factory for the tests: a manager created with `ResourceManager::new(0)`.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(0))
    }
831
    /// Log stream that accepts any encodable value and discards it.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        /// Drops `_obj` and reports success.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
840
841 #[test]
842 fn test_runtime_instantiation() {
843 let mut config = CuConfig::default();
844 let graph = config.get_graph_mut(None).unwrap();
845 graph.add_node(Node::new("a", "TestSource")).unwrap();
846 graph.add_node(Node::new("b", "TestSink")).unwrap();
847 graph.connect(0, 1, "()").unwrap();
848 let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
849 RobotClock::default(),
850 &config,
851 None,
852 resources_instanciator,
853 tasks_instanciator,
854 monitor_instanciator,
855 bridges_instanciator,
856 FakeWriter {},
857 FakeWriter {},
858 );
859 assert!(runtime.is_ok());
860 }
861
    /// Walks a two-slot copper-lists manager through creation, exhaustion and
    /// release, checking the number of free slots after every operation.
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        // NBCL = 2: the manager has room for two copper lists.
        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // First list: gets id 0 and leaves one free slot.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Second list: gets id 1 and fills the manager.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        // The manager is full, so creation fails; finishing list 1 frees one slot.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // List 2 takes the freed slot. Finishing list 0 frees nothing while list 2
        // is still processing; finishing list 2 then releases both.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists(), 0);

            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }
942
943 #[test]
944 fn test_runtime_task_input_order() {
945 let mut config = CuConfig::default();
946 let graph = config.get_graph_mut(None).unwrap();
947 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
948 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
949 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
950
951 assert_eq!(src1_id, 0);
952 assert_eq!(src2_id, 1);
953
954 let src1_type = "src1_type";
956 let src2_type = "src2_type";
957 graph.connect(src2_id, sink_id, src2_type).unwrap();
958 graph.connect(src1_id, sink_id, src1_type).unwrap();
959
960 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
961 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
962 assert_eq!(src1_edge_id, 1);
965 assert_eq!(src2_edge_id, 0);
966
967 let runtime = compute_runtime_plan(graph).unwrap();
968 let sink_step = runtime
969 .steps
970 .iter()
971 .find_map(|step| match step {
972 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
973 _ => None,
974 })
975 .unwrap();
976
977 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
980 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
981 }
982
    /// Diamond graph (cam0 -> broadcast, cam0 -> inf0 -> broadcast) with the direct
    /// cam0 -> broadcast connection declared first: the merging node must receive
    /// the `i32` input before the `f32` one.
    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Connection order for this case: direct link first, then the inf0 branch.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): if edge ids follow connection order, edge 0 is
        // cam0 -> broadcast and edge 1 is cam0 -> inf0, so the two variable names
        // below look swapped (the assertions still hold). Confirm and rename.
        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
1022
    /// Same diamond graph as case 1, but with the cam0 -> inf0 connection declared
    /// before the direct cam0 -> broadcast one: the merging node must still receive
    /// the `i32` input before the `f32` one.
    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Connection order for this case: the inf0 branch first, then the direct link.
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): if edge ids follow connection order, edge 0 is cam0 -> inf0
        // and edge 1 is cam0 -> broadcast, so the two variable names below look
        // swapped (the assertions still hold). Confirm and rename.
        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
1062}