1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::HashMap;
4use std::fs::read_to_string;
5use syn::meta::parser;
6use syn::Fields::{Named, Unnamed};
7use syn::{
8 parse_macro_input, parse_quote, parse_str, Field, Fields, ItemImpl, ItemStruct, LitStr, Type,
9 TypeTuple,
10};
11
12#[cfg(feature = "macro_debug")]
13use crate::format::rustfmt_generated_code;
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{read_configuration, BridgeChannel, CuGraph, Flavor, Node, NodeId};
17use cu29_runtime::curuntime::{
18 compute_runtime_plan, find_task_type_for_id, CuExecutionLoop, CuExecutionStep, CuExecutionUnit,
19 CuTaskType,
20};
21use cu29_traits::{CuError, CuResult};
22use proc_macro2::{Ident, Span};
23
24mod format;
25mod utils;
26
/// Value interpolated as the last generic argument of the
/// `cu29::curuntime::CuRuntime<...>` field type that `copper_runtime` appends
/// to the application struct.
// NOTE(review): presumably the default number of copper lists kept by the
// runtime ("CLNB") — confirm against `CuRuntime`'s definition.
const DEFAULT_CLNB: usize = 10;
29
30#[inline]
31fn int2sliceindex(i: u32) -> syn::Index {
32 syn::Index::from(i as usize)
33}
34
35#[inline(always)]
36fn return_error(msg: String) -> TokenStream {
37 syn::Error::new(Span::call_site(), msg)
38 .to_compile_error()
39 .into()
40}
41
42#[proc_macro]
46pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
47 #[cfg(feature = "std")]
48 let std = true;
49
50 #[cfg(not(feature = "std"))]
51 let std = false;
52
53 let config = parse_macro_input!(config_path_lit as LitStr).value();
54 if !std::path::Path::new(&config_full_path(&config)).exists() {
55 return return_error(format!(
56 "The configuration file `{config}` does not exist. Please provide a valid path."
57 ));
58 }
59 #[cfg(feature = "macro_debug")]
60 eprintln!("[gen culist support with {config:?}]");
61 let cuconfig = match read_config(&config) {
62 Ok(cuconfig) => cuconfig,
63 Err(e) => return return_error(e.to_string()),
64 };
65 let graph = cuconfig
66 .get_graph(None) .expect("Could not find the specified mission for gen_cumsgs");
68 let task_specs = CuTaskSpecSet::from_graph(graph);
69 let channel_usage = collect_bridge_channel_usage(graph);
70 let mut bridge_specs = build_bridge_specs(&cuconfig, graph, &channel_usage);
71 let (culist_plan, exec_entities, plan_to_original) =
72 match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
73 Ok(plan) => plan,
74 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
75 };
76 let task_member_names = collect_task_member_names(graph);
77 let (culist_order, node_output_positions) = collect_culist_metadata(
78 &culist_plan,
79 &exec_entities,
80 &mut bridge_specs,
81 &plan_to_original,
82 );
83
84 #[cfg(feature = "macro_debug")]
85 eprintln!(
86 "[The CuStampedDataSet matching tasks ids are {:?}]",
87 culist_order
88 );
89
90 let support = gen_culist_support(
91 &culist_plan,
92 &culist_order,
93 &node_output_positions,
94 &task_member_names,
95 );
96
97 let extra_imports = if !std {
98 quote! {
99 use core::fmt::Debug;
100 use core::fmt::Formatter;
101 use core::fmt::Result as FmtResult;
102 use alloc::vec;
103 }
104 } else {
105 quote! {
106 use std::fmt::Debug;
107 use std::fmt::Formatter;
108 use std::fmt::Result as FmtResult;
109 }
110 };
111
112 let with_uses = quote! {
113 mod cumsgs {
114 use cu29::bincode::Encode;
115 use cu29::bincode::enc::Encoder;
116 use cu29::bincode::error::EncodeError;
117 use cu29::bincode::Decode;
118 use cu29::bincode::de::Decoder;
119 use cu29::bincode::error::DecodeError;
120 use cu29::copperlist::CopperList;
121 use cu29::prelude::CuStampedData;
122 use cu29::prelude::ErasedCuStampedData;
123 use cu29::prelude::ErasedCuStampedDataSet;
124 use cu29::prelude::MatchingTasks;
125 use cu29::prelude::Serialize;
126 use cu29::prelude::CuMsg;
127 use cu29::prelude::CuMsgMetadata;
128 use cu29::prelude::CuListZeroedInit;
129 use cu29::prelude::CuCompactString;
130 #extra_imports
131 #support
132 }
133 use cumsgs::CuStampedDataSet;
134 type CuMsgs=CuStampedDataSet;
135 };
136 with_uses.into()
137}
138
139fn gen_culist_support(
141 runtime_plan: &CuExecutionLoop,
142 culist_indices_in_plan_order: &[usize],
143 node_output_positions: &HashMap<NodeId, usize>,
144 task_member_names: &[(NodeId, String)],
145) -> proc_macro2::TokenStream {
146 #[cfg(feature = "macro_debug")]
147 eprintln!("[Extract msgs types]");
148 let all_msgs_types_in_culist_order = extract_msg_types(runtime_plan);
149
150 let culist_size = all_msgs_types_in_culist_order.len();
151 let task_indices: Vec<_> = culist_indices_in_plan_order
152 .iter()
153 .map(|i| syn::Index::from(*i))
154 .collect();
155
156 #[cfg(feature = "macro_debug")]
157 eprintln!("[build the copperlist struct]");
158 let msgs_types_tuple: TypeTuple = build_culist_tuple(&all_msgs_types_in_culist_order);
159
160 #[cfg(feature = "macro_debug")]
161 eprintln!("[build the copperlist tuple bincode support]");
162 let msgs_types_tuple_encode = build_culist_tuple_encode(&all_msgs_types_in_culist_order);
163 let msgs_types_tuple_decode = build_culist_tuple_decode(&all_msgs_types_in_culist_order);
164
165 #[cfg(feature = "macro_debug")]
166 eprintln!("[build the copperlist tuple debug support]");
167 let msgs_types_tuple_debug = build_culist_tuple_debug(&all_msgs_types_in_culist_order);
168
169 #[cfg(feature = "macro_debug")]
170 eprintln!("[build the copperlist tuple serialize support]");
171 let msgs_types_tuple_serialize = build_culist_tuple_serialize(&all_msgs_types_in_culist_order);
172
173 #[cfg(feature = "macro_debug")]
174 eprintln!("[build the default tuple support]");
175 let msgs_types_tuple_default = build_culist_tuple_default(&all_msgs_types_in_culist_order);
176
177 #[cfg(feature = "macro_debug")]
178 eprintln!("[build erasedcumsgs]");
179
180 let erasedmsg_trait_impl = build_culist_erasedcumsgs(&all_msgs_types_in_culist_order);
181
182 let collect_metadata_function = quote! {
183 pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
184 [#( &culist.msgs.0.#task_indices.metadata, )*]
185 }
186 };
187
188 let task_name_literals: Vec<String> = task_member_names
189 .iter()
190 .map(|(_, name)| name.clone())
191 .collect();
192
193 let methods = task_member_names.iter().map(|(node_id, name)| {
194 let output_position = node_output_positions
195 .get(node_id)
196 .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
197
198 let fn_name = format_ident!("get_{}_output", name);
199 let payload_type = all_msgs_types_in_culist_order[*output_position].clone();
200 let index = syn::Index::from(*output_position);
201 quote! {
202 #[allow(dead_code)]
203 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
204 &self.0.#index
205 }
206 }
207 });
208
209 quote! {
211 #collect_metadata_function
212
213 pub struct CuStampedDataSet(pub #msgs_types_tuple);
214
215 pub type CuList = CopperList<CuStampedDataSet>;
216
217 impl CuStampedDataSet {
218 #(#methods)*
219
220 #[allow(dead_code)]
221 fn get_tuple(&self) -> &#msgs_types_tuple {
222 &self.0
223 }
224
225 #[allow(dead_code)]
226 fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
227 &mut self.0
228 }
229 }
230
231 impl MatchingTasks for CuStampedDataSet {
232 #[allow(dead_code)]
233 fn get_all_task_ids() -> &'static [&'static str] {
234 &[#(#task_name_literals),*]
235 }
236 }
237
238 #msgs_types_tuple_encode
240 #msgs_types_tuple_decode
241
242 #msgs_types_tuple_debug
244
245 #msgs_types_tuple_serialize
247
248 #msgs_types_tuple_default
250
251 #erasedmsg_trait_impl
253
254 impl CuListZeroedInit for CuStampedDataSet {
255 fn init_zeroed(&mut self) {
256 #(self.0.#task_indices.metadata.status_txt = CuCompactString::default();)*
257 }
258 }
259 }
260}
261
262fn gen_sim_support(
263 runtime_plan: &CuExecutionLoop,
264 exec_entities: &[ExecutionEntity],
265) -> proc_macro2::TokenStream {
266 #[cfg(feature = "macro_debug")]
267 eprintln!("[Sim: Build SimEnum]");
268 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
269 .steps
270 .iter()
271 .filter_map(|unit| match unit {
272 CuExecutionUnit::Step(step) => {
273 if !matches!(
274 exec_entities[step.node_id as usize].kind,
275 ExecutionEntityKind::Task { .. }
276 ) {
277 return None;
278 }
279 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
280 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
281 let inputs: Vec<Type> = step
282 .input_msg_indices_types
283 .iter()
284 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap())
285 .collect();
286 let output: Option<Type> = step
287 .output_msg_index_type
288 .as_ref()
289 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap());
290 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
291 let output = output.as_ref().unwrap_or(&no_output);
292
293 let inputs_type = if inputs.is_empty() {
294 quote! { () }
295 } else if inputs.len() == 1 {
296 let input = inputs.first().unwrap();
297 quote! { &'a #input }
298 } else {
299 quote! { &'a (#(&'a #inputs),*) }
300 };
301
302 Some(quote! {
303 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
304 })
305 }
306 CuExecutionUnit::Loop(_) => {
307 todo!("Needs to be implemented")
308 }
309 })
310 .collect();
311 quote! {
312 #[allow(dead_code)]
314 pub enum SimStep<'a> {
315 #(#plan_enum),*
316 }
317 }
318}
319
320#[proc_macro_attribute]
324pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
325 #[cfg(feature = "macro_debug")]
326 eprintln!("[entry]");
327 let mut application_struct = parse_macro_input!(input as ItemStruct);
328
329 let application_name = &application_struct.ident;
330 let builder_name = format_ident!("{}Builder", application_name);
331
332 let mut config_file: Option<LitStr> = None;
333 let mut sim_mode = false;
334
335 #[cfg(feature = "std")]
336 let std = true;
337
338 #[cfg(not(feature = "std"))]
339 let std = false;
340
341 let attribute_config_parser = parser(|meta| {
343 if meta.path.is_ident("config") {
344 config_file = Some(meta.value()?.parse()?);
345 Ok(())
346 } else if meta.path.is_ident("sim_mode") {
347 if meta.input.peek(syn::Token![=]) {
349 meta.input.parse::<syn::Token![=]>()?;
350 let value: syn::LitBool = meta.input.parse()?;
351 sim_mode = value.value();
352 Ok(())
353 } else {
354 sim_mode = true;
356 Ok(())
357 }
358 } else {
359 Err(meta.error("unsupported property"))
360 }
361 });
362
363 #[cfg(feature = "macro_debug")]
364 eprintln!("[parse]");
365 parse_macro_input!(args with attribute_config_parser);
367
368 let config_file = match config_file {
379 Some(file) => file.value(),
380 None => {
381 return return_error(
382 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
383 .to_string(),
384 )
385 }
386 };
387
388 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
389 return return_error(format!(
390 "The configuration file `{config_file}` does not exist. Please provide a valid path."
391 ));
392 }
393
394 let copper_config = match read_config(&config_file) {
395 Ok(cuconfig) => cuconfig,
396 Err(e) => return return_error(e.to_string()),
397 };
398 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
399 Ok(ok) => ok,
400 Err(e) => return return_error(format!("Could not read the config file (should not happen because we just succeeded just before). {e}"))
401 };
402
403 #[cfg(feature = "macro_debug")]
404 eprintln!("[build monitor type]");
405 let monitor_type = if let Some(monitor_config) = copper_config.get_monitor_config() {
406 let monitor_type = parse_str::<Type>(monitor_config.get_type())
407 .expect("Could not transform the monitor type name into a Rust type.");
408 quote! { #monitor_type }
409 } else {
410 quote! { NoMonitor }
411 };
412
413 #[cfg(feature = "macro_debug")]
415 eprintln!("[build runtime field]");
416 let runtime_field: Field = if sim_mode {
418 parse_quote! {
419 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
420 }
421 } else {
422 parse_quote! {
423 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
424 }
425 };
426
427 #[cfg(feature = "macro_debug")]
428 eprintln!("[match struct anonymity]");
429 match &mut application_struct.fields {
430 Named(fields_named) => {
431 fields_named.named.push(runtime_field);
432 }
433 Unnamed(fields_unnamed) => {
434 fields_unnamed.unnamed.push(runtime_field);
435 }
436 Fields::Unit => {
437 panic!("This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;")
438 }
439 };
440
441 let all_missions = copper_config.graphs.get_all_missions_graphs();
442 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
443 for (mission, graph) in &all_missions {
444 let mission_mod = parse_str::<Ident>(mission.as_str())
445 .expect("Could not make an identifier of the mission name");
446
447 #[cfg(feature = "macro_debug")]
448 eprintln!("[extract tasks ids & types]");
449 let task_specs = CuTaskSpecSet::from_graph(graph);
450
451 let culist_channel_usage = collect_bridge_channel_usage(graph);
452 let mut culist_bridge_specs =
453 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
454 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
455 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
456 Ok(plan) => plan,
457 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
458 };
459 let task_member_names = collect_task_member_names(graph);
460 let (culist_call_order, node_output_positions) = collect_culist_metadata(
461 &culist_plan,
462 &culist_exec_entities,
463 &mut culist_bridge_specs,
464 &culist_plan_to_original,
465 );
466
467 #[cfg(feature = "macro_debug")]
468 {
469 eprintln!("[runtime plan for mission {mission}]");
470 eprintln!("{culist_plan:?}");
471 }
472
473 let culist_support: proc_macro2::TokenStream = gen_culist_support(
474 &culist_plan,
475 &culist_call_order,
476 &node_output_positions,
477 &task_member_names,
478 );
479
480 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
481
482 let bridge_types: Vec<Type> = culist_bridge_specs
483 .iter()
484 .map(|spec| spec.type_path.clone())
485 .collect();
486 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
487 quote! { () }
488 } else {
489 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
490 quote! { #tuple }
491 };
492
493 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
494 .iter()
495 .enumerate()
496 .map(|(idx, _)| format_ident!("bridge_{idx}"))
497 .collect();
498
499 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
500 .iter()
501 .enumerate()
502 .map(|(idx, spec)| {
503 let binding_ident = &bridge_binding_idents[idx];
504 let bridge_type = &spec.type_path;
505 let bridge_name = spec.id.clone();
506 let config_index = syn::Index::from(spec.config_index);
507 let tx_configs: Vec<proc_macro2::TokenStream> = spec
508 .tx_channels
509 .iter()
510 .map(|channel| {
511 let const_ident = &channel.const_ident;
512 let channel_name = channel.id.clone();
513 let channel_config_index = syn::Index::from(channel.config_index);
514 quote! {
515 cu29::cubridge::BridgeChannelConfig::from_static(
516 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
517 match &bridge_cfg.channels[#channel_config_index] {
518 cu29::config::BridgeChannel::Tx { config, .. } => config.clone(),
519 _ => panic!("Bridge '{}' channel '{}' expected to be Tx", #bridge_name, #channel_name),
520 },
521 )
522 }
523 })
524 .collect();
525 let rx_configs: Vec<proc_macro2::TokenStream> = spec
526 .rx_channels
527 .iter()
528 .map(|channel| {
529 let const_ident = &channel.const_ident;
530 let channel_name = channel.id.clone();
531 let channel_config_index = syn::Index::from(channel.config_index);
532 quote! {
533 cu29::cubridge::BridgeChannelConfig::from_static(
534 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
535 match &bridge_cfg.channels[#channel_config_index] {
536 cu29::config::BridgeChannel::Rx { config, .. } => config.clone(),
537 _ => panic!("Bridge '{}' channel '{}' expected to be Rx", #bridge_name, #channel_name),
538 },
539 )
540 }
541 })
542 .collect();
543 quote! {
544 let #binding_ident = {
545 let bridge_cfg = config
546 .bridges
547 .get(#config_index)
548 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
549 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
550 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
551 >] = &[#(#tx_configs),*];
552 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
553 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
554 >] = &[#(#rx_configs),*];
555 <#bridge_type as cu29::cubridge::CuBridge>::new(
556 bridge_cfg.config.as_ref(),
557 tx_channels,
558 rx_channels,
559 )?
560 };
561 }
562 })
563 .collect();
564
565 let bridges_instanciator = if culist_bridge_specs.is_empty() {
566 quote! {
567 pub fn bridges_instanciator(_config: &CuConfig) -> CuResult<CuBridges> {
568 Ok(())
569 }
570 }
571 } else {
572 let bridge_bindings = bridge_binding_idents.clone();
573 quote! {
574 pub fn bridges_instanciator(config: &CuConfig) -> CuResult<CuBridges> {
575 #(#bridge_init_statements)*
576 Ok((#(#bridge_bindings),*,))
577 }
578 }
579 };
580
581 let all_sim_tasks_types: Vec<Type> = task_specs.ids
582 .iter()
583 .zip(&task_specs.cutypes)
584 .zip(&task_specs.sim_task_types)
585 .zip(&task_specs.background_flags)
586 .zip(&task_specs.run_in_sim_flags)
587 .map(|((((task_id, task_type), sim_type), background), run_in_sim)| {
588 match task_type {
589 CuTaskType::Source => {
590 if *background {
591 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
592 }
593 if *run_in_sim {
594 sim_type.clone()
595 } else {
596 let msg_type = graph
597 .get_node_output_msg_type(task_id.as_str())
598 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
599 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
600 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
601 }
602 }
603 CuTaskType::Regular => {
604 sim_type.clone()
607 },
608 CuTaskType::Sink => {
609 if *background {
610 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
611 }
612
613 if *run_in_sim {
614 sim_type.clone()
616 }
617 else {
618 let msg_type = graph
620 .get_node_input_msg_type(task_id.as_str())
621 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
622 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
623 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
624 }
625 }
626 }
627 })
628 .collect();
629
630 #[cfg(feature = "macro_debug")]
631 eprintln!("[build task tuples]");
632
633 let task_types = &task_specs.task_types;
634 let task_types_tuple: TypeTuple = if task_types.is_empty() {
637 parse_quote! { () }
638 } else {
639 parse_quote! { (#(#task_types),*,) }
640 };
641
642 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
643 parse_quote! { () }
644 } else {
645 parse_quote! { (#(#all_sim_tasks_types),*,) }
646 };
647
648 #[cfg(feature = "macro_debug")]
649 eprintln!("[gen instances]");
650 let task_sim_instances_init_code = all_sim_tasks_types.iter().enumerate().map(|(index, ty)| {
652 let additional_error_info = format!(
653 "Failed to get create instance for {}, instance index {}.",
654 task_specs.type_names[index], index
655 );
656
657 quote! {
658 <#ty>::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
659 }
660 }).collect::<Vec<_>>();
661
662 let task_instances_init_code = task_specs.instantiation_types.iter().zip(&task_specs.background_flags).enumerate().map(|(index, (task_type, background))| {
663 let additional_error_info = format!(
664 "Failed to get create instance for {}, instance index {}.",
665 task_specs.type_names[index], index
666 );
667 if *background {
668 quote! {
669 #task_type::new(all_instances_configs[#index], threadpool.clone()).map_err(|e| e.add_cause(#additional_error_info))?
670 }
671 } else {
672 quote! {
673 #task_type::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
674 }
675 }
676 }).collect::<Vec<_>>();
677
678 let (
681 task_restore_code,
682 task_start_calls,
683 task_stop_calls,
684 task_preprocess_calls,
685 task_postprocess_calls,
686 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
687 (0..task_specs.task_types.len())
688 .map(|index| {
689 let task_index = int2sliceindex(index as u32);
690 let task_tuple_index = syn::Index::from(index);
691 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
692 let enum_name = Ident::new(&task_enum_name, Span::call_site());
693 (
694 quote! {
696 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::new_with_cause("Failed to thaw", e))?
697 },
698 { let monitoring_action = quote! {
700 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
701 match decision {
702 Decision::Abort => {
703 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
704 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
705 return Ok(());
706
707 }
708 Decision::Ignore => {
709 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
710 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
711 }
712 Decision::Shutdown => {
713 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
714 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
715 return Err(CuError::new_with_cause("Task errored out during start.", error));
716 }
717 }
718 };
719
720 let call_sim_callback = if sim_mode {
721 quote! {
722 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
724
725 let doit = if let SimOverride::Errored(reason) = ovr {
726 let error: CuError = reason.into();
727 #monitoring_action
728 false
729 }
730 else {
731 ovr == SimOverride::ExecuteByRuntime
732 };
733 }
734 } else {
735 quote! {
736 let doit = true; }
738 };
739
740
741 quote! {
742 #call_sim_callback
743 if doit {
744 let task = &mut self.copper_runtime.tasks.#task_index;
745 if let Err(error) = task.start(&self.copper_runtime.clock) {
746 #monitoring_action
747 }
748 }
749 }
750 },
751 { let monitoring_action = quote! {
753 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
754 match decision {
755 Decision::Abort => {
756 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
757 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
758 return Ok(());
759
760 }
761 Decision::Ignore => {
762 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
763 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
764 }
765 Decision::Shutdown => {
766 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
767 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
768 return Err(CuError::new_with_cause("Task errored out during stop.", error));
769 }
770 }
771 };
772 let call_sim_callback = if sim_mode {
773 quote! {
774 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
776
777 let doit = if let SimOverride::Errored(reason) = ovr {
778 let error: CuError = reason.into();
779 #monitoring_action
780 false
781 }
782 else {
783 ovr == SimOverride::ExecuteByRuntime
784 };
785 }
786 } else {
787 quote! {
788 let doit = true; }
790 };
791 quote! {
792 #call_sim_callback
793 if doit {
794 let task = &mut self.copper_runtime.tasks.#task_index;
795 if let Err(error) = task.stop(&self.copper_runtime.clock) {
796 #monitoring_action
797 }
798 }
799 }
800 },
801 { let monitoring_action = quote! {
803 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
804 match decision {
805 Decision::Abort => {
806 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
807 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
808 return Ok(());
809
810 }
811 Decision::Ignore => {
812 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
813 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
814 }
815 Decision::Shutdown => {
816 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
817 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
818 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
819 }
820 }
821 };
822 let call_sim_callback = if sim_mode {
823 quote! {
824 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
826
827 let doit = if let SimOverride::Errored(reason) = ovr {
828 let error: CuError = reason.into();
829 #monitoring_action
830 false
831 } else {
832 ovr == SimOverride::ExecuteByRuntime
833 };
834 }
835 } else {
836 quote! {
837 let doit = true; }
839 };
840 quote! {
841 #call_sim_callback
842 if doit {
843 if let Err(error) = tasks.#task_index.preprocess(clock) {
844 #monitoring_action
845 }
846 }
847 }
848 },
849 { let monitoring_action = quote! {
851 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
852 match decision {
853 Decision::Abort => {
854 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
855 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
856 return Ok(());
857
858 }
859 Decision::Ignore => {
860 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
861 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
862 }
863 Decision::Shutdown => {
864 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
865 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
866 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
867 }
868 }
869 };
870 let call_sim_callback = if sim_mode {
871 quote! {
872 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
874
875 let doit = if let SimOverride::Errored(reason) = ovr {
876 let error: CuError = reason.into();
877 #monitoring_action
878 false
879 } else {
880 ovr == SimOverride::ExecuteByRuntime
881 };
882 }
883 } else {
884 quote! {
885 let doit = true; }
887 };
888 quote! {
889 #call_sim_callback
890 if doit {
891 if let Err(error) = tasks.#task_index.postprocess(clock) {
892 #monitoring_action
893 }
894 }
895 }
896 }
897 )
898 })
899 );
900
901 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
902 .iter()
903 .map(|spec| {
904 let bridge_index = int2sliceindex(spec.tuple_index as u32);
905 let monitor_index = syn::Index::from(
906 spec.monitor_index
907 .expect("Bridge missing monitor index for start"),
908 );
909 quote! {
910 {
911 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
912 if let Err(error) = bridge.start(&self.copper_runtime.clock) {
913 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
914 match decision {
915 Decision::Abort => {
916 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
917 return Ok(());
918 }
919 Decision::Ignore => {
920 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
921 }
922 Decision::Shutdown => {
923 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
924 return Err(CuError::new_with_cause("Task errored out during start.", error));
925 }
926 }
927 }
928 }
929 }
930 })
931 .collect();
932
933 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
934 .iter()
935 .map(|spec| {
936 let bridge_index = int2sliceindex(spec.tuple_index as u32);
937 let monitor_index = syn::Index::from(
938 spec.monitor_index
939 .expect("Bridge missing monitor index for stop"),
940 );
941 quote! {
942 {
943 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
944 if let Err(error) = bridge.stop(&self.copper_runtime.clock) {
945 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
946 match decision {
947 Decision::Abort => {
948 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
949 return Ok(());
950 }
951 Decision::Ignore => {
952 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
953 }
954 Decision::Shutdown => {
955 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
956 return Err(CuError::new_with_cause("Task errored out during stop.", error));
957 }
958 }
959 }
960 }
961 }
962 })
963 .collect();
964
965 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
966 .iter()
967 .map(|spec| {
968 let bridge_index = int2sliceindex(spec.tuple_index as u32);
969 let monitor_index = syn::Index::from(
970 spec.monitor_index
971 .expect("Bridge missing monitor index for preprocess"),
972 );
973 quote! {
974 {
975 let bridge = &mut bridges.#bridge_index;
976 if let Err(error) = bridge.preprocess(clock) {
977 let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
978 match decision {
979 Decision::Abort => {
980 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
981 return Ok(());
982 }
983 Decision::Ignore => {
984 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
985 }
986 Decision::Shutdown => {
987 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
988 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
989 }
990 }
991 }
992 }
993 }
994 })
995 .collect();
996
997 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
998 .iter()
999 .map(|spec| {
1000 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1001 let monitor_index = syn::Index::from(
1002 spec.monitor_index
1003 .expect("Bridge missing monitor index for postprocess"),
1004 );
1005 quote! {
1006 {
1007 let bridge = &mut bridges.#bridge_index;
1008 if let Err(error) = bridge.postprocess(clock) {
1009 let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
1010 match decision {
1011 Decision::Abort => {
1012 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1013 return Ok(());
1014 }
1015 Decision::Ignore => {
1016 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1017 }
1018 Decision::Shutdown => {
1019 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1020 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1021 }
1022 }
1023 }
1024 }
1025 }
1026 })
1027 .collect();
1028
1029 let mut start_calls = bridge_start_calls;
1030 start_calls.extend(task_start_calls);
1031 let mut stop_calls = task_stop_calls;
1032 stop_calls.extend(bridge_stop_calls);
1033 let mut preprocess_calls = bridge_preprocess_calls;
1034 preprocess_calls.extend(task_preprocess_calls);
1035 let mut postprocess_calls = task_postprocess_calls;
1036 postprocess_calls.extend(bridge_postprocess_calls);
1037
1038 let runtime_plan_code_and_logging: Vec<(
1039 proc_macro2::TokenStream,
1040 proc_macro2::TokenStream,
1041 )> = culist_plan
1042 .steps
1043 .iter()
1044 .map(|unit| match unit {
1045 CuExecutionUnit::Step(step) => {
1046 #[cfg(feature = "macro_debug")]
1047 eprintln!(
1048 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
1049 step.node.get_id(),
1050 step.node.get_type(),
1051 step.task_type,
1052 step.node_id,
1053 step.input_msg_indices_types,
1054 step.output_msg_index_type
1055 );
1056
1057 match &culist_exec_entities[step.node_id as usize].kind {
1058 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
1059 step,
1060 *task_index,
1061 &task_specs,
1062 sim_mode,
1063 &mission_mod,
1064 ),
1065 ExecutionEntityKind::BridgeRx {
1066 bridge_index,
1067 channel_index,
1068 } => {
1069 let spec = &culist_bridge_specs[*bridge_index];
1070 generate_bridge_rx_execution_tokens(spec, *channel_index, &mission_mod)
1071 }
1072 ExecutionEntityKind::BridgeTx {
1073 bridge_index,
1074 channel_index,
1075 } => {
1076 let spec = &culist_bridge_specs[*bridge_index];
1077 generate_bridge_tx_execution_tokens(
1078 step,
1079 spec,
1080 *channel_index,
1081 &mission_mod,
1082 )
1083 }
1084 }
1085 }
1086 CuExecutionUnit::Loop(_) => {
1087 panic!("Execution loops are not supported in runtime generation");
1088 }
1089 })
1090 .collect();
1091
1092 let sim_support = if sim_mode {
1093 Some(gen_sim_support(&culist_plan, &culist_exec_entities))
1094 } else {
1095 None
1096 };
1097
1098 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
1099 (
1100 quote! {
1101 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
1102 },
1103 quote! {
1104 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1105 },
1106 quote! {
1107 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1108 },
1109 quote! {
1110 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1111 },
1112 quote! {
1113 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1114 },
1115 )
1116 } else {
1117 (
1118 if std {
1119 quote! {
1120 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
1121 }
1122 } else {
1123 quote! {
1124 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
1126 }
1127 },
1128 quote! {
1129 fn run_one_iteration(&mut self) -> CuResult<()>
1130 },
1131 quote! {
1132 fn start_all_tasks(&mut self) -> CuResult<()>
1133 },
1134 quote! {
1135 fn stop_all_tasks(&mut self) -> CuResult<()>
1136 },
1137 quote! {
1138 fn run(&mut self) -> CuResult<()>
1139 },
1140 )
1141 };
1142
1143 let sim_callback_arg = if sim_mode {
1144 Some(quote!(sim_callback))
1145 } else {
1146 None
1147 };
1148
1149 let app_trait = if sim_mode {
1150 quote!(CuSimApplication)
1151 } else {
1152 quote!(CuApplication)
1153 };
1154
1155 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
1156 let enum_name = config_id_to_enum(id);
1157 let enum_ident = Ident::new(&enum_name, Span::call_site());
1158 quote! {
1159 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
1161 }
1162 });
1163
1164 let sim_callback_on_new = if sim_mode {
1165 Some(quote! {
1166 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
1167 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1168 .get_all_nodes()
1169 .iter()
1170 .map(|(_, node)| node.get_instance_config())
1171 .collect();
1172 #(#sim_callback_on_new_calls)*
1173 })
1174 } else {
1175 None
1176 };
1177
1178 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
1179 itertools::multiunzip(runtime_plan_code_and_logging);
1180
1181 let config_load_stmt = if std {
1182 quote! {
1183 let config = if let Some(overridden_config) = config_override {
1184 debug!("CuConfig: Overridden programmatically: {}", overridden_config.serialize_ron());
1185 overridden_config
1186 } else if ::std::path::Path::new(config_filename).exists() {
1187 debug!("CuConfig: Reading configuration from file: {}", config_filename);
1188 cu29::config::read_configuration(config_filename)?
1189 } else {
1190 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1191 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1192 cu29::config::read_configuration_str(original_config, None)?
1193 };
1194 }
1195 } else {
1196 quote! {
1197 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1199 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1200 let config = cu29::config::read_configuration_str(original_config, None)?;
1201 }
1202 };
1203
1204 let kill_handler = if std {
1205 Some(quote! {
1206 ctrlc::set_handler(move || {
1207 STOP_FLAG.store(true, Ordering::SeqCst);
1208 }).expect("Error setting Ctrl-C handler");
1209 })
1210 } else {
1211 None
1212 };
1213
1214 let run_loop = if std {
1215 quote! {
1216 loop {
1217 let iter_start = self.copper_runtime.clock.now();
1218 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1219
1220 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1221 let period: CuDuration = (1_000_000_000u64 / rate).into();
1222 let elapsed = self.copper_runtime.clock.now() - iter_start;
1223 if elapsed < period {
1224 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
1225 }
1226 }
1227
1228 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1229 break result;
1230 }
1231 }
1232 }
1233 } else {
1234 quote! {
1235 loop {
1236 let iter_start = self.copper_runtime.clock.now();
1237 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1238 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1239 let period: CuDuration = (1_000_000_000u64 / rate).into();
1240 let elapsed = self.copper_runtime.clock.now() - iter_start;
1241 if elapsed < period {
1242 busy_wait_for(period - elapsed);
1243 }
1244 }
1245
1246 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1247 break result;
1248 }
1249 }
1250 }
1251 };
1252
1253 #[cfg(feature = "macro_debug")]
1254 eprintln!("[build the run methods]");
1255 let run_methods = quote! {
1256
1257 #run_one_iteration {
1258
1259 let runtime = &mut self.copper_runtime;
1261 let clock = &runtime.clock;
1262 let monitor = &mut runtime.monitor;
1263 let tasks = &mut runtime.tasks;
1264 let bridges = &mut runtime.bridges;
1265 let cl_manager = &mut runtime.copperlists_manager;
1266 let kf_manager = &mut runtime.keyframes_manager;
1267
1268 #(#preprocess_calls)*
1270
1271 let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
1273 kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
1275 culist.msgs.init_zeroed();
1276 {
1277 let msgs = &mut culist.msgs.0;
1278 #(#runtime_plan_code)*
1279 } monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
1281
1282 #(#preprocess_logging_calls)*
1284
1285 cl_manager.end_of_processing(clid)?;
1286 kf_manager.end_of_processing(clid)?;
1287
1288 #(#postprocess_calls)*
1290 Ok(())
1291 }
1292
1293 fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
1294 let runtime = &mut self.copper_runtime;
1295 let clock = &runtime.clock;
1296 let tasks = &mut runtime.tasks;
1297 let config = cu29::bincode::config::standard();
1298 let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
1299 let mut decoder = DecoderImpl::new(reader, config, ());
1300 #(#task_restore_code);*;
1301 Ok(())
1302 }
1303
1304 #start_all_tasks {
1305 #(#start_calls)*
1306 self.copper_runtime.monitor.start(&self.copper_runtime.clock)?;
1307 Ok(())
1308 }
1309
1310 #stop_all_tasks {
1311 #(#stop_calls)*
1312 self.copper_runtime.monitor.stop(&self.copper_runtime.clock)?;
1313 Ok(())
1314 }
1315
1316 #run {
1317 static STOP_FLAG: AtomicBool = AtomicBool::new(false);
1318
1319 #kill_handler
1320
1321 <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
1322 let result = #run_loop;
1323
1324 if result.is_err() {
1325 error!("A task errored out: {}", &result);
1326 }
1327 <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
1328 result
1329 }
1330 };
1331
1332 let tasks_type = if sim_mode {
1333 quote!(CuSimTasks)
1334 } else {
1335 quote!(CuTasks)
1336 };
1337
1338 let tasks_instanciator_fn = if sim_mode {
1339 quote!(tasks_instanciator_sim)
1340 } else {
1341 quote!(tasks_instanciator)
1342 };
1343
1344 let app_impl_decl = if sim_mode {
1345 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
1346 } else {
1347 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
1348 };
1349
1350 let simstep_type_decl = if sim_mode {
1351 quote!(
1352 type Step<'z> = SimStep<'z>;
1353 )
1354 } else {
1355 quote!()
1356 };
1357
1358 #[cfg(feature = "std")]
1359 #[cfg(feature = "macro_debug")]
1360 eprintln!("[build result]");
1361 let application_impl = quote! {
1362 #app_impl_decl {
1363 #simstep_type_decl
1364
1365 #new {
1366 let config_filename = #config_file;
1367
1368 #config_load_stmt
1369
1370 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
1373 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
1375 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
1377 }
1378 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
1379 unified_logger.clone(),
1380 UnifiedLogType::CopperList,
1381 default_section_size,
1382 )?;
1386
1387 let keyframes_stream = stream_write::<KeyFrame, S>(
1388 unified_logger.clone(),
1389 UnifiedLogType::FrozenTasks,
1390 1024 * 1024 * 10, )?;
1392
1393
1394 let application = Ok(#application_name {
1395 copper_runtime: CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new(
1396 clock,
1397 &config,
1398 Some(#mission),
1399 #mission_mod::#tasks_instanciator_fn,
1400 #mission_mod::monitor_instanciator,
1401 #mission_mod::bridges_instanciator,
1402 copperlist_stream,
1403 keyframes_stream)?, });
1405
1406 #sim_callback_on_new
1407
1408 application
1409 }
1410
1411 fn get_original_config() -> String {
1412 #copper_config_content.to_string()
1413 }
1414
1415 #run_methods
1416 }
1417 };
1418
1419 let (
1420 builder_struct,
1421 builder_new,
1422 builder_impl,
1423 builder_sim_callback_method,
1424 builder_build_sim_callback_arg,
1425 ) = if sim_mode {
1426 (
1427 quote! {
1428 #[allow(dead_code)]
1429 pub struct #builder_name <'a, F> {
1430 clock: Option<RobotClock>,
1431 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1432 config_override: Option<CuConfig>,
1433 sim_callback: Option<&'a mut F>
1434 }
1435 },
1436 quote! {
1437 #[allow(dead_code)]
1438 pub fn new() -> Self {
1439 Self {
1440 clock: None,
1441 unified_logger: None,
1442 config_override: None,
1443 sim_callback: None,
1444 }
1445 }
1446 },
1447 quote! {
1448 impl<'a, F> #builder_name <'a, F>
1449 where
1450 F: FnMut(SimStep) -> SimOverride,
1451 },
1452 Some(quote! {
1453 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
1454 {
1455 self.sim_callback = Some(sim_callback);
1456 self
1457 }
1458 }),
1459 Some(quote! {
1460 self.sim_callback
1461 .ok_or(CuError::from("Sim callback missing from builder"))?,
1462 }),
1463 )
1464 } else {
1465 (
1466 quote! {
1467 #[allow(dead_code)]
1468 pub struct #builder_name {
1469 clock: Option<RobotClock>,
1470 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1471 config_override: Option<CuConfig>,
1472 }
1473 },
1474 quote! {
1475 #[allow(dead_code)]
1476 pub fn new() -> Self {
1477 Self {
1478 clock: None,
1479 unified_logger: None,
1480 config_override: None,
1481 }
1482 }
1483 },
1484 quote! {
1485 impl #builder_name
1486 },
1487 None,
1488 None,
1489 )
1490 };
1491
1492 let std_application_impl = if sim_mode {
1494 Some(quote! {
1496 impl #application_name {
1497 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1498 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
1499 }
1500 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1501 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
1502 }
1503 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1504 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
1505 }
1506 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1507 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
1508 }
1509 }
1510 })
1511 } else if std {
1512 Some(quote! {
1514 impl #application_name {
1515 pub fn start_all_tasks(&mut self) -> CuResult<()> {
1516 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
1517 }
1518 pub fn run_one_iteration(&mut self) -> CuResult<()> {
1519 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
1520 }
1521 pub fn run(&mut self) -> CuResult<()> {
1522 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
1523 }
1524 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
1525 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
1526 }
1527 }
1528 })
1529 } else {
1530 None };
1532
1533 let application_builder = if std {
1534 Some(quote! {
1535 #builder_struct
1536
1537 #builder_impl
1538 {
1539 #builder_new
1540
1541 #[allow(dead_code)]
1542 pub fn with_clock(mut self, clock: RobotClock) -> Self {
1543 self.clock = Some(clock);
1544 self
1545 }
1546
1547 #[allow(dead_code)]
1548 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
1549 self.unified_logger = Some(unified_logger);
1550 self
1551 }
1552
1553 #[allow(dead_code)]
1554 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
1555 self.clock = Some(copper_ctx.clock.clone());
1556 self.unified_logger = Some(copper_ctx.unified_logger.clone());
1557 self
1558 }
1559
1560 #[allow(dead_code)]
1561 pub fn with_config(mut self, config_override: CuConfig) -> Self {
1562 self.config_override = Some(config_override);
1563 self
1564 }
1565
1566 #builder_sim_callback_method
1567
1568 #[allow(dead_code)]
1569 pub fn build(self) -> CuResult<#application_name> {
1570 #application_name::new(
1571 self.clock
1572 .ok_or(CuError::from("Clock missing from builder"))?,
1573 self.unified_logger
1574 .ok_or(CuError::from("Unified logger missing from builder"))?,
1575 self.config_override,
1576 #builder_build_sim_callback_arg
1577 )
1578 }
1579 }
1580 })
1581 } else {
1582 None
1584 };
1585
1586 let sim_imports = if sim_mode {
1587 Some(quote! {
1588 use cu29::simulation::SimOverride;
1589 use cu29::simulation::CuTaskCallbackState;
1590 use cu29::simulation::CuSimSrcTask;
1591 use cu29::simulation::CuSimSinkTask;
1592 use cu29::prelude::app::CuSimApplication;
1593 })
1594 } else {
1595 None
1596 };
1597
1598 let sim_tasks = if sim_mode {
1599 Some(quote! {
1600 pub type CuSimTasks = #task_types_tuple_sim;
1603 })
1604 } else {
1605 None
1606 };
1607
1608 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
1609 quote! { Ok(()) }
1610 } else {
1611 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
1612 };
1613
1614 let sim_tasks_instanciator = if sim_mode {
1615 Some(quote! {
1616 pub fn tasks_instanciator_sim(all_instances_configs: Vec<Option<&ComponentConfig>>, _threadpool: Arc<ThreadPool>) -> CuResult<CuSimTasks> {
1617 #sim_inst_body
1618 }})
1619 } else {
1620 None
1621 };
1622
1623 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
1624 quote! {
1625 let _ = threadpool;
1626 Ok(())
1627 }
1628 } else {
1629 quote! { Ok(( #(#task_instances_init_code),*, )) }
1630 };
1631
1632 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
1633 quote! { Ok(()) }
1634 } else {
1635 quote! { Ok(( #(#task_instances_init_code),*, )) }
1636 };
1637
1638 let tasks_instanciator = if std {
1639 quote! {
1640 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>, threadpool: Arc<ThreadPool>) -> CuResult<CuTasks> {
1641 #tasks_inst_body_std
1642 }
1643 }
1644 } else {
1645 quote! {
1647 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>) -> CuResult<CuTasks> {
1648 #tasks_inst_body_nostd
1649 }
1650 }
1651 };
1652
1653 let imports = if std {
1654 quote! {
1655 use cu29::rayon::ThreadPool;
1656 use cu29::cuasynctask::CuAsyncTask;
1657 use cu29::curuntime::CopperContext;
1658 use cu29::prelude::UnifiedLoggerWrite;
1659 use cu29::prelude::memmap::MmapSectionStorage;
1660 use std::fmt::{Debug, Formatter};
1661 use std::fmt::Result as FmtResult;
1662 use std::mem::size_of;
1663 use std::sync::Arc;
1664 use std::sync::atomic::{AtomicBool, Ordering};
1665 use std::sync::Mutex;
1666 }
1667 } else {
1668 quote! {
1669 use alloc::sync::Arc;
1670 use alloc::string::String;
1671 use alloc::string::ToString;
1672 use core::sync::atomic::{AtomicBool, Ordering};
1673 use core::fmt::{Debug, Formatter};
1674 use core::fmt::Result as FmtResult;
1675 use core::mem::size_of;
1676 use spin::Mutex;
1677 }
1678 };
1679
1680 let mission_mod_tokens = quote! {
1682 mod #mission_mod {
1683 use super::*; use cu29::bincode::Encode;
1686 use cu29::bincode::enc::Encoder;
1687 use cu29::bincode::error::EncodeError;
1688 use cu29::bincode::Decode;
1689 use cu29::bincode::de::Decoder;
1690 use cu29::bincode::de::DecoderImpl;
1691 use cu29::bincode::error::DecodeError;
1692 use cu29::clock::RobotClock;
1693 use cu29::config::CuConfig;
1694 use cu29::config::ComponentConfig;
1695 use cu29::curuntime::CuRuntime;
1696 use cu29::curuntime::KeyFrame;
1697 use cu29::CuResult;
1698 use cu29::CuError;
1699 use cu29::cutask::CuSrcTask;
1700 use cu29::cutask::CuSinkTask;
1701 use cu29::cutask::CuTask;
1702 use cu29::cutask::CuMsg;
1703 use cu29::cutask::CuMsgMetadata;
1704 use cu29::copperlist::CopperList;
1705 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
1707 use cu29::monitoring::Decision;
1708 use cu29::prelude::app::CuApplication;
1709 use cu29::prelude::debug;
1710 use cu29::prelude::stream_write;
1711 use cu29::prelude::UnifiedLogType;
1712 use cu29::prelude::UnifiedLogWrite;
1713
1714 #imports
1715
1716 #sim_imports
1717
1718 #[allow(unused_imports)]
1720 use cu29::monitoring::NoMonitor;
1721
1722 pub type CuTasks = #task_types_tuple;
1726 pub type CuBridges = #bridges_type_tokens;
1727
1728 #sim_tasks
1729 #sim_support
1730 #sim_tasks_instanciator
1731
1732 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
1733
1734 #culist_support
1735 #tasks_instanciator
1736 #bridges_instanciator
1737
1738 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
1739 #monitor_type::new(config, #mission_mod::TASKS_IDS).expect("Failed to create the given monitor.")
1740 }
1741
1742 pub #application_struct
1744
1745 #application_impl
1746
1747 #std_application_impl
1748
1749 #application_builder
1750 }
1751
1752 };
1753 all_missions_tokens.push(mission_mod_tokens);
1754 }
1755
1756 let default_application_tokens = if all_missions.contains_key("default") {
1757 let default_builder = if std {
1758 Some(quote! {
1759 #[allow(unused_imports)]
1761 use default::#builder_name;
1762 })
1763 } else {
1764 None
1765 };
1766 quote! {
1767 #default_builder
1768
1769 #[allow(unused_imports)]
1770 use default::#application_name;
1771 }
1772 } else {
1773 quote!() };
1775
1776 let result: proc_macro2::TokenStream = quote! {
1777 #(#all_missions_tokens)*
1778 #default_application_tokens
1779 };
1780
1781 #[cfg(feature = "macro_debug")]
1783 {
1784 let formatted_code = rustfmt_generated_code(result.to_string());
1785 eprintln!("\n === Gen. Runtime ===\n");
1786 eprintln!("{formatted_code}");
1787 eprintln!("\n === === === === === ===\n");
1790 }
1791 result.into()
1792}
1793
1794fn read_config(config_file: &str) -> CuResult<CuConfig> {
1795 let filename = config_full_path(config_file);
1796
1797 read_configuration(filename.as_str())
1798}
1799
1800fn config_full_path(config_file: &str) -> String {
1801 let mut config_full_path = utils::caller_crate_root();
1802 config_full_path.push(config_file);
1803 let filename = config_full_path
1804 .as_os_str()
1805 .to_str()
1806 .expect("Could not interpret the config file name");
1807 filename.to_string()
1808}
1809
1810fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
1811 let result = graph
1812 .get_all_nodes()
1813 .iter()
1814 .map(|(_, node)| {
1815 let id = node.get_id();
1816 let type_str = graph.get_node_output_msg_type(id.as_str());
1817 let result = type_str.map(|type_str| {
1818 let result = parse_str::<Type>(type_str.as_str())
1819 .expect("Could not parse output message type.");
1820 result
1821 });
1822 result
1823 })
1824 .collect();
1825 result
1826}
1827
/// Per-task information collected from a `CuGraph` by [`CuTaskSpecSet::from_graph`].
///
/// Only nodes whose flavor is `Flavor::Task` are described. Unless noted otherwise,
/// the vectors are filled in the same order, one entry per task node.
struct CuTaskSpecSet {
    /// Configuration id of each task (`node.get_id()`).
    pub ids: Vec<String>,
    /// Task kind of each task, as returned by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// `node.is_background()` for each task.
    pub background_flags: Vec<bool>,
    /// `node.is_logging_enabled()` for each task.
    pub logging_enabled: Vec<bool>,
    /// Type name of each task exactly as written in the configuration (`node.get_type()`).
    pub type_names: Vec<String>,
    /// Parsed Rust type of each task; background tasks are wrapped as
    /// `CuAsyncTask<Task, Output>`.
    pub task_types: Vec<Type>,
    /// Same as `task_types`, but background tasks use the turbofish form
    /// `CuAsyncTask::<Task, Output>`.
    pub instantiation_types: Vec<Type>,
    /// Parsed Rust type of each task without any background wrapper.
    pub sim_task_types: Vec<Type>,
    /// `node.is_run_in_sim()` for each task.
    pub run_in_sim_flags: Vec<bool>,
    /// Parsed output message types; `None` where the graph reports no output message type.
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Indexed by graph node id (`graph.node_count()` entries): `Some(i)` when the node
    /// is the `i`-th task, `None` for nodes that are not tasks.
    pub node_id_to_task_index: Vec<Option<usize>>,
}
1842
1843impl CuTaskSpecSet {
1844 pub fn from_graph(graph: &CuGraph) -> Self {
1845 let all_id_nodes: Vec<(NodeId, &Node)> = graph
1846 .get_all_nodes()
1847 .into_iter()
1848 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
1849 .collect();
1850
1851 let ids = all_id_nodes
1852 .iter()
1853 .map(|(_, node)| node.get_id().to_string())
1854 .collect();
1855
1856 let cutypes = all_id_nodes
1857 .iter()
1858 .map(|(id, _)| find_task_type_for_id(graph, *id))
1859 .collect();
1860
1861 let background_flags: Vec<bool> = all_id_nodes
1862 .iter()
1863 .map(|(_, node)| node.is_background())
1864 .collect();
1865
1866 let logging_enabled: Vec<bool> = all_id_nodes
1867 .iter()
1868 .map(|(_, node)| node.is_logging_enabled())
1869 .collect();
1870
1871 let type_names: Vec<String> = all_id_nodes
1872 .iter()
1873 .map(|(_, node)| node.get_type().to_string())
1874 .collect();
1875
1876 let output_types = extract_tasks_output_types(graph);
1877
1878 let task_types = type_names
1879 .iter()
1880 .zip(background_flags.iter())
1881 .zip(output_types.iter())
1882 .map(|((name, &background), output_type)| {
1883 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1884 panic!("Could not transform {name} into a Task Rust type: {error}");
1885 });
1886 if background {
1887 if let Some(output_type) = output_type {
1888 parse_quote!(CuAsyncTask<#name_type, #output_type>)
1889 } else {
1890 panic!("{name}: If a task is background, it has to have an output");
1891 }
1892 } else {
1893 name_type
1894 }
1895 })
1896 .collect();
1897
1898 let instantiation_types = type_names
1899 .iter()
1900 .zip(background_flags.iter())
1901 .zip(output_types.iter())
1902 .map(|((name, &background), output_type)| {
1903 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1904 panic!("Could not transform {name} into a Task Rust type: {error}");
1905 });
1906 if background {
1907 if let Some(output_type) = output_type {
1908 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
1909 } else {
1910 panic!("{name}: If a task is background, it has to have an output");
1911 }
1912 } else {
1913 name_type
1914 }
1915 })
1916 .collect();
1917
1918 let sim_task_types = type_names
1919 .iter()
1920 .map(|name| {
1921 parse_str::<Type>(name).unwrap_or_else(|err| {
1922 eprintln!("Could not transform {name} into a Task Rust type.");
1923 panic!("{err}")
1924 })
1925 })
1926 .collect();
1927
1928 let run_in_sim_flags = all_id_nodes
1929 .iter()
1930 .map(|(_, node)| node.is_run_in_sim())
1931 .collect();
1932
1933 let mut node_id_to_task_index = vec![None; graph.node_count()];
1934 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
1935 node_id_to_task_index[*node_id as usize] = Some(index);
1936 }
1937
1938 Self {
1939 ids,
1940 cutypes,
1941 background_flags,
1942 logging_enabled,
1943 type_names,
1944 task_types,
1945 instantiation_types,
1946 sim_task_types,
1947 run_in_sim_flags,
1948 output_types,
1949 node_id_to_task_index,
1950 }
1951 }
1952}
1953
1954fn extract_msg_types(runtime_plan: &CuExecutionLoop) -> Vec<Type> {
1955 runtime_plan
1956 .steps
1957 .iter()
1958 .filter_map(|unit| match unit {
1959 CuExecutionUnit::Step(step) => {
1960 if let Some((_, output_msg_type)) = &step.output_msg_index_type {
1961 Some(
1962 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
1963 panic!(
1964 "Could not transform {output_msg_type} into a message Rust type."
1965 )
1966 }),
1967 )
1968 } else {
1969 None
1970 }
1971 }
1972 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
1973 })
1974 .collect()
1975}
1976
1977fn build_culist_tuple(all_msgs_types_in_culist_order: &[Type]) -> TypeTuple {
1979 if all_msgs_types_in_culist_order.is_empty() {
1980 parse_quote! { () }
1981 } else {
1982 parse_quote! {
1983 ( #( CuMsg<#all_msgs_types_in_culist_order> ),* )
1984 }
1985 }
1986}
1987
1988fn build_culist_tuple_encode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
1990 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
1991
1992 let encode_fields: Vec<_> = indices
1994 .iter()
1995 .map(|i| {
1996 let idx = syn::Index::from(*i);
1997 quote! { self.0.#idx.encode(encoder)?; }
1998 })
1999 .collect();
2000
2001 parse_quote! {
2002 impl Encode for CuStampedDataSet {
2003 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
2004 #(#encode_fields)*
2005 Ok(())
2006 }
2007 }
2008 }
2009}
2010
2011fn build_culist_tuple_decode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2013 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2014
2015 let decode_fields: Vec<_> = indices
2017 .iter()
2018 .map(|i| {
2019 let t = &all_msgs_types_in_culist_order[*i];
2020 quote! { CuMsg::<#t>::decode(decoder)? }
2021 })
2022 .collect();
2023
2024 parse_quote! {
2025 impl Decode<()> for CuStampedDataSet {
2026 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
2027 Ok(CuStampedDataSet ((
2028 #(#decode_fields),*
2029 )))
2030 }
2031 }
2032 }
2033}
2034
2035fn build_culist_erasedcumsgs(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2036 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2037 let casted_fields: Vec<_> = indices
2038 .iter()
2039 .map(|i| {
2040 let idx = syn::Index::from(*i);
2041 quote! { &self.0.#idx as &dyn ErasedCuStampedData }
2042 })
2043 .collect();
2044 parse_quote! {
2045 impl ErasedCuStampedDataSet for CuStampedDataSet {
2046 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
2047 vec![
2048 #(#casted_fields),*
2049 ]
2050 }
2051 }
2052 }
2053}
2054
2055fn build_culist_tuple_debug(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2056 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2057
2058 let debug_fields: Vec<_> = indices
2059 .iter()
2060 .map(|i| {
2061 let idx = syn::Index::from(*i);
2062 quote! { .field(&self.0.#idx) }
2063 })
2064 .collect();
2065
2066 parse_quote! {
2067 impl Debug for CuStampedDataSet {
2068 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
2069 f.debug_tuple("CuStampedDataSet")
2070 #(#debug_fields)*
2071 .finish()
2072 }
2073 }
2074 }
2075}
2076
2077fn build_culist_tuple_serialize(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2079 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2080 let tuple_len = all_msgs_types_in_culist_order.len();
2081
2082 let serialize_fields: Vec<_> = indices
2084 .iter()
2085 .map(|i| {
2086 let idx = syn::Index::from(*i);
2087 quote! { &self.0.#idx }
2088 })
2089 .collect();
2090
2091 parse_quote! {
2092 impl Serialize for CuStampedDataSet {
2093 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2094 where
2095 S: serde::Serializer,
2096 {
2097 use serde::ser::SerializeTuple;
2098 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
2099 #(tuple.serialize_element(#serialize_fields)?;)*
2100 tuple.end()
2101 }
2102 }
2103 }
2104}
2105
2106fn build_culist_tuple_default(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2108 let default_fields: Vec<_> = all_msgs_types_in_culist_order
2110 .iter()
2111 .map(|msg_type| quote! { CuStampedData::<#msg_type, CuMsgMetadata>::default() })
2112 .collect();
2113
2114 parse_quote! {
2115 impl Default for CuStampedDataSet {
2116 fn default() -> CuStampedDataSet
2117 {
2118 CuStampedDataSet((
2119 #(#default_fields),*
2120 ))
2121 }
2122 }
2123 }
2124}
2125
2126fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
2127 let mut usage = HashMap::new();
2128 for edge_idx in graph.0.edge_indices() {
2129 let cnx = graph
2130 .0
2131 .edge_weight(edge_idx)
2132 .expect("Edge should exist while collecting bridge usage")
2133 .clone();
2134 if let Some(channel) = &cnx.src_channel {
2135 let key = BridgeChannelKey {
2136 bridge_id: cnx.src.clone(),
2137 channel_id: channel.clone(),
2138 direction: BridgeChannelDirection::Rx,
2139 };
2140 usage
2141 .entry(key)
2142 .and_modify(|msg| {
2143 if msg != &cnx.msg {
2144 panic!(
2145 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2146 cnx.src, channel, msg, cnx.msg
2147 );
2148 }
2149 })
2150 .or_insert(cnx.msg.clone());
2151 }
2152 if let Some(channel) = &cnx.dst_channel {
2153 let key = BridgeChannelKey {
2154 bridge_id: cnx.dst.clone(),
2155 channel_id: channel.clone(),
2156 direction: BridgeChannelDirection::Tx,
2157 };
2158 usage
2159 .entry(key)
2160 .and_modify(|msg| {
2161 if msg != &cnx.msg {
2162 panic!(
2163 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2164 cnx.dst, channel, msg, cnx.msg
2165 );
2166 }
2167 })
2168 .or_insert(cnx.msg.clone());
2169 }
2170 }
2171 usage
2172}
2173
2174fn build_bridge_specs(
2175 config: &CuConfig,
2176 graph: &CuGraph,
2177 channel_usage: &HashMap<BridgeChannelKey, String>,
2178) -> Vec<BridgeSpec> {
2179 let mut specs = Vec::new();
2180 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
2181 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
2182 continue;
2183 }
2184
2185 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
2186 panic!(
2187 "Could not parse bridge type '{}' for '{}': {err}",
2188 bridge_cfg.type_, bridge_cfg.id
2189 )
2190 });
2191
2192 let mut rx_channels = Vec::new();
2193 let mut tx_channels = Vec::new();
2194
2195 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
2196 match channel {
2197 BridgeChannel::Rx { id, .. } => {
2198 let key = BridgeChannelKey {
2199 bridge_id: bridge_cfg.id.clone(),
2200 channel_id: id.clone(),
2201 direction: BridgeChannelDirection::Rx,
2202 };
2203 if let Some(msg_type) = channel_usage.get(&key) {
2204 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2205 panic!(
2206 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2207 bridge_cfg.id, id
2208 )
2209 });
2210 let const_ident =
2211 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2212 rx_channels.push(BridgeChannelSpec {
2213 id: id.clone(),
2214 const_ident,
2215 msg_type,
2216 config_index: channel_index,
2217 plan_node_id: None,
2218 culist_index: None,
2219 monitor_index: None,
2220 });
2221 }
2222 }
2223 BridgeChannel::Tx { id, .. } => {
2224 let key = BridgeChannelKey {
2225 bridge_id: bridge_cfg.id.clone(),
2226 channel_id: id.clone(),
2227 direction: BridgeChannelDirection::Tx,
2228 };
2229 if let Some(msg_type) = channel_usage.get(&key) {
2230 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2231 panic!(
2232 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2233 bridge_cfg.id, id
2234 )
2235 });
2236 let const_ident =
2237 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2238 tx_channels.push(BridgeChannelSpec {
2239 id: id.clone(),
2240 const_ident,
2241 msg_type,
2242 config_index: channel_index,
2243 plan_node_id: None,
2244 culist_index: None,
2245 monitor_index: None,
2246 });
2247 }
2248 }
2249 }
2250 }
2251
2252 if rx_channels.is_empty() && tx_channels.is_empty() {
2253 continue;
2254 }
2255
2256 specs.push(BridgeSpec {
2257 id: bridge_cfg.id.clone(),
2258 type_path,
2259 config_index: bridge_index,
2260 tuple_index: 0,
2261 monitor_index: None,
2262 rx_channels,
2263 tx_channels,
2264 });
2265 }
2266
2267 for (tuple_index, spec) in specs.iter_mut().enumerate() {
2268 spec.tuple_index = tuple_index;
2269 }
2270
2271 specs
2272}
2273
2274fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
2275 graph
2276 .get_all_nodes()
2277 .iter()
2278 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2279 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
2280 .collect()
2281}
2282
2283fn build_execution_plan(
2284 graph: &CuGraph,
2285 task_specs: &CuTaskSpecSet,
2286 bridge_specs: &mut [BridgeSpec],
2287) -> CuResult<(
2288 CuExecutionLoop,
2289 Vec<ExecutionEntity>,
2290 HashMap<NodeId, NodeId>,
2291)> {
2292 let mut plan_graph = CuGraph::default();
2293 let mut exec_entities = Vec::new();
2294 let mut original_to_plan = HashMap::new();
2295 let mut plan_to_original = HashMap::new();
2296 let mut name_to_original = HashMap::new();
2297 let mut channel_nodes = HashMap::new();
2298
2299 for (node_id, node) in graph.get_all_nodes() {
2300 name_to_original.insert(node.get_id(), node_id);
2301 if node.get_flavor() != Flavor::Task {
2302 continue;
2303 }
2304 let plan_node_id = plan_graph.add_node(node.clone())?;
2305 let task_index = task_specs.node_id_to_task_index[node_id as usize]
2306 .expect("Task missing from specifications");
2307 plan_to_original.insert(plan_node_id, node_id);
2308 original_to_plan.insert(node_id, plan_node_id);
2309 if plan_node_id as usize != exec_entities.len() {
2310 panic!("Unexpected node ordering while mirroring tasks in plan graph");
2311 }
2312 exec_entities.push(ExecutionEntity {
2313 kind: ExecutionEntityKind::Task { task_index },
2314 });
2315 }
2316
2317 for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
2318 for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
2319 let mut node = Node::new(
2320 format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
2321 "__CuBridgeRxChannel",
2322 );
2323 node.set_flavor(Flavor::Bridge);
2324 let plan_node_id = plan_graph.add_node(node)?;
2325 if plan_node_id as usize != exec_entities.len() {
2326 panic!("Unexpected node ordering while inserting bridge rx channel");
2327 }
2328 channel_spec.plan_node_id = Some(plan_node_id);
2329 exec_entities.push(ExecutionEntity {
2330 kind: ExecutionEntityKind::BridgeRx {
2331 bridge_index,
2332 channel_index,
2333 },
2334 });
2335 channel_nodes.insert(
2336 BridgeChannelKey {
2337 bridge_id: spec.id.clone(),
2338 channel_id: channel_spec.id.clone(),
2339 direction: BridgeChannelDirection::Rx,
2340 },
2341 plan_node_id,
2342 );
2343 }
2344
2345 for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
2346 let mut node = Node::new(
2347 format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
2348 "__CuBridgeTxChannel",
2349 );
2350 node.set_flavor(Flavor::Bridge);
2351 let plan_node_id = plan_graph.add_node(node)?;
2352 if plan_node_id as usize != exec_entities.len() {
2353 panic!("Unexpected node ordering while inserting bridge tx channel");
2354 }
2355 channel_spec.plan_node_id = Some(plan_node_id);
2356 exec_entities.push(ExecutionEntity {
2357 kind: ExecutionEntityKind::BridgeTx {
2358 bridge_index,
2359 channel_index,
2360 },
2361 });
2362 channel_nodes.insert(
2363 BridgeChannelKey {
2364 bridge_id: spec.id.clone(),
2365 channel_id: channel_spec.id.clone(),
2366 direction: BridgeChannelDirection::Tx,
2367 },
2368 plan_node_id,
2369 );
2370 }
2371 }
2372
2373 for edge_idx in graph.0.edge_indices() {
2374 let cnx = graph
2375 .0
2376 .edge_weight(edge_idx)
2377 .expect("Edge should exist while building plan")
2378 .clone();
2379
2380 let src_plan = if let Some(channel) = &cnx.src_channel {
2381 let key = BridgeChannelKey {
2382 bridge_id: cnx.src.clone(),
2383 channel_id: channel.clone(),
2384 direction: BridgeChannelDirection::Rx,
2385 };
2386 *channel_nodes
2387 .get(&key)
2388 .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
2389 } else {
2390 let node_id = name_to_original
2391 .get(&cnx.src)
2392 .copied()
2393 .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
2394 *original_to_plan
2395 .get(&node_id)
2396 .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
2397 };
2398
2399 let dst_plan = if let Some(channel) = &cnx.dst_channel {
2400 let key = BridgeChannelKey {
2401 bridge_id: cnx.dst.clone(),
2402 channel_id: channel.clone(),
2403 direction: BridgeChannelDirection::Tx,
2404 };
2405 *channel_nodes
2406 .get(&key)
2407 .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
2408 } else {
2409 let node_id = name_to_original
2410 .get(&cnx.dst)
2411 .copied()
2412 .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
2413 *original_to_plan
2414 .get(&node_id)
2415 .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
2416 };
2417
2418 plan_graph
2419 .connect_ext(
2420 src_plan,
2421 dst_plan,
2422 &cnx.msg,
2423 cnx.missions.clone(),
2424 None,
2425 None,
2426 )
2427 .map_err(|e| CuError::from(e.to_string()))?;
2428 }
2429
2430 let runtime_plan = compute_runtime_plan(&plan_graph)?;
2431 Ok((runtime_plan, exec_entities, plan_to_original))
2432}
2433
2434fn collect_culist_metadata(
2435 runtime_plan: &CuExecutionLoop,
2436 exec_entities: &[ExecutionEntity],
2437 bridge_specs: &mut [BridgeSpec],
2438 plan_to_original: &HashMap<NodeId, NodeId>,
2439) -> (Vec<usize>, HashMap<NodeId, usize>) {
2440 let mut culist_order = Vec::new();
2441 let mut node_output_positions = HashMap::new();
2442
2443 for unit in &runtime_plan.steps {
2444 if let CuExecutionUnit::Step(step) = unit {
2445 if let Some((output_idx, _)) = &step.output_msg_index_type {
2446 culist_order.push(*output_idx as usize);
2447 match &exec_entities[step.node_id as usize].kind {
2448 ExecutionEntityKind::Task { .. } => {
2449 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
2450 node_output_positions.insert(*original_node_id, *output_idx as usize);
2451 }
2452 }
2453 ExecutionEntityKind::BridgeRx {
2454 bridge_index,
2455 channel_index,
2456 } => {
2457 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
2458 Some(*output_idx as usize);
2459 }
2460 ExecutionEntityKind::BridgeTx { .. } => {}
2461 }
2462 }
2463 }
2464 }
2465
2466 (culist_order, node_output_positions)
2467}
2468
2469#[allow(dead_code)]
2470fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
2471 let mut names = task_ids.to_vec();
2472 for spec in bridge_specs.iter_mut() {
2473 spec.monitor_index = Some(names.len());
2474 names.push(format!("bridge::{}", spec.id));
2475 for channel in spec.rx_channels.iter_mut() {
2476 channel.monitor_index = Some(names.len());
2477 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
2478 }
2479 for channel in spec.tx_channels.iter_mut() {
2480 channel.monitor_index = Some(names.len());
2481 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
2482 }
2483 }
2484 names
2485}
2486
2487fn generate_task_execution_tokens(
2488 step: &CuExecutionStep,
2489 task_index: usize,
2490 task_specs: &CuTaskSpecSet,
2491 sim_mode: bool,
2492 mission_mod: &Ident,
2493) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2494 let node_index = int2sliceindex(task_index as u32);
2495 let task_instance = quote! { tasks.#node_index };
2496 let comment_str = format!(
2497 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
2498 step.node.get_id(),
2499 step.task_type,
2500 step.node_id,
2501 step.input_msg_indices_types,
2502 step.output_msg_index_type
2503 );
2504 let comment_tokens = quote! {{
2505 let _ = stringify!(#comment_str);
2506 }};
2507 let tid = task_index;
2508 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
2509 let enum_name = Ident::new(&task_enum_name, Span::call_site());
2510
2511 match step.task_type {
2512 CuTaskType::Source => {
2513 if let Some((output_index, _)) = &step.output_msg_index_type {
2514 let output_culist_index = int2sliceindex(*output_index);
2515
2516 let monitoring_action = quote! {
2517 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2518 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2519 match decision {
2520 Decision::Abort => {
2521 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2522 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2523 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2524 cl_manager.end_of_processing(clid)?;
2525 return Ok(());
2526 }
2527 Decision::Ignore => {
2528 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2529 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2530 let cumsg_output = &mut msgs.#output_culist_index;
2531 cumsg_output.clear_payload();
2532 }
2533 Decision::Shutdown => {
2534 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2535 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2536 return Err(CuError::new_with_cause("Task errored out during process.", error));
2537 }
2538 }
2539 };
2540
2541 let call_sim_callback = if sim_mode {
2542 quote! {
2543 let doit = {
2544 let cumsg_output = &mut msgs.#output_culist_index;
2545 let state = CuTaskCallbackState::Process((), cumsg_output);
2546 let ovr = sim_callback(SimStep::#enum_name(state));
2547
2548 if let SimOverride::Errored(reason) = ovr {
2549 let error: CuError = reason.into();
2550 #monitoring_action
2551 false
2552 } else {
2553 ovr == SimOverride::ExecuteByRuntime
2554 }
2555 };
2556 }
2557 } else {
2558 quote! { let doit = true; }
2559 };
2560
2561 let logging_tokens = if !task_specs.logging_enabled[tid] {
2562 let output_culist_index = int2sliceindex(*output_index);
2563 quote! {
2564 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2565 cumsg_output.clear_payload();
2566 }
2567 } else {
2568 quote!()
2569 };
2570
2571 (
2572 quote! {
2573 {
2574 #comment_tokens
2575 kf_manager.freeze_task(clid, &#task_instance)?;
2576 #call_sim_callback
2577 let cumsg_output = &mut msgs.#output_culist_index;
2578 cumsg_output.metadata.process_time.start = clock.now().into();
2579 let maybe_error = if doit { #task_instance.process(clock, cumsg_output) } else { Ok(()) };
2580 cumsg_output.metadata.process_time.end = clock.now().into();
2581 if let Err(error) = maybe_error {
2582 #monitoring_action
2583 }
2584 }
2585 },
2586 logging_tokens,
2587 )
2588 } else {
2589 panic!("Source task should have an output message index.");
2590 }
2591 }
2592 CuTaskType::Sink => {
2593 if let Some((output_index, _)) = &step.output_msg_index_type {
2594 let output_culist_index = int2sliceindex(*output_index);
2595 let indices = step
2596 .input_msg_indices_types
2597 .iter()
2598 .map(|(index, _)| int2sliceindex(*index));
2599
2600 let monitoring_action = quote! {
2601 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2602 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2603 match decision {
2604 Decision::Abort => {
2605 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2606 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2607 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2608 cl_manager.end_of_processing(clid)?;
2609 return Ok(());
2610 }
2611 Decision::Ignore => {
2612 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2613 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2614 let cumsg_output = &mut msgs.#output_culist_index;
2615 cumsg_output.clear_payload();
2616 }
2617 Decision::Shutdown => {
2618 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2619 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2620 return Err(CuError::new_with_cause("Task errored out during process.", error));
2621 }
2622 }
2623 };
2624
2625 let inputs_type = if indices.len() == 1 {
2626 quote! { #(msgs.#indices)* }
2627 } else {
2628 quote! { (#(&msgs.#indices),*) }
2629 };
2630
2631 let call_sim_callback = if sim_mode {
2632 quote! {
2633 let doit = {
2634 let cumsg_input = &#inputs_type;
2635 let cumsg_output = &mut msgs.#output_culist_index;
2636 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2637 let ovr = sim_callback(SimStep::#enum_name(state));
2638
2639 if let SimOverride::Errored(reason) = ovr {
2640 let error: CuError = reason.into();
2641 #monitoring_action
2642 false
2643 } else {
2644 ovr == SimOverride::ExecuteByRuntime
2645 }
2646 };
2647 }
2648 } else {
2649 quote! { let doit = true; }
2650 };
2651
2652 (
2653 quote! {
2654 {
2655 #comment_tokens
2656 kf_manager.freeze_task(clid, &#task_instance)?;
2657 #call_sim_callback
2658 let cumsg_input = &#inputs_type;
2659 let cumsg_output = &mut msgs.#output_culist_index;
2660 cumsg_output.metadata.process_time.start = clock.now().into();
2661 let maybe_error = if doit { #task_instance.process(clock, cumsg_input) } else { Ok(()) };
2662 cumsg_output.metadata.process_time.end = clock.now().into();
2663 if let Err(error) = maybe_error {
2664 #monitoring_action
2665 }
2666 }
2667 },
2668 quote! {},
2669 )
2670 } else {
2671 panic!("Sink tasks should have a virtual output message index.");
2672 }
2673 }
2674 CuTaskType::Regular => {
2675 if let Some((output_index, _)) = &step.output_msg_index_type {
2676 let output_culist_index = int2sliceindex(*output_index);
2677 let indices = step
2678 .input_msg_indices_types
2679 .iter()
2680 .map(|(index, _)| int2sliceindex(*index));
2681
2682 let monitoring_action = quote! {
2683 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2684 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2685 match decision {
2686 Decision::Abort => {
2687 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2688 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2689 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2690 cl_manager.end_of_processing(clid)?;
2691 return Ok(());
2692 }
2693 Decision::Ignore => {
2694 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2695 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2696 let cumsg_output = &mut msgs.#output_culist_index;
2697 cumsg_output.clear_payload();
2698 }
2699 Decision::Shutdown => {
2700 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2701 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2702 return Err(CuError::new_with_cause("Task errored out during process.", error));
2703 }
2704 }
2705 };
2706
2707 let inputs_type = if indices.len() == 1 {
2708 quote! { #(msgs.#indices)* }
2709 } else {
2710 quote! { (#(&msgs.#indices),*) }
2711 };
2712
2713 let call_sim_callback = if sim_mode {
2714 quote! {
2715 let doit = {
2716 let cumsg_input = &#inputs_type;
2717 let cumsg_output = &mut msgs.#output_culist_index;
2718 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2719 let ovr = sim_callback(SimStep::#enum_name(state));
2720
2721 if let SimOverride::Errored(reason) = ovr {
2722 let error: CuError = reason.into();
2723 #monitoring_action
2724 false
2725 }
2726 else {
2727 ovr == SimOverride::ExecuteByRuntime
2728 }
2729 };
2730 }
2731 } else {
2732 quote! { let doit = true; }
2733 };
2734
2735 let logging_tokens = if !task_specs.logging_enabled[tid] {
2736 let output_culist_index = int2sliceindex(*output_index);
2737 quote! {
2738 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2739 cumsg_output.clear_payload();
2740 }
2741 } else {
2742 quote!()
2743 };
2744
2745 (
2746 quote! {
2747 {
2748 #comment_tokens
2749 kf_manager.freeze_task(clid, &#task_instance)?;
2750 #call_sim_callback
2751 let cumsg_input = &#inputs_type;
2752 let cumsg_output = &mut msgs.#output_culist_index;
2753 cumsg_output.metadata.process_time.start = clock.now().into();
2754 let maybe_error = if doit { #task_instance.process(clock, cumsg_input, cumsg_output) } else { Ok(()) };
2755 cumsg_output.metadata.process_time.end = clock.now().into();
2756 if let Err(error) = maybe_error {
2757 #monitoring_action
2758 }
2759 }
2760 },
2761 logging_tokens,
2762 )
2763 } else {
2764 panic!("Regular task should have an output message index.");
2765 }
2766 }
2767 }
2768}
2769
2770fn generate_bridge_rx_execution_tokens(
2771 bridge_spec: &BridgeSpec,
2772 channel_index: usize,
2773 mission_mod: &Ident,
2774) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2775 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2776 let channel = &bridge_spec.rx_channels[channel_index];
2777 let culist_index = channel
2778 .culist_index
2779 .unwrap_or_else(|| panic!("Bridge Rx channel missing output index"));
2780 let culist_index_ts = int2sliceindex(culist_index as u32);
2781 let monitor_index = syn::Index::from(
2782 channel
2783 .monitor_index
2784 .expect("Bridge Rx channel missing monitor index"),
2785 );
2786 let bridge_type = &bridge_spec.type_path;
2787 let const_ident = &channel.const_ident;
2788 (
2789 quote! {
2790 {
2791 let bridge = &mut bridges.#bridge_tuple_index;
2792 let cumsg_output = &mut msgs.#culist_index_ts;
2793 cumsg_output.metadata.process_time.start = clock.now().into();
2794 let maybe_error = bridge.receive(
2795 clock,
2796 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
2797 cumsg_output,
2798 );
2799 cumsg_output.metadata.process_time.end = clock.now().into();
2800 if let Err(error) = maybe_error {
2801 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2802 match decision {
2803 Decision::Abort => {
2804 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2805 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2806 cl_manager.end_of_processing(clid)?;
2807 return Ok(());
2808 }
2809 Decision::Ignore => {
2810 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2811 let cumsg_output = &mut msgs.#culist_index_ts;
2812 cumsg_output.clear_payload();
2813 }
2814 Decision::Shutdown => {
2815 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2816 return Err(CuError::new_with_cause("Task errored out during process.", error));
2817 }
2818 }
2819 }
2820 }
2821 },
2822 quote! {},
2823 )
2824}
2825
2826fn generate_bridge_tx_execution_tokens(
2827 step: &CuExecutionStep,
2828 bridge_spec: &BridgeSpec,
2829 channel_index: usize,
2830 mission_mod: &Ident,
2831) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2832 let channel = &bridge_spec.tx_channels[channel_index];
2833 let monitor_index = syn::Index::from(
2834 channel
2835 .monitor_index
2836 .expect("Bridge Tx channel missing monitor index"),
2837 );
2838 let input_index = step
2839 .input_msg_indices_types
2840 .first()
2841 .map(|(idx, _)| int2sliceindex(*idx))
2842 .expect("Bridge Tx channel should have exactly one input");
2843 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2844 let bridge_type = &bridge_spec.type_path;
2845 let const_ident = &channel.const_ident;
2846 (
2847 quote! {
2848 {
2849 let bridge = &mut bridges.#bridge_tuple_index;
2850 let cumsg_input = &msgs.#input_index;
2851 if let Err(error) = bridge.send(
2852 clock,
2853 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
2854 cumsg_input,
2855 ) {
2856 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2857 match decision {
2858 Decision::Abort => {
2859 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2860 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2861 cl_manager.end_of_processing(clid)?;
2862 return Ok(());
2863 }
2864 Decision::Ignore => {
2865 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2866 }
2867 Decision::Shutdown => {
2868 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2869 return Err(CuError::new_with_cause("Task errored out during process.", error));
2870 }
2871 }
2872 }
2873 }
2874 },
2875 quote! {},
2876 )
2877}
2878
#[cfg(test)]
mod tests {
    /// Runs every `trybuild` compile-fail case matching `tests/compile_fail/*/*.rs`.
    #[test]
    fn test_compile_fail() {
        let cases = trybuild::TestCases::new();
        cases.compile_fail("tests/compile_fail/*/*.rs");
    }
}
/// Side of a bridge a channel belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// Channel named on the source side of a connection (`src_channel`).
    Rx,
    /// Channel named on the destination side of a connection (`dst_channel`).
    Tx,
}
2893
/// Hashable identity of a bridge channel: bridge id, channel id and direction.
///
/// Used as the key when collecting channel usage and when mapping channels to plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BridgeChannelKey {
    // Configured id of the bridge owning the channel.
    bridge_id: String,
    // Configured id of the channel within that bridge.
    channel_id: String,
    // Whether the channel is used as a connection source (Rx) or destination (Tx).
    direction: BridgeChannelDirection,
}
2900
/// Code-generation description of one bridge channel that is used by at least one connection.
#[derive(Clone)]
struct BridgeChannelSpec {
    // Configured id of the channel.
    id: String,
    // Identifier built from the channel id with `config_id_to_bridge_const`; generated code
    // refers to it as an item of the bridge's `Rx` / `Tx` associated type.
    const_ident: Ident,
    // Message type parsed from the connection(s) using this channel.
    #[allow(dead_code)]
    msg_type: Type,
    // Position of the channel in the bridge's configured channel list.
    config_index: usize,
    // Plan-graph node standing for this channel; set by `build_execution_plan`.
    plan_node_id: Option<NodeId>,
    // Output slot in the copper list; set by `collect_culist_metadata` for Rx channels only.
    culist_index: Option<usize>,
    // Position in the monitored-ids list; set by `build_monitored_ids`.
    monitor_index: Option<usize>,
}
2912
/// Code-generation description of one bridge together with the channels actually in use.
#[derive(Clone)]
struct BridgeSpec {
    // Configured id of the bridge.
    id: String,
    // Bridge type parsed from the configuration's type string.
    type_path: Type,
    // Position of the bridge in the configuration's bridge list.
    config_index: usize,
    // Position of the bridge among the generated specs; generated code uses it to index
    // the `bridges` tuple.
    tuple_index: usize,
    // Position in the monitored-ids list; set by `build_monitored_ids`.
    monitor_index: Option<usize>,
    // Channels used as connection sources.
    rx_channels: Vec<BridgeChannelSpec>,
    // Channels used as connection destinations.
    tx_channels: Vec<BridgeChannelSpec>,
}
2923
/// What a node of the plan graph stands for.
///
/// `build_execution_plan` stores these in a vector indexed by plan node id.
#[derive(Clone)]
struct ExecutionEntity {
    // The task or bridge channel behind the plan node.
    kind: ExecutionEntityKind,
}
2928
/// The kinds of entity a plan node can represent.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A task, identified by its index in the task specifications.
    Task {
        task_index: usize,
    },
    /// A bridge Rx channel: `bridge_index` indexes the bridge specs, `channel_index` the
    /// bridge's `rx_channels`.
    BridgeRx {
        bridge_index: usize,
        channel_index: usize,
    },
    /// A bridge Tx channel: `bridge_index` indexes the bridge specs, `channel_index` the
    /// bridge's `tx_channels`.
    BridgeTx {
        bridge_index: usize,
        channel_index: usize,
    },
}