1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16#[cfg(target_os = "none")]
17#[allow(unused_imports)]
18use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
19#[cfg(target_os = "none")]
20#[allow(unused_imports)]
21use cu29_log_derive::info;
22#[cfg(target_os = "none")]
23#[allow(unused_imports)]
24use cu29_log_runtime::log;
25#[cfg(all(target_os = "none", debug_assertions))]
26#[allow(unused_imports)]
27use cu29_log_runtime::log_debug_mode;
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_value::to_value;
31
32use alloc::boxed::Box;
33use alloc::collections::VecDeque;
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use bincode::enc::EncoderImpl;
38use bincode::enc::write::{SizeWriter, SliceWriter};
39use bincode::error::EncodeError;
40use bincode::{Decode, Encode};
41use core::fmt::Result as FmtResult;
42use core::fmt::{Debug, Formatter};
43
44#[cfg(feature = "std")]
45use cu29_log_runtime::LoggerRuntime;
46#[cfg(feature = "std")]
47use cu29_unifiedlog::UnifiedLoggerWrite;
48#[cfg(feature = "std")]
49use std::sync::{Arc, Mutex};
50
/// Groups the unified log writer, the logger runtime and the robot clock.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Unified log writer, shared behind an `Arc<Mutex<_>>`.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime of the structured logger.
    pub logger_runtime: LoggerRuntime,
    /// Robot clock.
    pub clock: RobotClock,
}
58
59pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
61 pub inner: CuListsManager<P, NBCL>,
62 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
64}
65
66impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
67 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
68 let mut is_top = true;
69 let mut nb_done = 0;
70 for cl in self.inner.iter_mut() {
71 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
72 cl.change_state(CopperListState::DoneProcessing);
73 }
74 if is_top && cl.get_state() == CopperListState::DoneProcessing {
75 if let Some(logger) = &mut self.logger {
76 cl.change_state(CopperListState::BeingSerialized);
77 logger.log(cl)?;
78 }
79 cl.change_state(CopperListState::Free);
80 nb_done += 1;
81 } else {
82 is_top = false;
83 }
84 }
85 for _ in 0..nb_done {
86 let _ = self.inner.pop();
87 }
88 Ok(())
89 }
90
91 pub fn available_copper_lists(&self) -> usize {
92 NBCL - self.inner.len()
93 }
94}
95
96pub struct KeyFramesManager {
98 inner: KeyFrame,
100
101 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
103
104 keyframe_interval: u32,
106}
107
108impl KeyFramesManager {
109 fn is_keyframe(&self, culistid: u32) -> bool {
110 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
111 }
112
113 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
114 if self.is_keyframe(culistid) {
115 self.inner.reset(culistid, clock.now());
116 }
117 }
118
119 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
120 if self.is_keyframe(culistid) {
121 if self.inner.culistid != culistid {
122 panic!("Freezing task for a different culistid");
123 }
124 self.inner
125 .add_frozen_task(task)
126 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
127 } else {
128 Ok(0)
129 }
130 }
131
132 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
133 if self.is_keyframe(culistid) {
134 let logger = self.logger.as_mut().unwrap();
135 logger.log(&self.inner)
136 } else {
137 Ok(())
138 }
139 }
140}
141
142pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
146 pub clock: RobotClock, pub tasks: CT,
151
152 pub bridges: CB,
154
155 pub resources: ResourceManager,
157
158 pub monitor: M,
160
161 pub copperlists_manager: CopperListsManager<P, NBCL>,
163
164 pub keyframes_manager: KeyFramesManager,
166
167 pub runtime_config: RuntimeConfig,
169}
170
171impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
173 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
174{
175 fn get_clock(&self) -> RobotClock {
176 self.clock.clone()
177 }
178}
179
180#[derive(Encode, Decode)]
184pub struct KeyFrame {
185 pub culistid: u32,
187 pub timestamp: CuTime,
189 pub serialized_tasks: Vec<u8>,
191}
192
193impl KeyFrame {
194 fn new() -> Self {
195 KeyFrame {
196 culistid: 0,
197 timestamp: CuTime::default(),
198 serialized_tasks: Vec::new(),
199 }
200 }
201
202 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
204 self.culistid = culistid;
205 self.timestamp = timestamp;
206 self.serialized_tasks.clear();
207 }
208
209 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
211 let cfg = bincode::config::standard();
212 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
213 BincodeAdapter(task).encode(&mut sizer)?;
214 let need = sizer.into_writer().bytes_written as usize;
215
216 let start = self.serialized_tasks.len();
217 self.serialized_tasks.resize(start + need, 0);
218 let mut enc =
219 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
220 BincodeAdapter(task).encode(&mut enc)?;
221 Ok(need)
222 }
223}
224
225impl<
226 CT,
227 CB,
228 P: CopperListTuple + CuListZeroedInit + Default + 'static,
229 M: CuMonitor,
230 const NBCL: usize,
231> CuRuntime<CT, CB, P, M, NBCL>
232{
233 #[allow(clippy::too_many_arguments)]
235 #[cfg(feature = "std")]
236 pub fn new(
237 clock: RobotClock,
238 config: &CuConfig,
239 mission: Option<&str>,
240 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
241 tasks_instanciator: impl for<'c> Fn(
242 Vec<Option<&'c ComponentConfig>>,
243 &mut ResourceManager,
244 ) -> CuResult<CT>,
245 monitor_instanciator: impl Fn(&CuConfig) -> M,
246 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
247 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
248 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
249 ) -> CuResult<Self> {
250 let resources = resources_instanciator(config)?;
251 Self::new_with_resources(
252 clock,
253 config,
254 mission,
255 resources,
256 tasks_instanciator,
257 monitor_instanciator,
258 bridges_instanciator,
259 copperlists_logger,
260 keyframes_logger,
261 )
262 }
263
264 #[allow(clippy::too_many_arguments)]
265 #[cfg(feature = "std")]
266 pub fn new_with_resources(
267 clock: RobotClock,
268 config: &CuConfig,
269 mission: Option<&str>,
270 mut resources: ResourceManager,
271 tasks_instanciator: impl for<'c> Fn(
272 Vec<Option<&'c ComponentConfig>>,
273 &mut ResourceManager,
274 ) -> CuResult<CT>,
275 monitor_instanciator: impl Fn(&CuConfig) -> M,
276 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
277 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
278 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
279 ) -> CuResult<Self> {
280 let graph = config.get_graph(mission)?;
281 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
282 .get_all_nodes()
283 .iter()
284 .map(|(_, node)| node.get_instance_config())
285 .collect();
286
287 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
288 let mut monitor = monitor_instanciator(config);
289 if let Ok(topology) = build_monitor_topology(config, mission) {
290 monitor.set_topology(topology);
291 }
292 let bridges = bridges_instanciator(config, &mut resources)?;
293
294 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
295 Some(logging_config) if logging_config.enable_task_logging => (
296 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
297 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
298 logging_config.keyframe_interval.unwrap(), ),
300 Some(_) => (None, None, 0), None => (
302 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
304 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
305 DEFAULT_KEYFRAME_INTERVAL,
306 ),
307 };
308
309 let copperlists_manager = CopperListsManager {
310 inner: CuListsManager::new(),
311 logger: copperlists_logger,
312 };
313 #[cfg(target_os = "none")]
314 {
315 let cl_size = core::mem::size_of::<CopperList<P>>();
316 let total_bytes = cl_size.saturating_mul(NBCL);
317 info!(
318 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
319 NBCL, cl_size, total_bytes
320 );
321 }
322
323 let keyframes_manager = KeyFramesManager {
324 inner: KeyFrame::new(),
325 logger: keyframes_logger,
326 keyframe_interval,
327 };
328
329 let runtime_config = config.runtime.clone().unwrap_or_default();
330
331 let runtime = Self {
332 tasks,
333 bridges,
334 resources,
335 monitor,
336 clock,
337 copperlists_manager,
338 keyframes_manager,
339 runtime_config,
340 };
341
342 Ok(runtime)
343 }
344
345 #[allow(clippy::too_many_arguments)]
346 #[cfg(not(feature = "std"))]
347 pub fn new(
348 clock: RobotClock,
349 config: &CuConfig,
350 mission: Option<&str>,
351 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
352 tasks_instanciator: impl for<'c> Fn(
353 Vec<Option<&'c ComponentConfig>>,
354 &mut ResourceManager,
355 ) -> CuResult<CT>,
356 monitor_instanciator: impl Fn(&CuConfig) -> M,
357 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
358 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
359 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
360 ) -> CuResult<Self> {
361 #[cfg(target_os = "none")]
362 info!("CuRuntime::new: resources instanciator");
363 let resources = resources_instanciator(config)?;
364 Self::new_with_resources(
365 clock,
366 config,
367 mission,
368 resources,
369 tasks_instanciator,
370 monitor_instanciator,
371 bridges_instanciator,
372 copperlists_logger,
373 keyframes_logger,
374 )
375 }
376
377 #[allow(clippy::too_many_arguments)]
378 #[cfg(not(feature = "std"))]
379 pub fn new_with_resources(
380 clock: RobotClock,
381 config: &CuConfig,
382 mission: Option<&str>,
383 mut resources: ResourceManager,
384 tasks_instanciator: impl for<'c> Fn(
385 Vec<Option<&'c ComponentConfig>>,
386 &mut ResourceManager,
387 ) -> CuResult<CT>,
388 monitor_instanciator: impl Fn(&CuConfig) -> M,
389 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
390 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
391 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
392 ) -> CuResult<Self> {
393 #[cfg(target_os = "none")]
394 info!("CuRuntime::new: get graph");
395 let graph = config.get_graph(mission)?;
396 #[cfg(target_os = "none")]
397 info!("CuRuntime::new: graph ok");
398 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
399 .get_all_nodes()
400 .iter()
401 .map(|(_, node)| node.get_instance_config())
402 .collect();
403
404 #[cfg(target_os = "none")]
405 info!("CuRuntime::new: tasks instanciator");
406 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
407
408 #[cfg(target_os = "none")]
409 info!("CuRuntime::new: monitor instanciator");
410 let mut monitor = monitor_instanciator(config);
411 #[cfg(target_os = "none")]
412 info!("CuRuntime::new: monitor instanciator ok");
413 #[cfg(target_os = "none")]
414 info!("CuRuntime::new: build monitor topology");
415 if let Ok(topology) = build_monitor_topology(config, mission) {
416 #[cfg(target_os = "none")]
417 info!("CuRuntime::new: monitor topology ok");
418 monitor.set_topology(topology);
419 #[cfg(target_os = "none")]
420 info!("CuRuntime::new: monitor topology set");
421 }
422 #[cfg(target_os = "none")]
423 info!("CuRuntime::new: bridges instanciator");
424 let bridges = bridges_instanciator(config, &mut resources)?;
425
426 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
427 Some(logging_config) if logging_config.enable_task_logging => (
428 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
429 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
430 logging_config.keyframe_interval.unwrap(), ),
432 Some(_) => (None, None, 0), None => (
434 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
436 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
437 DEFAULT_KEYFRAME_INTERVAL,
438 ),
439 };
440
441 let copperlists_manager = CopperListsManager {
442 inner: CuListsManager::new(),
443 logger: copperlists_logger,
444 };
445 #[cfg(target_os = "none")]
446 {
447 let cl_size = core::mem::size_of::<CopperList<P>>();
448 let total_bytes = cl_size.saturating_mul(NBCL);
449 info!(
450 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
451 NBCL, cl_size, total_bytes
452 );
453 }
454
455 let keyframes_manager = KeyFramesManager {
456 inner: KeyFrame::new(),
457 logger: keyframes_logger,
458 keyframe_interval,
459 };
460
461 let runtime_config = config.runtime.clone().unwrap_or_default();
462
463 let runtime = Self {
464 tasks,
465 bridges,
466 resources,
467 monitor,
468 clock,
469 copperlists_manager,
470 keyframes_manager,
471 runtime_config,
472 };
473
474 Ok(runtime)
475 }
476}
477
/// Role of a task in the graph, as classified by [`find_task_type_for_id`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CuTaskType {
    /// Node without any incoming neighbor.
    Source,
    /// Node with both incoming and outgoing neighbors.
    Regular,
    /// Node with incoming neighbors but no outgoing neighbor.
    Sink,
}
488
489pub struct CuExecutionStep {
491 pub node_id: NodeId,
493 pub node: Node,
495 pub task_type: CuTaskType,
497
498 pub input_msg_indices_types: Vec<(u32, String)>,
500
501 pub output_msg_index_type: Option<(u32, String)>,
503}
504
505impl Debug for CuExecutionStep {
506 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
507 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
508 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
509 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
510 f.write_str(
511 format!(
512 " input_msg_types: {:?}\n",
513 self.input_msg_indices_types
514 )
515 .as_str(),
516 )?;
517 f.write_str(
518 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
519 )?;
520 Ok(())
521 }
522}
523
524pub struct CuExecutionLoop {
529 pub steps: Vec<CuExecutionUnit>,
530 pub loop_count: Option<u32>,
531}
532
533impl Debug for CuExecutionLoop {
534 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
535 f.write_str("CuExecutionLoop:\n")?;
536 for step in &self.steps {
537 match step {
538 CuExecutionUnit::Step(step) => {
539 step.fmt(f)?;
540 }
541 CuExecutionUnit::Loop(l) => {
542 l.fmt(f)?;
543 }
544 }
545 }
546
547 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
548 Ok(())
549 }
550}
551
552#[derive(Debug)]
554pub enum CuExecutionUnit {
555 Step(CuExecutionStep),
556 Loop(CuExecutionLoop),
557}
558
559fn find_output_index_type_from_nodeid(
560 node_id: NodeId,
561 steps: &Vec<CuExecutionUnit>,
562) -> Option<(u32, String)> {
563 for step in steps {
564 match step {
565 CuExecutionUnit::Loop(loop_unit) => {
566 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
567 return Some(index);
568 }
569 }
570 CuExecutionUnit::Step(step) => {
571 if step.node_id == node_id {
572 return step.output_msg_index_type.clone();
573 }
574 }
575 }
576 }
577 None
578}
579
580pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
581 if graph.incoming_neighbor_count(node_id) == 0 {
582 CuTaskType::Source
583 } else if graph.outgoing_neighbor_count(node_id) == 0 {
584 CuTaskType::Sink
585 } else {
586 CuTaskType::Regular
587 }
588}
589
590fn find_edge_with_plan_input_id(
593 plan: &[CuExecutionUnit],
594 graph: &CuGraph,
595 plan_id: u32,
596 output_node_id: NodeId,
597) -> usize {
598 let input_node = plan
599 .get(plan_id as usize)
600 .expect("Input step should've been added to plan before the step that receives the input");
601 let CuExecutionUnit::Step(input_step) = input_node else {
602 panic!("Expected input to be from a step, not a loop");
603 };
604 let input_node_id = input_step.node_id;
605
606 graph
607 .edge_id_between(input_node_id, output_node_id)
608 .expect("An edge connecting the input to the output should exist")
609}
610
611fn sort_inputs_by_cnx_id(
614 input_msg_indices_types: &mut [(u32, String)],
615 plan: &[CuExecutionUnit],
616 graph: &CuGraph,
617 curr_node_id: NodeId,
618) {
619 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
620 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
621 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
622 a_edge_id.cmp(&b_edge_id)
623 });
624}
625fn plan_tasks_tree_branch(
627 graph: &CuGraph,
628 mut next_culist_output_index: u32,
629 starting_point: NodeId,
630 plan: &mut Vec<CuExecutionUnit>,
631) -> (u32, bool) {
632 #[cfg(all(feature = "std", feature = "macro_debug"))]
633 eprintln!("-- starting branch from node {starting_point}");
634
635 let mut handled = false;
636
637 for id in graph.bfs_nodes(starting_point) {
638 let node_ref = graph.get_node(id).unwrap();
639 #[cfg(all(feature = "std", feature = "macro_debug"))]
640 eprintln!(" Visiting node: {node_ref:?}");
641
642 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
643 let output_msg_index_type: Option<(u32, String)>;
644 let task_type = find_task_type_for_id(graph, id);
645
646 match task_type {
647 CuTaskType::Source => {
648 #[cfg(all(feature = "std", feature = "macro_debug"))]
649 eprintln!(" → Source node, assign output index {next_culist_output_index}");
650 let edge_id = graph.get_src_edges(id).unwrap()[0];
651 output_msg_index_type = Some((
652 next_culist_output_index,
653 graph
654 .edge(edge_id)
655 .unwrap() .msg
657 .clone(),
658 ));
659 next_culist_output_index += 1;
660 }
661 CuTaskType::Sink => {
662 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
663 #[cfg(all(feature = "std", feature = "macro_debug"))]
664 eprintln!(" → Sink with parents: {parents:?}");
665 for parent in parents {
666 let pid = parent;
667 let index_type = find_output_index_type_from_nodeid(pid, plan);
668 if let Some(index_type) = index_type {
669 #[cfg(all(feature = "std", feature = "macro_debug"))]
670 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
671 input_msg_indices_types.push(index_type);
672 } else {
673 #[cfg(all(feature = "std", feature = "macro_debug"))]
674 eprintln!(" ✗ Input from {pid} not ready, returning");
675 return (next_culist_output_index, handled);
676 }
677 }
678 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
679 next_culist_output_index += 1;
680 }
681 CuTaskType::Regular => {
682 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
683 #[cfg(all(feature = "std", feature = "macro_debug"))]
684 eprintln!(" → Regular task with parents: {parents:?}");
685 for parent in parents {
686 let pid = parent;
687 let index_type = find_output_index_type_from_nodeid(pid, plan);
688 if let Some(index_type) = index_type {
689 #[cfg(all(feature = "std", feature = "macro_debug"))]
690 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
691 input_msg_indices_types.push(index_type);
692 } else {
693 #[cfg(all(feature = "std", feature = "macro_debug"))]
694 eprintln!(" ✗ Input from {pid} not ready, returning");
695 return (next_culist_output_index, handled);
696 }
697 }
698 let edge_id = graph.get_src_edges(id).unwrap()[0];
699 output_msg_index_type = Some((
700 next_culist_output_index,
701 graph
702 .edge(edge_id) .unwrap()
704 .msg
705 .clone(),
706 ));
707 next_culist_output_index += 1;
708 }
709 }
710
711 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
712
713 if let Some(pos) = plan
714 .iter()
715 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
716 {
717 #[cfg(all(feature = "std", feature = "macro_debug"))]
718 eprintln!(" → Already in plan, modifying existing step");
719 let mut step = plan.remove(pos);
720 if let CuExecutionUnit::Step(ref mut s) = step {
721 s.input_msg_indices_types = input_msg_indices_types;
722 }
723 plan.push(step);
724 } else {
725 #[cfg(all(feature = "std", feature = "macro_debug"))]
726 eprintln!(" → New step added to plan");
727 let step = CuExecutionStep {
728 node_id: id,
729 node: node_ref.clone(),
730 task_type,
731 input_msg_indices_types,
732 output_msg_index_type,
733 };
734 plan.push(CuExecutionUnit::Step(step));
735 }
736
737 handled = true;
738 }
739
740 #[cfg(all(feature = "std", feature = "macro_debug"))]
741 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
742 (next_culist_output_index, handled)
743}
744
745pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
748 #[cfg(all(feature = "std", feature = "macro_debug"))]
749 eprintln!("[runtime plan]");
750 let mut plan = Vec::new();
751 let mut next_culist_output_index = 0u32;
752
753 let mut queue: VecDeque<NodeId> = graph
754 .node_ids()
755 .into_iter()
756 .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
757 .collect();
758
759 #[cfg(all(feature = "std", feature = "macro_debug"))]
760 eprintln!("Initial source nodes: {queue:?}");
761
762 while let Some(start_node) = queue.pop_front() {
763 #[cfg(all(feature = "std", feature = "macro_debug"))]
764 eprintln!("→ Starting BFS from source {start_node}");
765 for node_id in graph.bfs_nodes(start_node) {
766 let already_in_plan = plan
767 .iter()
768 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
769 if already_in_plan {
770 #[cfg(all(feature = "std", feature = "macro_debug"))]
771 eprintln!(" → Node {node_id} already planned, skipping");
772 continue;
773 }
774
775 #[cfg(all(feature = "std", feature = "macro_debug"))]
776 eprintln!(" Planning from node {node_id}");
777 let (new_index, handled) =
778 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
779 next_culist_output_index = new_index;
780
781 if !handled {
782 #[cfg(all(feature = "std", feature = "macro_debug"))]
783 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
784 continue;
785 }
786
787 #[cfg(all(feature = "std", feature = "macro_debug"))]
788 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
789 for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
790 #[cfg(all(feature = "std", feature = "macro_debug"))]
791 eprintln!(" → Enqueueing neighbor {neighbor}");
792 queue.push_back(neighbor);
793 }
794 }
795 }
796
797 Ok(CuExecutionLoop {
798 steps: plan,
799 loop_count: None,
800 })
801}
802
/// Unit tests for the runtime construction, the copper list life cycle and the planner.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    /// Source task that produces nothing.
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    /// Sink task that ignores its input.
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    /// Task tuple used by the test runtimes.
    type Tasks = (TestSource, TestSink);

    /// Empty copper list payload.
    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    /// Builds the source from the first instance config and the sink from the second one.
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }

    /// Log stream that accepts everything and stores nothing.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // First copper list: one slot left afterwards.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Second copper list: the pool is now exhausted.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        // A third creation fails; finishing list 1 gives one slot back.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            copperlists.end_of_processing(1).unwrap();
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Finishing list 0 while list 2 is still processing frees nothing; finishing list 2
        // afterwards releases both.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            copperlists.end_of_processing(0).unwrap();
            assert_eq!(copperlists.available_copper_lists(), 0);

            copperlists.end_of_processing(2).unwrap();
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect the second source first so that edge order differs from node order.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Inputs follow the edge ids, not the node ids.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}