1use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{CuMonitor, build_monitor_topology};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16#[cfg(target_os = "none")]
17#[allow(unused_imports)]
18use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
19#[cfg(target_os = "none")]
20#[allow(unused_imports)]
21use cu29_log_derive::info;
22#[cfg(target_os = "none")]
23#[allow(unused_imports)]
24use cu29_log_runtime::log;
25#[cfg(all(target_os = "none", debug_assertions))]
26#[allow(unused_imports)]
27use cu29_log_runtime::log_debug_mode;
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_value::to_value;
31
32use alloc::boxed::Box;
33use alloc::collections::VecDeque;
34use alloc::format;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use bincode::enc::EncoderImpl;
38use bincode::enc::write::{SizeWriter, SliceWriter};
39use bincode::error::EncodeError;
40use bincode::{Decode, Encode};
41use core::fmt::Result as FmtResult;
42use core::fmt::{Debug, Formatter};
43use petgraph::prelude::*;
44use petgraph::visit::VisitMap;
45use petgraph::visit::Visitable;
46
47#[cfg(feature = "std")]
48use cu29_log_runtime::LoggerRuntime;
49#[cfg(feature = "std")]
50use cu29_unifiedlog::UnifiedLoggerWrite;
51#[cfg(feature = "std")]
52use std::sync::{Arc, Mutex};
53
54#[cfg(feature = "std")]
56pub struct CopperContext {
57 pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
58 pub logger_runtime: LoggerRuntime,
59 pub clock: RobotClock,
60}
61
62pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
64 pub inner: CuListsManager<P, NBCL>,
65 pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
67}
68
69impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
70 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
71 let mut is_top = true;
72 let mut nb_done = 0;
73 for cl in self.inner.iter_mut() {
74 if cl.id == culistid && cl.get_state() == CopperListState::Processing {
75 cl.change_state(CopperListState::DoneProcessing);
76 }
77 if is_top && cl.get_state() == CopperListState::DoneProcessing {
78 if let Some(logger) = &mut self.logger {
79 cl.change_state(CopperListState::BeingSerialized);
80 logger.log(cl)?;
81 }
82 cl.change_state(CopperListState::Free);
83 nb_done += 1;
84 } else {
85 is_top = false;
86 }
87 }
88 for _ in 0..nb_done {
89 let _ = self.inner.pop();
90 }
91 Ok(())
92 }
93
94 pub fn available_copper_lists(&self) -> usize {
95 NBCL - self.inner.len()
96 }
97}
98
99pub struct KeyFramesManager {
101 inner: KeyFrame,
103
104 logger: Option<Box<dyn WriteStream<KeyFrame>>>,
106
107 keyframe_interval: u32,
109}
110
111impl KeyFramesManager {
112 fn is_keyframe(&self, culistid: u32) -> bool {
113 self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
114 }
115
116 pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
117 if self.is_keyframe(culistid) {
118 self.inner.reset(culistid, clock.now());
119 }
120 }
121
122 pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
123 if self.is_keyframe(culistid) {
124 if self.inner.culistid != culistid {
125 panic!("Freezing task for a different culistid");
126 }
127 self.inner
128 .add_frozen_task(task)
129 .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
130 } else {
131 Ok(0)
132 }
133 }
134
135 pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
136 if self.is_keyframe(culistid) {
137 let logger = self.logger.as_mut().unwrap();
138 logger.log(&self.inner)
139 } else {
140 Ok(())
141 }
142 }
143}
144
145pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
149 pub clock: RobotClock, pub tasks: CT,
154
155 pub bridges: CB,
157
158 pub resources: ResourceManager,
160
161 pub monitor: M,
163
164 pub copperlists_manager: CopperListsManager<P, NBCL>,
166
167 pub keyframes_manager: KeyFramesManager,
169
170 pub runtime_config: RuntimeConfig,
172}
173
174impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
176 ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
177{
178 fn get_clock(&self) -> RobotClock {
179 self.clock.clone()
180 }
181}
182
183#[derive(Encode, Decode)]
187pub struct KeyFrame {
188 pub culistid: u32,
190 pub timestamp: CuTime,
192 pub serialized_tasks: Vec<u8>,
194}
195
196impl KeyFrame {
197 fn new() -> Self {
198 KeyFrame {
199 culistid: 0,
200 timestamp: CuTime::default(),
201 serialized_tasks: Vec::new(),
202 }
203 }
204
205 fn reset(&mut self, culistid: u32, timestamp: CuTime) {
207 self.culistid = culistid;
208 self.timestamp = timestamp;
209 self.serialized_tasks.clear();
210 }
211
212 fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
214 let cfg = bincode::config::standard();
215 let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
216 BincodeAdapter(task).encode(&mut sizer)?;
217 let need = sizer.into_writer().bytes_written as usize;
218
219 let start = self.serialized_tasks.len();
220 self.serialized_tasks.resize(start + need, 0);
221 let mut enc =
222 EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
223 BincodeAdapter(task).encode(&mut enc)?;
224 Ok(need)
225 }
226}
227
228impl<
229 CT,
230 CB,
231 P: CopperListTuple + CuListZeroedInit + Default + 'static,
232 M: CuMonitor,
233 const NBCL: usize,
234> CuRuntime<CT, CB, P, M, NBCL>
235{
236 #[allow(clippy::too_many_arguments)]
238 #[cfg(feature = "std")]
239 pub fn new(
240 clock: RobotClock,
241 config: &CuConfig,
242 mission: Option<&str>,
243 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
244 tasks_instanciator: impl for<'c> Fn(
245 Vec<Option<&'c ComponentConfig>>,
246 &mut ResourceManager,
247 ) -> CuResult<CT>,
248 monitor_instanciator: impl Fn(&CuConfig) -> M,
249 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
250 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
251 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
252 ) -> CuResult<Self> {
253 let resources = resources_instanciator(config)?;
254 Self::new_with_resources(
255 clock,
256 config,
257 mission,
258 resources,
259 tasks_instanciator,
260 monitor_instanciator,
261 bridges_instanciator,
262 copperlists_logger,
263 keyframes_logger,
264 )
265 }
266
267 #[allow(clippy::too_many_arguments)]
268 #[cfg(feature = "std")]
269 pub fn new_with_resources(
270 clock: RobotClock,
271 config: &CuConfig,
272 mission: Option<&str>,
273 mut resources: ResourceManager,
274 tasks_instanciator: impl for<'c> Fn(
275 Vec<Option<&'c ComponentConfig>>,
276 &mut ResourceManager,
277 ) -> CuResult<CT>,
278 monitor_instanciator: impl Fn(&CuConfig) -> M,
279 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
280 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
281 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
282 ) -> CuResult<Self> {
283 let graph = config.get_graph(mission)?;
284 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
285 .get_all_nodes()
286 .iter()
287 .map(|(_, node)| node.get_instance_config())
288 .collect();
289
290 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
291 let mut monitor = monitor_instanciator(config);
292 if let Ok(topology) = build_monitor_topology(config, mission) {
293 monitor.set_topology(topology);
294 }
295 let bridges = bridges_instanciator(config, &mut resources)?;
296
297 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
298 Some(logging_config) if logging_config.enable_task_logging => (
299 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
300 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
301 logging_config.keyframe_interval.unwrap(), ),
303 Some(_) => (None, None, 0), None => (
305 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
307 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
308 DEFAULT_KEYFRAME_INTERVAL,
309 ),
310 };
311
312 let copperlists_manager = CopperListsManager {
313 inner: CuListsManager::new(),
314 logger: copperlists_logger,
315 };
316 #[cfg(target_os = "none")]
317 {
318 let cl_size = core::mem::size_of::<CopperList<P>>();
319 let total_bytes = cl_size.saturating_mul(NBCL);
320 info!(
321 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
322 NBCL, cl_size, total_bytes
323 );
324 }
325
326 let keyframes_manager = KeyFramesManager {
327 inner: KeyFrame::new(),
328 logger: keyframes_logger,
329 keyframe_interval,
330 };
331
332 let runtime_config = config.runtime.clone().unwrap_or_default();
333
334 let runtime = Self {
335 tasks,
336 bridges,
337 resources,
338 monitor,
339 clock,
340 copperlists_manager,
341 keyframes_manager,
342 runtime_config,
343 };
344
345 Ok(runtime)
346 }
347
348 #[allow(clippy::too_many_arguments)]
349 #[cfg(not(feature = "std"))]
350 pub fn new(
351 clock: RobotClock,
352 config: &CuConfig,
353 mission: Option<&str>,
354 resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
355 tasks_instanciator: impl for<'c> Fn(
356 Vec<Option<&'c ComponentConfig>>,
357 &mut ResourceManager,
358 ) -> CuResult<CT>,
359 monitor_instanciator: impl Fn(&CuConfig) -> M,
360 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
361 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
362 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
363 ) -> CuResult<Self> {
364 #[cfg(target_os = "none")]
365 info!("CuRuntime::new: resources instanciator");
366 let resources = resources_instanciator(config)?;
367 Self::new_with_resources(
368 clock,
369 config,
370 mission,
371 resources,
372 tasks_instanciator,
373 monitor_instanciator,
374 bridges_instanciator,
375 copperlists_logger,
376 keyframes_logger,
377 )
378 }
379
380 #[allow(clippy::too_many_arguments)]
381 #[cfg(not(feature = "std"))]
382 pub fn new_with_resources(
383 clock: RobotClock,
384 config: &CuConfig,
385 mission: Option<&str>,
386 mut resources: ResourceManager,
387 tasks_instanciator: impl for<'c> Fn(
388 Vec<Option<&'c ComponentConfig>>,
389 &mut ResourceManager,
390 ) -> CuResult<CT>,
391 monitor_instanciator: impl Fn(&CuConfig) -> M,
392 bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
393 copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
394 keyframes_logger: impl WriteStream<KeyFrame> + 'static,
395 ) -> CuResult<Self> {
396 #[cfg(target_os = "none")]
397 info!("CuRuntime::new: get graph");
398 let graph = config.get_graph(mission)?;
399 #[cfg(target_os = "none")]
400 info!("CuRuntime::new: graph ok");
401 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
402 .get_all_nodes()
403 .iter()
404 .map(|(_, node)| node.get_instance_config())
405 .collect();
406
407 #[cfg(target_os = "none")]
408 info!("CuRuntime::new: tasks instanciator");
409 let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
410
411 #[cfg(target_os = "none")]
412 info!("CuRuntime::new: monitor instanciator");
413 let mut monitor = monitor_instanciator(config);
414 #[cfg(target_os = "none")]
415 info!("CuRuntime::new: monitor instanciator ok");
416 #[cfg(target_os = "none")]
417 info!("CuRuntime::new: build monitor topology");
418 if let Ok(topology) = build_monitor_topology(config, mission) {
419 #[cfg(target_os = "none")]
420 info!("CuRuntime::new: monitor topology ok");
421 monitor.set_topology(topology);
422 #[cfg(target_os = "none")]
423 info!("CuRuntime::new: monitor topology set");
424 }
425 #[cfg(target_os = "none")]
426 info!("CuRuntime::new: bridges instanciator");
427 let bridges = bridges_instanciator(config, &mut resources)?;
428
429 let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
430 Some(logging_config) if logging_config.enable_task_logging => (
431 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
432 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
433 logging_config.keyframe_interval.unwrap(), ),
435 Some(_) => (None, None, 0), None => (
437 Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
439 Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
440 DEFAULT_KEYFRAME_INTERVAL,
441 ),
442 };
443
444 let copperlists_manager = CopperListsManager {
445 inner: CuListsManager::new(),
446 logger: copperlists_logger,
447 };
448 #[cfg(target_os = "none")]
449 {
450 let cl_size = core::mem::size_of::<CopperList<P>>();
451 let total_bytes = cl_size.saturating_mul(NBCL);
452 info!(
453 "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
454 NBCL, cl_size, total_bytes
455 );
456 }
457
458 let keyframes_manager = KeyFramesManager {
459 inner: KeyFrame::new(),
460 logger: keyframes_logger,
461 keyframe_interval,
462 };
463
464 let runtime_config = config.runtime.clone().unwrap_or_default();
465
466 let runtime = Self {
467 tasks,
468 bridges,
469 resources,
470 monitor,
471 clock,
472 copperlists_manager,
473 keyframes_manager,
474 runtime_config,
475 };
476
477 Ok(runtime)
478 }
479}
480
481#[derive(Debug, PartialEq, Eq, Clone, Copy)]
486pub enum CuTaskType {
487 Source,
488 Regular,
489 Sink,
490}
491
492pub struct CuExecutionStep {
494 pub node_id: NodeId,
496 pub node: Node,
498 pub task_type: CuTaskType,
500
501 pub input_msg_indices_types: Vec<(u32, String)>,
503
504 pub output_msg_index_type: Option<(u32, String)>,
506}
507
508impl Debug for CuExecutionStep {
509 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
510 f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
511 f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
512 f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
513 f.write_str(
514 format!(
515 " input_msg_types: {:?}\n",
516 self.input_msg_indices_types
517 )
518 .as_str(),
519 )?;
520 f.write_str(
521 format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
522 )?;
523 Ok(())
524 }
525}
526
527pub struct CuExecutionLoop {
532 pub steps: Vec<CuExecutionUnit>,
533 pub loop_count: Option<u32>,
534}
535
536impl Debug for CuExecutionLoop {
537 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
538 f.write_str("CuExecutionLoop:\n")?;
539 for step in &self.steps {
540 match step {
541 CuExecutionUnit::Step(step) => {
542 step.fmt(f)?;
543 }
544 CuExecutionUnit::Loop(l) => {
545 l.fmt(f)?;
546 }
547 }
548 }
549
550 f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
551 Ok(())
552 }
553}
554
555#[derive(Debug)]
557pub enum CuExecutionUnit {
558 Step(CuExecutionStep),
559 Loop(CuExecutionLoop),
560}
561
562fn find_output_index_type_from_nodeid(
563 node_id: NodeId,
564 steps: &Vec<CuExecutionUnit>,
565) -> Option<(u32, String)> {
566 for step in steps {
567 match step {
568 CuExecutionUnit::Loop(loop_unit) => {
569 if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
570 return Some(index);
571 }
572 }
573 CuExecutionUnit::Step(step) => {
574 if step.node_id == node_id {
575 return step.output_msg_index_type.clone();
576 }
577 }
578 }
579 }
580 None
581}
582
583pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
584 if graph.incoming_neighbor_count(node_id) == 0 {
585 CuTaskType::Source
586 } else if graph.outgoing_neighbor_count(node_id) == 0 {
587 CuTaskType::Sink
588 } else {
589 CuTaskType::Regular
590 }
591}
592
593fn find_edge_with_plan_input_id(
596 plan: &[CuExecutionUnit],
597 graph: &CuGraph,
598 plan_id: u32,
599 output_node_id: NodeId,
600) -> usize {
601 let input_node = plan
602 .get(plan_id as usize)
603 .expect("Input step should've been added to plan before the step that receives the input");
604 let CuExecutionUnit::Step(input_step) = input_node else {
605 panic!("Expected input to be from a step, not a loop");
606 };
607 let input_node_id = input_step.node_id;
608
609 graph
610 .0
611 .edges_connecting(input_node_id.into(), output_node_id.into())
612 .map(|edge| edge.id().index())
613 .next()
614 .expect("An edge connecting the input to the output should exist")
615}
616
617fn sort_inputs_by_cnx_id(
620 input_msg_indices_types: &mut [(u32, String)],
621 plan: &[CuExecutionUnit],
622 graph: &CuGraph,
623 curr_node_id: NodeId,
624) {
625 input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
626 let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
627 let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
628 a_edge_id.cmp(&b_edge_id)
629 });
630}
631fn plan_tasks_tree_branch(
633 graph: &CuGraph,
634 mut next_culist_output_index: u32,
635 starting_point: NodeId,
636 plan: &mut Vec<CuExecutionUnit>,
637) -> (u32, bool) {
638 #[cfg(all(feature = "std", feature = "macro_debug"))]
639 eprintln!("-- starting branch from node {starting_point}");
640
641 let mut visitor = Bfs::new(&graph.0, starting_point.into());
642 let mut handled = false;
643
644 while let Some(node) = visitor.next(&graph.0) {
645 let id = node.index() as NodeId;
646 let node_ref = graph.get_node(id).unwrap();
647 #[cfg(all(feature = "std", feature = "macro_debug"))]
648 eprintln!(" Visiting node: {node_ref:?}");
649
650 let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
651 let output_msg_index_type: Option<(u32, String)>;
652 let task_type = find_task_type_for_id(graph, id);
653
654 match task_type {
655 CuTaskType::Source => {
656 #[cfg(all(feature = "std", feature = "macro_debug"))]
657 eprintln!(" → Source node, assign output index {next_culist_output_index}");
658 output_msg_index_type = Some((
659 next_culist_output_index,
660 graph
661 .0
662 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
663 .unwrap() .msg
665 .clone(),
666 ));
667 next_culist_output_index += 1;
668 }
669 CuTaskType::Sink => {
670 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
671 #[cfg(all(feature = "std", feature = "macro_debug"))]
672 eprintln!(" → Sink with parents: {parents:?}");
673 for parent in parents {
674 let pid = parent;
675 let index_type = find_output_index_type_from_nodeid(pid, plan);
676 if let Some(index_type) = index_type {
677 #[cfg(all(feature = "std", feature = "macro_debug"))]
678 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
679 input_msg_indices_types.push(index_type);
680 } else {
681 #[cfg(all(feature = "std", feature = "macro_debug"))]
682 eprintln!(" ✗ Input from {pid} not ready, returning");
683 return (next_culist_output_index, handled);
684 }
685 }
686 output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
687 next_culist_output_index += 1;
688 }
689 CuTaskType::Regular => {
690 let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
691 #[cfg(all(feature = "std", feature = "macro_debug"))]
692 eprintln!(" → Regular task with parents: {parents:?}");
693 for parent in parents {
694 let pid = parent;
695 let index_type = find_output_index_type_from_nodeid(pid, plan);
696 if let Some(index_type) = index_type {
697 #[cfg(all(feature = "std", feature = "macro_debug"))]
698 eprintln!(" ✓ Input from {pid} ready: {index_type:?}");
699 input_msg_indices_types.push(index_type);
700 } else {
701 #[cfg(all(feature = "std", feature = "macro_debug"))]
702 eprintln!(" ✗ Input from {pid} not ready, returning");
703 return (next_culist_output_index, handled);
704 }
705 }
706 output_msg_index_type = Some((
707 next_culist_output_index,
708 graph
709 .0
710 .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) .unwrap()
712 .msg
713 .clone(),
714 ));
715 next_culist_output_index += 1;
716 }
717 }
718
719 sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
720
721 if let Some(pos) = plan
722 .iter()
723 .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
724 {
725 #[cfg(all(feature = "std", feature = "macro_debug"))]
726 eprintln!(" → Already in plan, modifying existing step");
727 let mut step = plan.remove(pos);
728 if let CuExecutionUnit::Step(ref mut s) = step {
729 s.input_msg_indices_types = input_msg_indices_types;
730 }
731 plan.push(step);
732 } else {
733 #[cfg(all(feature = "std", feature = "macro_debug"))]
734 eprintln!(" → New step added to plan");
735 let step = CuExecutionStep {
736 node_id: id,
737 node: node_ref.clone(),
738 task_type,
739 input_msg_indices_types,
740 output_msg_index_type,
741 };
742 plan.push(CuExecutionUnit::Step(step));
743 }
744
745 handled = true;
746 }
747
748 #[cfg(all(feature = "std", feature = "macro_debug"))]
749 eprintln!("-- finished branch from node {starting_point} with handled={handled}");
750 (next_culist_output_index, handled)
751}
752
753pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
756 #[cfg(all(feature = "std", feature = "macro_debug"))]
757 eprintln!("[runtime plan]");
758 let visited = graph.0.visit_map();
759 let mut plan = Vec::new();
760 let mut next_culist_output_index = 0u32;
761
762 let mut queue: VecDeque<NodeId> = graph
763 .node_indices()
764 .iter()
765 .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
766 .map(|node| node.index() as NodeId)
767 .collect();
768
769 #[cfg(all(feature = "std", feature = "macro_debug"))]
770 eprintln!("Initial source nodes: {queue:?}");
771
772 while let Some(start_node) = queue.pop_front() {
773 if visited.is_visited(&start_node) {
774 #[cfg(all(feature = "std", feature = "macro_debug"))]
775 eprintln!("→ Skipping already visited source {start_node}");
776 continue;
777 }
778
779 #[cfg(all(feature = "std", feature = "macro_debug"))]
780 eprintln!("→ Starting BFS from source {start_node}");
781 let mut bfs = Bfs::new(&graph.0, start_node.into());
782
783 while let Some(node_index) = bfs.next(&graph.0) {
784 let node_id = node_index.index() as NodeId;
785 let already_in_plan = plan
786 .iter()
787 .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
788 if already_in_plan {
789 #[cfg(all(feature = "std", feature = "macro_debug"))]
790 eprintln!(" → Node {node_id} already planned, skipping");
791 continue;
792 }
793
794 #[cfg(all(feature = "std", feature = "macro_debug"))]
795 eprintln!(" Planning from node {node_id}");
796 let (new_index, handled) =
797 plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
798 next_culist_output_index = new_index;
799
800 if !handled {
801 #[cfg(all(feature = "std", feature = "macro_debug"))]
802 eprintln!(" ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
803 continue;
804 }
805
806 #[cfg(all(feature = "std", feature = "macro_debug"))]
807 eprintln!(" ✓ Node {node_id} handled successfully, enqueueing neighbors");
808 for neighbor in graph.0.neighbors(node_index) {
809 if !visited.is_visited(&neighbor) {
810 let nid = neighbor.index() as NodeId;
811 #[cfg(all(feature = "std", feature = "macro_debug"))]
812 eprintln!(" → Enqueueing neighbor {nid}");
813 queue.push_back(nid);
814 }
815 }
816 }
817 }
818
819 Ok(CuExecutionLoop {
820 steps: plan,
821 loop_count: None,
822 })
823}
824
825#[cfg(test)]
827mod tests {
828 use super::*;
829 use crate::config::Node;
830 use crate::cutask::CuSinkTask;
831 use crate::cutask::{CuSrcTask, Freezable};
832 use crate::monitoring::NoMonitor;
833 use bincode::Encode;
834 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
835 use serde_derive::Serialize;
836
837 pub struct TestSource {}
838
839 impl Freezable for TestSource {}
840
841 impl CuSrcTask for TestSource {
842 type Resources<'r> = ();
843 type Output<'m> = ();
844 fn new_with(
845 _config: Option<&ComponentConfig>,
846 _resources: Self::Resources<'_>,
847 ) -> CuResult<Self>
848 where
849 Self: Sized,
850 {
851 Ok(Self {})
852 }
853
854 fn process(
855 &mut self,
856 _clock: &RobotClock,
857 _empty_msg: &mut Self::Output<'_>,
858 ) -> CuResult<()> {
859 Ok(())
860 }
861 }
862
863 pub struct TestSink {}
864
865 impl Freezable for TestSink {}
866
867 impl CuSinkTask for TestSink {
868 type Resources<'r> = ();
869 type Input<'m> = ();
870
871 fn new_with(
872 _config: Option<&ComponentConfig>,
873 _resources: Self::Resources<'_>,
874 ) -> CuResult<Self>
875 where
876 Self: Sized,
877 {
878 Ok(Self {})
879 }
880
881 fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
882 Ok(())
883 }
884 }
885
886 type Tasks = (TestSource, TestSink);
888
889 #[derive(Debug, Encode, Decode, Serialize, Default)]
890 struct Msgs(());
891
892 impl ErasedCuStampedDataSet for Msgs {
893 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
894 Vec::new()
895 }
896 }
897
898 impl MatchingTasks for Msgs {
899 fn get_all_task_ids() -> &'static [&'static str] {
900 &[]
901 }
902 }
903
904 impl CuListZeroedInit for Msgs {
905 fn init_zeroed(&mut self) {}
906 }
907
908 #[cfg(feature = "std")]
909 fn tasks_instanciator(
910 all_instances_configs: Vec<Option<&ComponentConfig>>,
911 _resources: &mut ResourceManager,
912 ) -> CuResult<Tasks> {
913 Ok((
914 TestSource::new(all_instances_configs[0])?,
915 TestSink::new(all_instances_configs[1])?,
916 ))
917 }
918
919 #[cfg(not(feature = "std"))]
920 fn tasks_instanciator(
921 all_instances_configs: Vec<Option<&ComponentConfig>>,
922 _resources: &mut ResourceManager,
923 ) -> CuResult<Tasks> {
924 Ok((
925 TestSource::new(all_instances_configs[0])?,
926 TestSink::new(all_instances_configs[1])?,
927 ))
928 }
929
930 fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
931 NoMonitor {}
932 }
933
934 fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
935 Ok(())
936 }
937
938 fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
939 Ok(ResourceManager::new(&[]))
940 }
941
942 #[derive(Debug)]
943 struct FakeWriter {}
944
945 impl<E: Encode> WriteStream<E> for FakeWriter {
946 fn log(&mut self, _obj: &E) -> CuResult<()> {
947 Ok(())
948 }
949 }
950
951 #[test]
952 fn test_runtime_instantiation() {
953 let mut config = CuConfig::default();
954 let graph = config.get_graph_mut(None).unwrap();
955 graph.add_node(Node::new("a", "TestSource")).unwrap();
956 graph.add_node(Node::new("b", "TestSink")).unwrap();
957 graph.connect(0, 1, "()").unwrap();
958 let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
959 RobotClock::default(),
960 &config,
961 None,
962 resources_instanciator,
963 tasks_instanciator,
964 monitor_instanciator,
965 bridges_instanciator,
966 FakeWriter {},
967 FakeWriter {},
968 );
969 assert!(runtime.is_ok());
970 }
971
972 #[test]
973 fn test_copperlists_manager_lifecycle() {
974 let mut config = CuConfig::default();
975 let graph = config.get_graph_mut(None).unwrap();
976 graph.add_node(Node::new("a", "TestSource")).unwrap();
977 graph.add_node(Node::new("b", "TestSink")).unwrap();
978 graph.connect(0, 1, "()").unwrap();
979
980 let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
981 RobotClock::default(),
982 &config,
983 None,
984 resources_instanciator,
985 tasks_instanciator,
986 monitor_instanciator,
987 bridges_instanciator,
988 FakeWriter {},
989 FakeWriter {},
990 )
991 .unwrap();
992
993 {
995 let copperlists = &mut runtime.copperlists_manager;
996 let culist0 = copperlists
997 .inner
998 .create()
999 .expect("Ran out of space for copper lists");
1000 let id = culist0.id;
1002 assert_eq!(id, 0);
1003 culist0.change_state(CopperListState::Processing);
1004 assert_eq!(copperlists.available_copper_lists(), 1);
1005 }
1006
1007 {
1008 let copperlists = &mut runtime.copperlists_manager;
1009 let culist1 = copperlists
1010 .inner
1011 .create()
1012 .expect("Ran out of space for copper lists"); let id = culist1.id;
1014 assert_eq!(id, 1);
1015 culist1.change_state(CopperListState::Processing);
1016 assert_eq!(copperlists.available_copper_lists(), 0);
1017 }
1018
1019 {
1020 let copperlists = &mut runtime.copperlists_manager;
1021 let culist2 = copperlists.inner.create();
1022 assert!(culist2.is_none());
1023 assert_eq!(copperlists.available_copper_lists(), 0);
1024 let _ = copperlists.end_of_processing(1);
1026 assert_eq!(copperlists.available_copper_lists(), 1);
1027 }
1028
1029 {
1031 let copperlists = &mut runtime.copperlists_manager;
1032 let culist2 = copperlists
1033 .inner
1034 .create()
1035 .expect("Ran out of space for copper lists"); let id = culist2.id;
1037 assert_eq!(id, 2);
1038 culist2.change_state(CopperListState::Processing);
1039 assert_eq!(copperlists.available_copper_lists(), 0);
1040 let _ = copperlists.end_of_processing(0);
1042 assert_eq!(copperlists.available_copper_lists(), 0);
1044
1045 let _ = copperlists.end_of_processing(2);
1047 assert_eq!(copperlists.available_copper_lists(), 2);
1050 }
1051 }
1052
1053 #[test]
1054 fn test_runtime_task_input_order() {
1055 let mut config = CuConfig::default();
1056 let graph = config.get_graph_mut(None).unwrap();
1057 let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
1058 let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
1059 let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
1060
1061 assert_eq!(src1_id, 0);
1062 assert_eq!(src2_id, 1);
1063
1064 let src1_type = "src1_type";
1066 let src2_type = "src2_type";
1067 graph.connect(src2_id, sink_id, src2_type).unwrap();
1068 graph.connect(src1_id, sink_id, src1_type).unwrap();
1069
1070 let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
1071 let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
1072 assert_eq!(src1_edge_id, 1);
1075 assert_eq!(src2_edge_id, 0);
1076
1077 let runtime = compute_runtime_plan(graph).unwrap();
1078 let sink_step = runtime
1079 .steps
1080 .iter()
1081 .find_map(|step| match step {
1082 CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1083 _ => None,
1084 })
1085 .unwrap();
1086
1087 assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
1090 assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
1091 }
1092
1093 #[test]
1094 fn test_runtime_plan_diamond_case1() {
1095 let mut config = CuConfig::default();
1097 let graph = config.get_graph_mut(None).unwrap();
1098 let cam0_id = graph
1099 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1100 .unwrap();
1101 let inf0_id = graph
1102 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1103 .unwrap();
1104 let broadcast_id = graph
1105 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1106 .unwrap();
1107
1108 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1110 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1111 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1112
1113 let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1114 let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1115
1116 assert_eq!(edge_cam0_to_inf0, 0);
1117 assert_eq!(edge_cam0_to_broadcast, 1);
1118
1119 let runtime = compute_runtime_plan(graph).unwrap();
1120 let broadcast_step = runtime
1121 .steps
1122 .iter()
1123 .find_map(|step| match step {
1124 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1125 _ => None,
1126 })
1127 .unwrap();
1128
1129 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1130 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1131 }
1132
1133 #[test]
1134 fn test_runtime_plan_diamond_case2() {
1135 let mut config = CuConfig::default();
1137 let graph = config.get_graph_mut(None).unwrap();
1138 let cam0_id = graph
1139 .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1140 .unwrap();
1141 let inf0_id = graph
1142 .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1143 .unwrap();
1144 let broadcast_id = graph
1145 .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1146 .unwrap();
1147
1148 graph.connect(cam0_id, inf0_id, "i32").unwrap();
1150 graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1151 graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1152
1153 let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1154 let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1155
1156 assert_eq!(edge_cam0_to_broadcast, 0);
1157 assert_eq!(edge_cam0_to_inf0, 1);
1158
1159 let runtime = compute_runtime_plan(graph).unwrap();
1160 let broadcast_step = runtime
1161 .steps
1162 .iter()
1163 .find_map(|step| match step {
1164 CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1165 _ => None,
1166 })
1167 .unwrap();
1168
1169 assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1170 assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1171 }
1172}