1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16#[cfg(target_os = "none")]
17#[allow(unused_imports)]
18use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
19#[cfg(target_os = "none")]
20#[allow(unused_imports)]
21use cu29_log_derive::info;
22#[cfg(target_os = "none")]
23#[allow(unused_imports)]
24use cu29_log_runtime::log;
25#[cfg(all(target_os = "none", debug_assertions))]
26#[allow(unused_imports)]
27use cu29_log_runtime::log_debug_mode;
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_value::to_value;
31
32use alloc::boxed::Box;
33use alloc::collections::VecDeque;
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use bincode::enc::EncoderImpl;
38use bincode::enc::write::{SizeWriter, SliceWriter};
39use bincode::error::EncodeError;
40use bincode::{Decode, Encode};
41use core::fmt::Result as FmtResult;
42use core::fmt::{Debug, Formatter};
43use petgraph::prelude::*;
44use petgraph::visit::VisitMap;
45use petgraph::visit::Visitable;
46
47#[cfg(feature = "std")]
48use cu29_log_runtime::LoggerRuntime;
49#[cfg(feature = "std")]
50use cu29_unifiedlog::UnifiedLoggerWrite;
51#[cfg(feature = "std")]
52use std::sync::{Arc, Mutex};
53
/// Bundle of the process-wide services of a Copper application.
///
/// Only available when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Unified log writer, shared behind an `Arc<Mutex<_>>`.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime of the structured logger.
    pub logger_runtime: LoggerRuntime,
    /// Clock of the robot.
    pub clock: RobotClock,
}
61
62pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
64 pub inner: CuListsManager<P, NBCL>,
65 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
67}
68
69impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
70 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
71 let mut is_top = true;
72 let mut nb_done = 0;
73 for cl in self.inner.iter_mut() {
74 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
75 cl.change_state(CopperListState::DoneProcessing);
76 }
77 if is_top && cl.get_state() == CopperListState::DoneProcessing {
78 if let Some(logger) = &mut self.logger {
79 cl.change_state(CopperListState::BeingSerialized);
80 logger.log(cl)?;
81 }
82 cl.change_state(CopperListState::Free);
83 nb_done += 1;
84 } else {
85 is_top = false;
86 }
87 }
88 for _ in 0..nb_done {
89 let _ = self.inner.pop();
90 }
91 Ok(())
92 }
93
94 pub fn available_copper_lists(&self) -> usize {
95 NBCL - self.inner.len()
96 }
97}
98
99pub struct KeyFramesManager {
101 inner: KeyFrame,
103
104 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
106
107 keyframe_interval: u32,
109}
110
111impl KeyFramesManager {
112 fn is_keyframe(&self, culistid: u32) -> bool {
113 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
114 }
115
116 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
117 if self.is_keyframe(culistid) {
118 self.inner.reset(culistid, clock.now());
119 }
120 }
121
122 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
123 if self.is_keyframe(culistid) {
124 if self.inner.culistid != culistid {
125 panic!("Freezing task for a different culistid");
126 }
127 self.inner
128 .add_frozen_task(task)
129 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
130 } else {
131 Ok(0)
132 }
133 }
134
135 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
136 if self.is_keyframe(culistid) {
137 let logger = self.logger.as_mut().unwrap();
138 logger.log(&self.inner)
139 } else {
140 Ok(())
141 }
142 }
143}
144
145pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
149 pub clock: RobotClock, pub tasks: CT,
154
155 pub bridges: CB,
157
158 pub resources: ResourceManager,
160
161 pub monitor: M,
163
164 pub copperlists_manager: CopperListsManager<P, NBCL>,
166
167 pub keyframes_manager: KeyFramesManager,
169
170 pub runtime_config: RuntimeConfig,
172}
173
174impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
176 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
177{
178 fn get_clock(&self) -> RobotClock {
179 self.clock.clone()
180 }
181}
182
183#[derive(Encode, Decode)]
187pub struct KeyFrame {
188 pub culistid: u32,
190 pub timestamp: CuTime,
192 pub serialized_tasks: Vec<u8>,
194}
195
196impl KeyFrame {
197 fn new() -> Self {
198 KeyFrame {
199 culistid: 0,
200 timestamp: CuTime::default(),
201 serialized_tasks: Vec::new(),
202 }
203 }
204
205 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
207 self.culistid = culistid;
208 self.timestamp = timestamp;
209 self.serialized_tasks.clear();
210 }
211
212 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
214 let cfg = bincode::config::standard();
215 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
216 BincodeAdapter(task).encode(&mut sizer)?;
217 let need = sizer.into_writer().bytes_written as usize;
218
219 let start = self.serialized_tasks.len();
220 self.serialized_tasks.resize(start + need, 0);
221 let mut enc =
222 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
223 BincodeAdapter(task).encode(&mut enc)?;
224 Ok(need)
225 }
226}
227
228impl<
229 CT,
230 CB,
231 P: CopperListTuple + CuListZeroedInit + Default + 'static,
232 M: CuMonitor,
233 const NBCL: usize,
234> CuRuntime<CT, CB, P, M, NBCL>
235{
236 #[allow(clippy::too_many_arguments)]
238 #[cfg(feature = "std")]
239 pub fn new(
240 clock: RobotClock,
241 config: &CuConfig,
242 mission: Option<&str>,
243 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
244 tasks_instanciator: impl for<'c> Fn(
245 Vec<Option<&'c ComponentConfig>>,
246 &mut ResourceManager,
247 ) -> CuResult<CT>,
248 monitor_instanciator: impl Fn(&CuConfig) -> M,
249 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
250 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
251 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
252 ) -> CuResult<Self> {
253 let graph = config.get_graph(mission)?;
254 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
255 .get_all_nodes()
256 .iter()
257 .map(|(_, node)| node.get_instance_config())
258 .collect();
259
260 let mut resources = resources_instanciator(config)?;
261
262 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
263 let mut monitor = monitor_instanciator(config);
264 if let Ok(topology) = build_monitor_topology(config, mission) {
265 monitor.set_topology(topology);
266 }
267 let bridges = bridges_instanciator(config, &mut resources)?;
268
269 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
270 Some(logging_config) if logging_config.enable_task_logging => (
271 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
272 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
273 logging_config.keyframe_interval.unwrap(), ),
275 Some(_) => (None, None, 0), None => (
277 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
279 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
280 DEFAULT_KEYFRAME_INTERVAL,
281 ),
282 };
283
284 let copperlists_manager = CopperListsManager {
285 inner: CuListsManager::new(),
286 logger: copperlists_logger,
287 };
288 #[cfg(target_os = "none")]
289 {
290 let cl_size = core::mem::size_of::<CopperList<P>>();
291 let total_bytes = cl_size.saturating_mul(NBCL);
292 info!(
293 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
294 NBCL, cl_size, total_bytes
295 );
296 }
297
298 let keyframes_manager = KeyFramesManager {
299 inner: KeyFrame::new(),
300 logger: keyframes_logger,
301 keyframe_interval,
302 };
303
304 let runtime_config = config.runtime.clone().unwrap_or_default();
305
306 let runtime = Self {
307 tasks,
308 bridges,
309 resources,
310 monitor,
311 clock,
312 copperlists_manager,
313 keyframes_manager,
314 runtime_config,
315 };
316
317 Ok(runtime)
318 }
319
320 #[allow(clippy::too_many_arguments)]
321 #[cfg(not(feature = "std"))]
322 pub fn new(
323 clock: RobotClock,
324 config: &CuConfig,
325 mission: Option<&str>,
326 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
327 tasks_instanciator: impl for<'c> Fn(
328 Vec<Option<&'c ComponentConfig>>,
329 &mut ResourceManager,
330 ) -> CuResult<CT>,
331 monitor_instanciator: impl Fn(&CuConfig) -> M,
332 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
333 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
334 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
335 ) -> CuResult<Self> {
336 #[cfg(target_os = "none")]
337 info!("CuRuntime::new: get graph");
338 let graph = config.get_graph(mission)?;
339 #[cfg(target_os = "none")]
340 info!("CuRuntime::new: graph ok");
341 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
342 .get_all_nodes()
343 .iter()
344 .map(|(_, node)| node.get_instance_config())
345 .collect();
346
347 #[cfg(target_os = "none")]
348 info!("CuRuntime::new: resources instanciator");
349 let mut resources = resources_instanciator(config)?;
350
351 #[cfg(target_os = "none")]
352 info!("CuRuntime::new: tasks instanciator");
353 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
354
355 #[cfg(target_os = "none")]
356 info!("CuRuntime::new: monitor instanciator");
357 let mut monitor = monitor_instanciator(config);
358 #[cfg(target_os = "none")]
359 info!("CuRuntime::new: monitor instanciator ok");
360 #[cfg(target_os = "none")]
361 info!("CuRuntime::new: build monitor topology");
362 if let Ok(topology) = build_monitor_topology(config, mission) {
363 #[cfg(target_os = "none")]
364 info!("CuRuntime::new: monitor topology ok");
365 monitor.set_topology(topology);
366 #[cfg(target_os = "none")]
367 info!("CuRuntime::new: monitor topology set");
368 }
369 #[cfg(target_os = "none")]
370 info!("CuRuntime::new: bridges instanciator");
371 let bridges = bridges_instanciator(config, &mut resources)?;
372
373 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
374 Some(logging_config) if logging_config.enable_task_logging => (
375 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
376 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
377 logging_config.keyframe_interval.unwrap(), ),
379 Some(_) => (None, None, 0), None => (
381 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
383 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
384 DEFAULT_KEYFRAME_INTERVAL,
385 ),
386 };
387
388 let copperlists_manager = CopperListsManager {
389 inner: CuListsManager::new(),
390 logger: copperlists_logger,
391 };
392 #[cfg(target_os = "none")]
393 {
394 let cl_size = core::mem::size_of::<CopperList<P>>();
395 let total_bytes = cl_size.saturating_mul(NBCL);
396 info!(
397 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
398 NBCL, cl_size, total_bytes
399 );
400 }
401
402 let keyframes_manager = KeyFramesManager {
403 inner: KeyFrame::new(),
404 logger: keyframes_logger,
405 keyframe_interval,
406 };
407
408 let runtime_config = config.runtime.clone().unwrap_or_default();
409
410 let runtime = Self {
411 tasks,
412 bridges,
413 resources,
414 monitor,
415 clock,
416 copperlists_manager,
417 keyframes_manager,
418 runtime_config,
419 };
420
421 Ok(runtime)
422 }
423}
424
/// Role of a task in the graph, as classified by `find_task_type_for_id`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CuTaskType {
    /// Node without any incoming neighbor.
    Source,
    /// Node with both incoming and outgoing neighbors.
    Regular,
    /// Node with incoming neighbors but no outgoing neighbor.
    Sink,
}
435
436pub struct CuExecutionStep {
438 pub node_id: NodeId,
440 pub node: Node,
442 pub task_type: CuTaskType,
444
445 pub input_msg_indices_types: Vec<(u32, String)>,
447
448 pub output_msg_index_type: Option<(u32, String)>,
450}
451
452impl Debug for CuExecutionStep {
453 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
454 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
455 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
456 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
457 f.write_str(
458 format!(
459 " input_msg_types: {:?}\n",
460 self.input_msg_indices_types
461 )
462 .as_str(),
463 )?;
464 f.write_str(
465 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
466 )?;
467 Ok(())
468 }
469}
470
471pub struct CuExecutionLoop {
476 pub steps: Vec<CuExecutionUnit>,
477 pub loop_count: Option<u32>,
478}
479
480impl Debug for CuExecutionLoop {
481 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
482 f.write_str("CuExecutionLoop:\n")?;
483 for step in &self.steps {
484 match step {
485 CuExecutionUnit::Step(step) => {
486 step.fmt(f)?;
487 }
488 CuExecutionUnit::Loop(l) => {
489 l.fmt(f)?;
490 }
491 }
492 }
493
494 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
495 Ok(())
496 }
497}
498
499#[derive(Debug)]
501pub enum CuExecutionUnit {
502 Step(CuExecutionStep),
503 Loop(CuExecutionLoop),
504}
505
506fn find_output_index_type_from_nodeid(
507 node_id: NodeId,
508 steps: &Vec<CuExecutionUnit>,
509) -> Option<(u32, String)> {
510 for step in steps {
511 match step {
512 CuExecutionUnit::Loop(loop_unit) => {
513 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
514 return Some(index);
515 }
516 }
517 CuExecutionUnit::Step(step) => {
518 if step.node_id == node_id {
519 return step.output_msg_index_type.clone();
520 }
521 }
522 }
523 }
524 None
525}
526
527pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
528 if graph.incoming_neighbor_count(node_id) == 0 {
529 CuTaskType::Source
530 } else if graph.outgoing_neighbor_count(node_id) == 0 {
531 CuTaskType::Sink
532 } else {
533 CuTaskType::Regular
534 }
535}
536
537fn find_edge_with_plan_input_id(
540 plan: &[CuExecutionUnit],
541 graph: &CuGraph,
542 plan_id: u32,
543 output_node_id: NodeId,
544) -> usize {
545 let input_node = plan
546 .get(plan_id as usize)
547 .expect("Input step should've been added to plan before the step that receives the input");
548 let CuExecutionUnit::Step(input_step) = input_node else {
549 panic!("Expected input to be from a step, not a loop");
550 };
551 let input_node_id = input_step.node_id;
552
553 graph
554 .0
555 .edges_connecting(input_node_id.into(), output_node_id.into())
556 .map(|edge| edge.id().index())
557 .next()
558 .expect("An edge connecting the input to the output should exist")
559}
560
561fn sort_inputs_by_cnx_id(
564 input_msg_indices_types: &mut [(u32, String)],
565 plan: &[CuExecutionUnit],
566 graph: &CuGraph,
567 curr_node_id: NodeId,
568) {
569 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
570 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
571 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
572 a_edge_id.cmp(&b_edge_id)
573 });
574}
575fn plan_tasks_tree_branch(
577 graph: &CuGraph,
578 mut next_culist_output_index: u32,
579 starting_point: NodeId,
580 plan: &mut Vec<CuExecutionUnit>,
581) -> (u32, bool) {
582 #[cfg(all(feature = "std", feature = "macro_debug"))]
583 eprintln!("-- starting branch from node {starting_point}");
584
585 let mut visitor = Bfs::new(&graph.0, starting_point.into());
586 let mut handled = false;
587
588 while let Some(node) = visitor.next(&graph.0) {
589 let id = node.index() as NodeId;
590 let node_ref = graph.get_node(id).unwrap();
591 #[cfg(all(feature = "std", feature = "macro_debug"))]
592 eprintln!(" Visiting node: {node_ref:?}");
593
594 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
595 let output_msg_index_type: Option<(u32, String)>;
596 let task_type = find_task_type_for_id(graph, id);
597
598 match task_type {
599 CuTaskType::Source => {
600 #[cfg(all(feature = "std", feature = "macro_debug"))]
601 eprintln!(" → Source node, assign output index {next_culist_output_index}");
602 output_msg_index_type = Some((
603 next_culist_output_index,
604 graph
605 .0
606 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
607 .unwrap() .msg
609 .clone(),
610 ));
611 next_culist_output_index += 1;
612 }
613 CuTaskType::Sink => {
614 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
615 #[cfg(all(feature = "std", feature = "macro_debug"))]
616 eprintln!(" → Sink with parents: {parents:?}");
617 for parent in parents {
618 let pid = parent;
619 let index_type = find_output_index_type_from_nodeid(pid, plan);
620 if let Some(index_type) = index_type {
621 #[cfg(all(feature = "std", feature = "macro_debug"))]
622 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
623 input_msg_indices_types.push(index_type);
624 } else {
625 #[cfg(all(feature = "std", feature = "macro_debug"))]
626 eprintln!(" ✗ Input from {pid} not ready, returning");
627 return (next_culist_output_index, handled);
628 }
629 }
630 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
631 next_culist_output_index += 1;
632 }
633 CuTaskType::Regular => {
634 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
635 #[cfg(all(feature = "std", feature = "macro_debug"))]
636 eprintln!(" → Regular task with parents: {parents:?}");
637 for parent in parents {
638 let pid = parent;
639 let index_type = find_output_index_type_from_nodeid(pid, plan);
640 if let Some(index_type) = index_type {
641 #[cfg(all(feature = "std", feature = "macro_debug"))]
642 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
643 input_msg_indices_types.push(index_type);
644 } else {
645 #[cfg(all(feature = "std", feature = "macro_debug"))]
646 eprintln!(" ✗ Input from {pid} not ready, returning");
647 return (next_culist_output_index, handled);
648 }
649 }
650 output_msg_index_type = Some((
651 next_culist_output_index,
652 graph
653 .0
654 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
656 .msg
657 .clone(),
658 ));
659 next_culist_output_index += 1;
660 }
661 }
662
663 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
664
665 if let Some(pos) = plan
666 .iter()
667 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
668 {
669 #[cfg(all(feature = "std", feature = "macro_debug"))]
670 eprintln!(" → Already in plan, modifying existing step");
671 let mut step = plan.remove(pos);
672 if let CuExecutionUnit::Step(ref mut s) = step {
673 s.input_msg_indices_types = input_msg_indices_types;
674 }
675 plan.push(step);
676 } else {
677 #[cfg(all(feature = "std", feature = "macro_debug"))]
678 eprintln!(" → New step added to plan");
679 let step = CuExecutionStep {
680 node_id: id,
681 node: node_ref.clone(),
682 task_type,
683 input_msg_indices_types,
684 output_msg_index_type,
685 };
686 plan.push(CuExecutionUnit::Step(step));
687 }
688
689 handled = true;
690 }
691
692 #[cfg(all(feature = "std", feature = "macro_debug"))]
693 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
694 (next_culist_output_index, handled)
695}
696
697pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
700 #[cfg(all(feature = "std", feature = "macro_debug"))]
701 eprintln!("[runtime plan]");
702 let visited = graph.0.visit_map();
703 let mut plan = Vec::new();
704 let mut next_culist_output_index = 0u32;
705
706 let mut queue: VecDeque<NodeId> = graph
707 .node_indices()
708 .iter()
709 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
710 .map(|node| node.index() as NodeId)
711 .collect();
712
713 #[cfg(all(feature = "std", feature = "macro_debug"))]
714 eprintln!("Initial source nodes: {queue:?}");
715
716 while let Some(start_node) = queue.pop_front() {
717 if visited.is_visited(&start_node) {
718 #[cfg(all(feature = "std", feature = "macro_debug"))]
719 eprintln!("→ Skipping already visited source {start_node}");
720 continue;
721 }
722
723 #[cfg(all(feature = "std", feature = "macro_debug"))]
724 eprintln!("→ Starting BFS from source {start_node}");
725 let mut bfs = Bfs::new(&graph.0, start_node.into());
726
727 while let Some(node_index) = bfs.next(&graph.0) {
728 let node_id = node_index.index() as NodeId;
729 let already_in_plan = plan
730 .iter()
731 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
732 if already_in_plan {
733 #[cfg(all(feature = "std", feature = "macro_debug"))]
734 eprintln!(" → Node {node_id} already planned, skipping");
735 continue;
736 }
737
738 #[cfg(all(feature = "std", feature = "macro_debug"))]
739 eprintln!(" Planning from node {node_id}");
740 let (new_index, handled) =
741 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
742 next_culist_output_index = new_index;
743
744 if !handled {
745 #[cfg(all(feature = "std", feature = "macro_debug"))]
746 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
747 continue;
748 }
749
750 #[cfg(all(feature = "std", feature = "macro_debug"))]
751 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
752 for neighbor in graph.0.neighbors(node_index) {
753 if !visited.is_visited(&neighbor) {
754 let nid = neighbor.index() as NodeId;
755 #[cfg(all(feature = "std", feature = "macro_debug"))]
756 eprintln!(" → Enqueueing neighbor {nid}");
757 queue.push_back(nid);
758 }
759 }
760 }
761 }
762
763 Ok(CuExecutionLoop {
764 steps: plan,
765 loop_count: None,
766 })
767}
768
/// Unit tests of the runtime construction, of the copper list lifecycle and
/// of the input ordering produced by the planner.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    /// Source task that produces nothing.
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    /// Sink task that ignores its input.
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    /// Task set of the test application: one source and one sink.
    type Tasks = (TestSource, TestSink);

    /// Empty copper list payload.
    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    /// Runtime under test: two copper lists, no bridge, no monitoring.
    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;

    // A single definition serves both the `std` and `no_std` builds: the two
    // cfg-gated copies this replaces were identical.
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(0))
    }

    /// Log sink that accepts everything and stores nothing.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    /// Configuration with a source "a" connected to a sink "b".
    fn source_to_sink_config() -> CuConfig {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        config
    }

    /// Builds the test runtime on `config` with the fake instanciators and
    /// loggers.
    fn new_test_runtime(config: &CuConfig) -> CuResult<TestRuntime> {
        TestRuntime::new(
            RobotClock::default(),
            config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
    }

    /// Step of `plan` that executes `node_id`; panics when there is none.
    fn planned_step(plan: &CuExecutionLoop, node_id: NodeId) -> &CuExecutionStep {
        plan.steps
            .iter()
            .find_map(|unit| match unit {
                CuExecutionUnit::Step(step) if step.node_id == node_id => Some(step),
                CuExecutionUnit::Step(_) | CuExecutionUnit::Loop(_) => None,
            })
            .unwrap()
    }

    #[test]
    fn test_runtime_instantiation() {
        let config = source_to_sink_config();
        let runtime = new_test_runtime(&config);
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let config = source_to_sink_config();
        let mut runtime = new_test_runtime(&config).unwrap();

        // First copper list: one slot left.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Second copper list: the store is full.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        // No third list while full; finishing list 1 frees one slot.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            copperlists
                .end_of_processing(1)
                .expect("end_of_processing(1) failed");
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // List 2 takes the freed slot. Finishing list 0 frees nothing while
        // list 2 is still processing; finishing list 2 then frees both.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);

            copperlists
                .end_of_processing(0)
                .expect("end_of_processing(0) failed");
            assert_eq!(copperlists.available_copper_lists(), 0);

            copperlists
                .end_of_processing(2)
                .expect("end_of_processing(2) failed");
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect the sources in the reverse order of their node ids, so that
        // edge order and node order disagree.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = planned_step(&runtime, sink_id);

        // The inputs follow the edge order, not the node order.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Case 1: the direct cam0 -> broadcast connection is declared first.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = planned_step(&runtime, broadcast_id);

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Case 2: the cam0 -> inf0 connection is declared first.
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = planned_step(&runtime, broadcast_id);

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}