1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::HashMap;
4use std::fs::read_to_string;
5use syn::meta::parser;
6use syn::Fields::{Named, Unnamed};
7use syn::{
8 parse_macro_input, parse_quote, parse_str, Field, Fields, ItemImpl, ItemStruct, LitStr, Type,
9 TypeTuple,
10};
11
12#[cfg(feature = "macro_debug")]
13use crate::format::rustfmt_generated_code;
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{
17 read_configuration, BridgeChannelConfigRepresentation, CuGraph, Flavor, Node, NodeId,
18};
19use cu29_runtime::curuntime::{
20 compute_runtime_plan, find_task_type_for_id, CuExecutionLoop, CuExecutionStep, CuExecutionUnit,
21 CuTaskType,
22};
23use cu29_traits::{CuError, CuResult};
24use proc_macro2::{Ident, Span};
25
26mod format;
27mod utils;
28
29const DEFAULT_CLNB: usize = 10;
31
32#[inline]
33fn int2sliceindex(i: u32) -> syn::Index {
34 syn::Index::from(i as usize)
35}
36
37#[inline(always)]
38fn return_error(msg: String) -> TokenStream {
39 syn::Error::new(Span::call_site(), msg)
40 .to_compile_error()
41 .into()
42}
43
44#[proc_macro]
48pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
49 #[cfg(feature = "std")]
50 let std = true;
51
52 #[cfg(not(feature = "std"))]
53 let std = false;
54
55 let config = parse_macro_input!(config_path_lit as LitStr).value();
56 if !std::path::Path::new(&config_full_path(&config)).exists() {
57 return return_error(format!(
58 "The configuration file `{config}` does not exist. Please provide a valid path."
59 ));
60 }
61 #[cfg(feature = "macro_debug")]
62 eprintln!("[gen culist support with {config:?}]");
63 let cuconfig = match read_config(&config) {
64 Ok(cuconfig) => cuconfig,
65 Err(e) => return return_error(e.to_string()),
66 };
67 let graph = cuconfig
68 .get_graph(None) .expect("Could not find the specified mission for gen_cumsgs");
70 let task_specs = CuTaskSpecSet::from_graph(graph);
71 let channel_usage = collect_bridge_channel_usage(graph);
72 let mut bridge_specs = build_bridge_specs(&cuconfig, graph, &channel_usage);
73 let (culist_plan, exec_entities, plan_to_original) =
74 match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
75 Ok(plan) => plan,
76 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
77 };
78 let task_member_names = collect_task_member_names(graph);
79 let (culist_order, node_output_positions) = collect_culist_metadata(
80 &culist_plan,
81 &exec_entities,
82 &mut bridge_specs,
83 &plan_to_original,
84 );
85
86 #[cfg(feature = "macro_debug")]
87 eprintln!(
88 "[The CuStampedDataSet matching tasks ids are {:?}]",
89 culist_order
90 );
91
92 let support = gen_culist_support(
93 &culist_plan,
94 &culist_order,
95 &node_output_positions,
96 &task_member_names,
97 );
98
99 let extra_imports = if !std {
100 quote! {
101 use core::fmt::Debug;
102 use core::fmt::Formatter;
103 use core::fmt::Result as FmtResult;
104 use alloc::vec;
105 }
106 } else {
107 quote! {
108 use std::fmt::Debug;
109 use std::fmt::Formatter;
110 use std::fmt::Result as FmtResult;
111 }
112 };
113
114 let with_uses = quote! {
115 mod cumsgs {
116 use cu29::bincode::Encode;
117 use cu29::bincode::enc::Encoder;
118 use cu29::bincode::error::EncodeError;
119 use cu29::bincode::Decode;
120 use cu29::bincode::de::Decoder;
121 use cu29::bincode::error::DecodeError;
122 use cu29::copperlist::CopperList;
123 use cu29::prelude::CuStampedData;
124 use cu29::prelude::ErasedCuStampedData;
125 use cu29::prelude::ErasedCuStampedDataSet;
126 use cu29::prelude::MatchingTasks;
127 use cu29::prelude::Serialize;
128 use cu29::prelude::CuMsg;
129 use cu29::prelude::CuMsgMetadata;
130 use cu29::prelude::CuListZeroedInit;
131 use cu29::prelude::CuCompactString;
132 #extra_imports
133 #support
134 }
135 use cumsgs::CuStampedDataSet;
136 type CuMsgs=CuStampedDataSet;
137 };
138 with_uses.into()
139}
140
141fn gen_culist_support(
143 runtime_plan: &CuExecutionLoop,
144 culist_indices_in_plan_order: &[usize],
145 node_output_positions: &HashMap<NodeId, usize>,
146 task_member_names: &[(NodeId, String)],
147) -> proc_macro2::TokenStream {
148 #[cfg(feature = "macro_debug")]
149 eprintln!("[Extract msgs types]");
150 let all_msgs_types_in_culist_order = extract_msg_types(runtime_plan);
151
152 let culist_size = all_msgs_types_in_culist_order.len();
153 let task_indices: Vec<_> = culist_indices_in_plan_order
154 .iter()
155 .map(|i| syn::Index::from(*i))
156 .collect();
157
158 #[cfg(feature = "macro_debug")]
159 eprintln!("[build the copperlist struct]");
160 let msgs_types_tuple: TypeTuple = build_culist_tuple(&all_msgs_types_in_culist_order);
161
162 #[cfg(feature = "macro_debug")]
163 eprintln!("[build the copperlist tuple bincode support]");
164 let msgs_types_tuple_encode = build_culist_tuple_encode(&all_msgs_types_in_culist_order);
165 let msgs_types_tuple_decode = build_culist_tuple_decode(&all_msgs_types_in_culist_order);
166
167 #[cfg(feature = "macro_debug")]
168 eprintln!("[build the copperlist tuple debug support]");
169 let msgs_types_tuple_debug = build_culist_tuple_debug(&all_msgs_types_in_culist_order);
170
171 #[cfg(feature = "macro_debug")]
172 eprintln!("[build the copperlist tuple serialize support]");
173 let msgs_types_tuple_serialize = build_culist_tuple_serialize(&all_msgs_types_in_culist_order);
174
175 #[cfg(feature = "macro_debug")]
176 eprintln!("[build the default tuple support]");
177 let msgs_types_tuple_default = build_culist_tuple_default(&all_msgs_types_in_culist_order);
178
179 #[cfg(feature = "macro_debug")]
180 eprintln!("[build erasedcumsgs]");
181
182 let erasedmsg_trait_impl = build_culist_erasedcumsgs(&all_msgs_types_in_culist_order);
183
184 let collect_metadata_function = quote! {
185 pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
186 [#( &culist.msgs.0.#task_indices.metadata, )*]
187 }
188 };
189
190 let task_name_literals: Vec<String> = task_member_names
191 .iter()
192 .map(|(_, name)| name.clone())
193 .collect();
194
195 let methods = task_member_names.iter().map(|(node_id, name)| {
196 let output_position = node_output_positions
197 .get(node_id)
198 .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
199
200 let fn_name = format_ident!("get_{}_output", name);
201 let payload_type = all_msgs_types_in_culist_order[*output_position].clone();
202 let index = syn::Index::from(*output_position);
203 quote! {
204 #[allow(dead_code)]
205 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
206 &self.0.#index
207 }
208 }
209 });
210
211 quote! {
213 #collect_metadata_function
214
215 pub struct CuStampedDataSet(pub #msgs_types_tuple);
216
217 pub type CuList = CopperList<CuStampedDataSet>;
218
219 impl CuStampedDataSet {
220 #(#methods)*
221
222 #[allow(dead_code)]
223 fn get_tuple(&self) -> &#msgs_types_tuple {
224 &self.0
225 }
226
227 #[allow(dead_code)]
228 fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
229 &mut self.0
230 }
231 }
232
233 impl MatchingTasks for CuStampedDataSet {
234 #[allow(dead_code)]
235 fn get_all_task_ids() -> &'static [&'static str] {
236 &[#(#task_name_literals),*]
237 }
238 }
239
240 #msgs_types_tuple_encode
242 #msgs_types_tuple_decode
243
244 #msgs_types_tuple_debug
246
247 #msgs_types_tuple_serialize
249
250 #msgs_types_tuple_default
252
253 #erasedmsg_trait_impl
255
256 impl CuListZeroedInit for CuStampedDataSet {
257 fn init_zeroed(&mut self) {
258 #(self.0.#task_indices.metadata.status_txt = CuCompactString::default();)*
259 }
260 }
261 }
262}
263
264fn gen_sim_support(
265 runtime_plan: &CuExecutionLoop,
266 exec_entities: &[ExecutionEntity],
267) -> proc_macro2::TokenStream {
268 #[cfg(feature = "macro_debug")]
269 eprintln!("[Sim: Build SimEnum]");
270 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
271 .steps
272 .iter()
273 .filter_map(|unit| match unit {
274 CuExecutionUnit::Step(step) => {
275 if !matches!(
276 exec_entities[step.node_id as usize].kind,
277 ExecutionEntityKind::Task { .. }
278 ) {
279 return None;
280 }
281 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
282 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
283 let inputs: Vec<Type> = step
284 .input_msg_indices_types
285 .iter()
286 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap())
287 .collect();
288 let output: Option<Type> = step
289 .output_msg_index_type
290 .as_ref()
291 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap());
292 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
293 let output = output.as_ref().unwrap_or(&no_output);
294
295 let inputs_type = if inputs.is_empty() {
296 quote! { () }
297 } else if inputs.len() == 1 {
298 let input = inputs.first().unwrap();
299 quote! { &'a #input }
300 } else {
301 quote! { &'a (#(&'a #inputs),*) }
302 };
303
304 Some(quote! {
305 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
306 })
307 }
308 CuExecutionUnit::Loop(_) => {
309 todo!("Needs to be implemented")
310 }
311 })
312 .collect();
313 let mut variants = plan_enum;
314 variants.push(quote! { __Phantom(core::marker::PhantomData<&'a ()>) });
315 quote! {
316 #[allow(dead_code, unused_lifetimes)]
318 pub enum SimStep<'a> {
319 #(#variants),*
320 }
321 }
322}
323
324#[proc_macro_attribute]
328pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
329 #[cfg(feature = "macro_debug")]
330 eprintln!("[entry]");
331 let mut application_struct = parse_macro_input!(input as ItemStruct);
332
333 let application_name = &application_struct.ident;
334 let builder_name = format_ident!("{}Builder", application_name);
335
336 let mut config_file: Option<LitStr> = None;
337 let mut sim_mode = false;
338
339 #[cfg(feature = "std")]
340 let std = true;
341
342 #[cfg(not(feature = "std"))]
343 let std = false;
344
345 let attribute_config_parser = parser(|meta| {
347 if meta.path.is_ident("config") {
348 config_file = Some(meta.value()?.parse()?);
349 Ok(())
350 } else if meta.path.is_ident("sim_mode") {
351 if meta.input.peek(syn::Token![=]) {
353 meta.input.parse::<syn::Token![=]>()?;
354 let value: syn::LitBool = meta.input.parse()?;
355 sim_mode = value.value();
356 Ok(())
357 } else {
358 sim_mode = true;
360 Ok(())
361 }
362 } else {
363 Err(meta.error("unsupported property"))
364 }
365 });
366
367 #[cfg(feature = "macro_debug")]
368 eprintln!("[parse]");
369 parse_macro_input!(args with attribute_config_parser);
371
372 let config_file = match config_file {
383 Some(file) => file.value(),
384 None => {
385 return return_error(
386 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
387 .to_string(),
388 )
389 }
390 };
391
392 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
393 return return_error(format!(
394 "The configuration file `{config_file}` does not exist. Please provide a valid path."
395 ));
396 }
397
398 let copper_config = match read_config(&config_file) {
399 Ok(cuconfig) => cuconfig,
400 Err(e) => return return_error(e.to_string()),
401 };
402 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
403 Ok(ok) => ok,
404 Err(e) => return return_error(format!("Could not read the config file (should not happen because we just succeeded just before). {e}"))
405 };
406
407 #[cfg(feature = "macro_debug")]
408 eprintln!("[build monitor type]");
409 let monitor_type = if let Some(monitor_config) = copper_config.get_monitor_config() {
410 let monitor_type = parse_str::<Type>(monitor_config.get_type())
411 .expect("Could not transform the monitor type name into a Rust type.");
412 quote! { #monitor_type }
413 } else {
414 quote! { NoMonitor }
415 };
416
417 #[cfg(feature = "macro_debug")]
419 eprintln!("[build runtime field]");
420 let runtime_field: Field = if sim_mode {
422 parse_quote! {
423 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
424 }
425 } else {
426 parse_quote! {
427 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
428 }
429 };
430
431 #[cfg(feature = "macro_debug")]
432 eprintln!("[match struct anonymity]");
433 match &mut application_struct.fields {
434 Named(fields_named) => {
435 fields_named.named.push(runtime_field);
436 }
437 Unnamed(fields_unnamed) => {
438 fields_unnamed.unnamed.push(runtime_field);
439 }
440 Fields::Unit => {
441 panic!("This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;")
442 }
443 };
444
445 let all_missions = copper_config.graphs.get_all_missions_graphs();
446 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
447 for (mission, graph) in &all_missions {
448 let mission_mod = parse_str::<Ident>(mission.as_str())
449 .expect("Could not make an identifier of the mission name");
450
451 #[cfg(feature = "macro_debug")]
452 eprintln!("[extract tasks ids & types]");
453 let task_specs = CuTaskSpecSet::from_graph(graph);
454
455 let culist_channel_usage = collect_bridge_channel_usage(graph);
456 let mut culist_bridge_specs =
457 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
458 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
459 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
460 Ok(plan) => plan,
461 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
462 };
463 let task_member_names = collect_task_member_names(graph);
464 let (culist_call_order, node_output_positions) = collect_culist_metadata(
465 &culist_plan,
466 &culist_exec_entities,
467 &mut culist_bridge_specs,
468 &culist_plan_to_original,
469 );
470
471 #[cfg(feature = "macro_debug")]
472 {
473 eprintln!("[runtime plan for mission {mission}]");
474 eprintln!("{culist_plan:?}");
475 }
476
477 let culist_support: proc_macro2::TokenStream = gen_culist_support(
478 &culist_plan,
479 &culist_call_order,
480 &node_output_positions,
481 &task_member_names,
482 );
483
484 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
485
486 let bridge_types: Vec<Type> = culist_bridge_specs
487 .iter()
488 .map(|spec| spec.type_path.clone())
489 .collect();
490 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
491 quote! { () }
492 } else {
493 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
494 quote! { #tuple }
495 };
496
497 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
498 .iter()
499 .enumerate()
500 .map(|(idx, _)| format_ident!("bridge_{idx}"))
501 .collect();
502
503 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
504 .iter()
505 .enumerate()
506 .map(|(idx, spec)| {
507 let binding_ident = &bridge_binding_idents[idx];
508 let bridge_type = &spec.type_path;
509 let bridge_name = spec.id.clone();
510 let config_index = syn::Index::from(spec.config_index);
511 let tx_configs: Vec<proc_macro2::TokenStream> = spec
512 .tx_channels
513 .iter()
514 .map(|channel| {
515 let const_ident = &channel.const_ident;
516 let channel_name = channel.id.clone();
517 let channel_config_index = syn::Index::from(channel.config_index);
518 quote! {
519 {
520 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
521 cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
522 (route.clone(), config.clone())
523 }
524 _ => panic!(
525 "Bridge '{}' channel '{}' expected to be Tx",
526 #bridge_name,
527 #channel_name
528 ),
529 };
530 cu29::cubridge::BridgeChannelConfig::from_static(
531 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
532 channel_route,
533 channel_config,
534 )
535 }
536 }
537 })
538 .collect();
539 let rx_configs: Vec<proc_macro2::TokenStream> = spec
540 .rx_channels
541 .iter()
542 .map(|channel| {
543 let const_ident = &channel.const_ident;
544 let channel_name = channel.id.clone();
545 let channel_config_index = syn::Index::from(channel.config_index);
546 quote! {
547 {
548 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
549 cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
550 (route.clone(), config.clone())
551 }
552 _ => panic!(
553 "Bridge '{}' channel '{}' expected to be Rx",
554 #bridge_name,
555 #channel_name
556 ),
557 };
558 cu29::cubridge::BridgeChannelConfig::from_static(
559 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
560 channel_route,
561 channel_config,
562 )
563 }
564 }
565 })
566 .collect();
567 quote! {
568 let #binding_ident = {
569 let bridge_cfg = config
570 .bridges
571 .get(#config_index)
572 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
573 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
574 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
575 >] = &[#(#tx_configs),*];
576 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
577 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
578 >] = &[#(#rx_configs),*];
579 <#bridge_type as cu29::cubridge::CuBridge>::new(
580 bridge_cfg.config.as_ref(),
581 tx_channels,
582 rx_channels,
583 )?
584 };
585 }
586 })
587 .collect();
588
589 let bridges_instanciator = if culist_bridge_specs.is_empty() {
590 quote! {
591 pub fn bridges_instanciator(_config: &CuConfig) -> CuResult<CuBridges> {
592 Ok(())
593 }
594 }
595 } else {
596 let bridge_bindings = bridge_binding_idents.clone();
597 quote! {
598 pub fn bridges_instanciator(config: &CuConfig) -> CuResult<CuBridges> {
599 #(#bridge_init_statements)*
600 Ok((#(#bridge_bindings),*,))
601 }
602 }
603 };
604
605 let all_sim_tasks_types: Vec<Type> = task_specs.ids
606 .iter()
607 .zip(&task_specs.cutypes)
608 .zip(&task_specs.sim_task_types)
609 .zip(&task_specs.background_flags)
610 .zip(&task_specs.run_in_sim_flags)
611 .map(|((((task_id, task_type), sim_type), background), run_in_sim)| {
612 match task_type {
613 CuTaskType::Source => {
614 if *background {
615 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
616 }
617 if *run_in_sim {
618 sim_type.clone()
619 } else {
620 let msg_type = graph
621 .get_node_output_msg_type(task_id.as_str())
622 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
623 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
624 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
625 }
626 }
627 CuTaskType::Regular => {
628 sim_type.clone()
631 },
632 CuTaskType::Sink => {
633 if *background {
634 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
635 }
636
637 if *run_in_sim {
638 sim_type.clone()
640 }
641 else {
642 let msg_types = graph
644 .get_node_input_msg_types(task_id.as_str())
645 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
646 let msg_type = if msg_types.len() == 1 {
647 format!("({},)", msg_types[0])
648 } else {
649 format!("({})", msg_types.join(", "))
650 };
651 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
652 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
653 }
654 }
655 }
656 })
657 .collect();
658
659 #[cfg(feature = "macro_debug")]
660 eprintln!("[build task tuples]");
661
662 let task_types = &task_specs.task_types;
663 let task_types_tuple: TypeTuple = if task_types.is_empty() {
666 parse_quote! { () }
667 } else {
668 parse_quote! { (#(#task_types),*,) }
669 };
670
671 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
672 parse_quote! { () }
673 } else {
674 parse_quote! { (#(#all_sim_tasks_types),*,) }
675 };
676
677 #[cfg(feature = "macro_debug")]
678 eprintln!("[gen instances]");
679 let task_sim_instances_init_code = all_sim_tasks_types.iter().enumerate().map(|(index, ty)| {
681 let additional_error_info = format!(
682 "Failed to get create instance for {}, instance index {}.",
683 task_specs.type_names[index], index
684 );
685
686 quote! {
687 <#ty>::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
688 }
689 }).collect::<Vec<_>>();
690
691 let task_instances_init_code = task_specs.instantiation_types.iter().zip(&task_specs.background_flags).enumerate().map(|(index, (task_type, background))| {
692 let additional_error_info = format!(
693 "Failed to get create instance for {}, instance index {}.",
694 task_specs.type_names[index], index
695 );
696 if *background {
697 quote! {
698 #task_type::new(all_instances_configs[#index], threadpool.clone()).map_err(|e| e.add_cause(#additional_error_info))?
699 }
700 } else {
701 quote! {
702 #task_type::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
703 }
704 }
705 }).collect::<Vec<_>>();
706
707 let (
710 task_restore_code,
711 task_start_calls,
712 task_stop_calls,
713 task_preprocess_calls,
714 task_postprocess_calls,
715 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
716 (0..task_specs.task_types.len())
717 .map(|index| {
718 let task_index = int2sliceindex(index as u32);
719 let task_tuple_index = syn::Index::from(index);
720 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
721 let enum_name = Ident::new(&task_enum_name, Span::call_site());
722 (
723 quote! {
725 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::new_with_cause("Failed to thaw", e))?
726 },
727 { let monitoring_action = quote! {
729 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
730 match decision {
731 Decision::Abort => {
732 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
733 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
734 return Ok(());
735
736 }
737 Decision::Ignore => {
738 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
739 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
740 }
741 Decision::Shutdown => {
742 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
743 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
744 return Err(CuError::new_with_cause("Task errored out during start.", error));
745 }
746 }
747 };
748
749 let call_sim_callback = if sim_mode {
750 quote! {
751 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
753
754 let doit = if let SimOverride::Errored(reason) = ovr {
755 let error: CuError = reason.into();
756 #monitoring_action
757 false
758 }
759 else {
760 ovr == SimOverride::ExecuteByRuntime
761 };
762 }
763 } else {
764 quote! {
765 let doit = true; }
767 };
768
769
770 quote! {
771 #call_sim_callback
772 if doit {
773 let task = &mut self.copper_runtime.tasks.#task_index;
774 if let Err(error) = task.start(&self.copper_runtime.clock) {
775 #monitoring_action
776 }
777 }
778 }
779 },
780 { let monitoring_action = quote! {
782 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
783 match decision {
784 Decision::Abort => {
785 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
786 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
787 return Ok(());
788
789 }
790 Decision::Ignore => {
791 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
792 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
793 }
794 Decision::Shutdown => {
795 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
796 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
797 return Err(CuError::new_with_cause("Task errored out during stop.", error));
798 }
799 }
800 };
801 let call_sim_callback = if sim_mode {
802 quote! {
803 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
805
806 let doit = if let SimOverride::Errored(reason) = ovr {
807 let error: CuError = reason.into();
808 #monitoring_action
809 false
810 }
811 else {
812 ovr == SimOverride::ExecuteByRuntime
813 };
814 }
815 } else {
816 quote! {
817 let doit = true; }
819 };
820 quote! {
821 #call_sim_callback
822 if doit {
823 let task = &mut self.copper_runtime.tasks.#task_index;
824 if let Err(error) = task.stop(&self.copper_runtime.clock) {
825 #monitoring_action
826 }
827 }
828 }
829 },
830 { let monitoring_action = quote! {
832 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
833 match decision {
834 Decision::Abort => {
835 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
836 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
837 return Ok(());
838
839 }
840 Decision::Ignore => {
841 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
842 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
843 }
844 Decision::Shutdown => {
845 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
846 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
847 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
848 }
849 }
850 };
851 let call_sim_callback = if sim_mode {
852 quote! {
853 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
855
856 let doit = if let SimOverride::Errored(reason) = ovr {
857 let error: CuError = reason.into();
858 #monitoring_action
859 false
860 } else {
861 ovr == SimOverride::ExecuteByRuntime
862 };
863 }
864 } else {
865 quote! {
866 let doit = true; }
868 };
869 quote! {
870 #call_sim_callback
871 if doit {
872 if let Err(error) = tasks.#task_index.preprocess(clock) {
873 #monitoring_action
874 }
875 }
876 }
877 },
878 { let monitoring_action = quote! {
880 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
881 match decision {
882 Decision::Abort => {
883 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
884 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
885 return Ok(());
886
887 }
888 Decision::Ignore => {
889 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
890 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
891 }
892 Decision::Shutdown => {
893 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
894 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
895 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
896 }
897 }
898 };
899 let call_sim_callback = if sim_mode {
900 quote! {
901 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
903
904 let doit = if let SimOverride::Errored(reason) = ovr {
905 let error: CuError = reason.into();
906 #monitoring_action
907 false
908 } else {
909 ovr == SimOverride::ExecuteByRuntime
910 };
911 }
912 } else {
913 quote! {
914 let doit = true; }
916 };
917 quote! {
918 #call_sim_callback
919 if doit {
920 if let Err(error) = tasks.#task_index.postprocess(clock) {
921 #monitoring_action
922 }
923 }
924 }
925 }
926 )
927 })
928 );
929
930 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
931 .iter()
932 .map(|spec| {
933 let bridge_index = int2sliceindex(spec.tuple_index as u32);
934 let monitor_index = syn::Index::from(
935 spec.monitor_index
936 .expect("Bridge missing monitor index for start"),
937 );
938 quote! {
939 {
940 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
941 if let Err(error) = bridge.start(&self.copper_runtime.clock) {
942 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
943 match decision {
944 Decision::Abort => {
945 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
946 return Ok(());
947 }
948 Decision::Ignore => {
949 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
950 }
951 Decision::Shutdown => {
952 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
953 return Err(CuError::new_with_cause("Task errored out during start.", error));
954 }
955 }
956 }
957 }
958 }
959 })
960 .collect();
961
962 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
963 .iter()
964 .map(|spec| {
965 let bridge_index = int2sliceindex(spec.tuple_index as u32);
966 let monitor_index = syn::Index::from(
967 spec.monitor_index
968 .expect("Bridge missing monitor index for stop"),
969 );
970 quote! {
971 {
972 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
973 if let Err(error) = bridge.stop(&self.copper_runtime.clock) {
974 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
975 match decision {
976 Decision::Abort => {
977 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
978 return Ok(());
979 }
980 Decision::Ignore => {
981 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
982 }
983 Decision::Shutdown => {
984 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
985 return Err(CuError::new_with_cause("Task errored out during stop.", error));
986 }
987 }
988 }
989 }
990 }
991 })
992 .collect();
993
994 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
995 .iter()
996 .map(|spec| {
997 let bridge_index = int2sliceindex(spec.tuple_index as u32);
998 let monitor_index = syn::Index::from(
999 spec.monitor_index
1000 .expect("Bridge missing monitor index for preprocess"),
1001 );
1002 quote! {
1003 {
1004 let bridge = &mut bridges.#bridge_index;
1005 if let Err(error) = bridge.preprocess(clock) {
1006 let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
1007 match decision {
1008 Decision::Abort => {
1009 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1010 return Ok(());
1011 }
1012 Decision::Ignore => {
1013 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1014 }
1015 Decision::Shutdown => {
1016 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1017 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1018 }
1019 }
1020 }
1021 }
1022 }
1023 })
1024 .collect();
1025
1026 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1027 .iter()
1028 .map(|spec| {
1029 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1030 let monitor_index = syn::Index::from(
1031 spec.monitor_index
1032 .expect("Bridge missing monitor index for postprocess"),
1033 );
1034 quote! {
1035 {
1036 let bridge = &mut bridges.#bridge_index;
1037 if let Err(error) = bridge.postprocess(clock) {
1038 let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
1039 match decision {
1040 Decision::Abort => {
1041 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1042 return Ok(());
1043 }
1044 Decision::Ignore => {
1045 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1046 }
1047 Decision::Shutdown => {
1048 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1049 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1050 }
1051 }
1052 }
1053 }
1054 }
1055 })
1056 .collect();
1057
1058 let mut start_calls = bridge_start_calls;
1059 start_calls.extend(task_start_calls);
1060 let mut stop_calls = task_stop_calls;
1061 stop_calls.extend(bridge_stop_calls);
1062 let mut preprocess_calls = bridge_preprocess_calls;
1063 preprocess_calls.extend(task_preprocess_calls);
1064 let mut postprocess_calls = task_postprocess_calls;
1065 postprocess_calls.extend(bridge_postprocess_calls);
1066
1067 let runtime_plan_code_and_logging: Vec<(
1068 proc_macro2::TokenStream,
1069 proc_macro2::TokenStream,
1070 )> = culist_plan
1071 .steps
1072 .iter()
1073 .map(|unit| match unit {
1074 CuExecutionUnit::Step(step) => {
1075 #[cfg(feature = "macro_debug")]
1076 eprintln!(
1077 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
1078 step.node.get_id(),
1079 step.node.get_type(),
1080 step.task_type,
1081 step.node_id,
1082 step.input_msg_indices_types,
1083 step.output_msg_index_type
1084 );
1085
1086 match &culist_exec_entities[step.node_id as usize].kind {
1087 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
1088 step,
1089 *task_index,
1090 &task_specs,
1091 sim_mode,
1092 &mission_mod,
1093 ),
1094 ExecutionEntityKind::BridgeRx {
1095 bridge_index,
1096 channel_index,
1097 } => {
1098 let spec = &culist_bridge_specs[*bridge_index];
1099 generate_bridge_rx_execution_tokens(spec, *channel_index, &mission_mod)
1100 }
1101 ExecutionEntityKind::BridgeTx {
1102 bridge_index,
1103 channel_index,
1104 } => {
1105 let spec = &culist_bridge_specs[*bridge_index];
1106 generate_bridge_tx_execution_tokens(
1107 step,
1108 spec,
1109 *channel_index,
1110 &mission_mod,
1111 )
1112 }
1113 }
1114 }
1115 CuExecutionUnit::Loop(_) => {
1116 panic!("Execution loops are not supported in runtime generation");
1117 }
1118 })
1119 .collect();
1120
1121 let sim_support = if sim_mode {
1122 Some(gen_sim_support(&culist_plan, &culist_exec_entities))
1123 } else {
1124 None
1125 };
1126
1127 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
1128 (
1129 quote! {
1130 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
1131 },
1132 quote! {
1133 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1134 },
1135 quote! {
1136 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1137 },
1138 quote! {
1139 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1140 },
1141 quote! {
1142 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1143 },
1144 )
1145 } else {
1146 (
1147 if std {
1148 quote! {
1149 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
1150 }
1151 } else {
1152 quote! {
1153 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
1155 }
1156 },
1157 quote! {
1158 fn run_one_iteration(&mut self) -> CuResult<()>
1159 },
1160 quote! {
1161 fn start_all_tasks(&mut self) -> CuResult<()>
1162 },
1163 quote! {
1164 fn stop_all_tasks(&mut self) -> CuResult<()>
1165 },
1166 quote! {
1167 fn run(&mut self) -> CuResult<()>
1168 },
1169 )
1170 };
1171
1172 let sim_callback_arg = if sim_mode {
1173 Some(quote!(sim_callback))
1174 } else {
1175 None
1176 };
1177
1178 let app_trait = if sim_mode {
1179 quote!(CuSimApplication)
1180 } else {
1181 quote!(CuApplication)
1182 };
1183
1184 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
1185 let enum_name = config_id_to_enum(id);
1186 let enum_ident = Ident::new(&enum_name, Span::call_site());
1187 quote! {
1188 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
1190 }
1191 });
1192
1193 let sim_callback_on_new = if sim_mode {
1194 Some(quote! {
1195 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
1196 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1197 .get_all_nodes()
1198 .iter()
1199 .map(|(_, node)| node.get_instance_config())
1200 .collect();
1201 #(#sim_callback_on_new_calls)*
1202 })
1203 } else {
1204 None
1205 };
1206
1207 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
1208 itertools::multiunzip(runtime_plan_code_and_logging);
1209
1210 let config_load_stmt = if std {
1211 quote! {
1212 let config = if let Some(overridden_config) = config_override {
1213 debug!("CuConfig: Overridden programmatically: {}", overridden_config.serialize_ron());
1214 overridden_config
1215 } else if ::std::path::Path::new(config_filename).exists() {
1216 debug!("CuConfig: Reading configuration from file: {}", config_filename);
1217 cu29::config::read_configuration(config_filename)?
1218 } else {
1219 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1220 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1221 cu29::config::read_configuration_str(original_config, None)?
1222 };
1223 }
1224 } else {
1225 quote! {
1226 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1228 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1229 let config = cu29::config::read_configuration_str(original_config, None)?;
1230 }
1231 };
1232
1233 let kill_handler = if std {
1234 Some(quote! {
1235 ctrlc::set_handler(move || {
1236 STOP_FLAG.store(true, Ordering::SeqCst);
1237 }).expect("Error setting Ctrl-C handler");
1238 })
1239 } else {
1240 None
1241 };
1242
1243 let run_loop = if std {
1244 quote! {
1245 loop {
1246 let iter_start = self.copper_runtime.clock.now();
1247 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1248
1249 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1250 let period: CuDuration = (1_000_000_000u64 / rate).into();
1251 let elapsed = self.copper_runtime.clock.now() - iter_start;
1252 if elapsed < period {
1253 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
1254 }
1255 }
1256
1257 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1258 break result;
1259 }
1260 }
1261 }
1262 } else {
1263 quote! {
1264 loop {
1265 let iter_start = self.copper_runtime.clock.now();
1266 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1267 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1268 let period: CuDuration = (1_000_000_000u64 / rate).into();
1269 let elapsed = self.copper_runtime.clock.now() - iter_start;
1270 if elapsed < period {
1271 busy_wait_for(period - elapsed);
1272 }
1273 }
1274
1275 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1276 break result;
1277 }
1278 }
1279 }
1280 };
1281
1282 #[cfg(feature = "macro_debug")]
1283 eprintln!("[build the run methods]");
1284 let run_methods = quote! {
1285
1286 #run_one_iteration {
1287
1288 let runtime = &mut self.copper_runtime;
1290 let clock = &runtime.clock;
1291 let monitor = &mut runtime.monitor;
1292 let tasks = &mut runtime.tasks;
1293 let bridges = &mut runtime.bridges;
1294 let cl_manager = &mut runtime.copperlists_manager;
1295 let kf_manager = &mut runtime.keyframes_manager;
1296
1297 #(#preprocess_calls)*
1299
1300 let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
1302 kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
1304 culist.msgs.init_zeroed();
1305 {
1306 let msgs = &mut culist.msgs.0;
1307 #(#runtime_plan_code)*
1308 } monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
1310
1311 #(#preprocess_logging_calls)*
1313
1314 cl_manager.end_of_processing(clid)?;
1315 kf_manager.end_of_processing(clid)?;
1316
1317 #(#postprocess_calls)*
1319 Ok(())
1320 }
1321
1322 fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
1323 let runtime = &mut self.copper_runtime;
1324 let clock = &runtime.clock;
1325 let tasks = &mut runtime.tasks;
1326 let config = cu29::bincode::config::standard();
1327 let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
1328 let mut decoder = DecoderImpl::new(reader, config, ());
1329 #(#task_restore_code);*;
1330 Ok(())
1331 }
1332
1333 #start_all_tasks {
1334 #(#start_calls)*
1335 self.copper_runtime.monitor.start(&self.copper_runtime.clock)?;
1336 Ok(())
1337 }
1338
1339 #stop_all_tasks {
1340 #(#stop_calls)*
1341 self.copper_runtime.monitor.stop(&self.copper_runtime.clock)?;
1342 Ok(())
1343 }
1344
1345 #run {
1346 static STOP_FLAG: AtomicBool = AtomicBool::new(false);
1347
1348 #kill_handler
1349
1350 <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
1351 let result = #run_loop;
1352
1353 if result.is_err() {
1354 error!("A task errored out: {}", &result);
1355 }
1356 <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
1357 result
1358 }
1359 };
1360
1361 let tasks_type = if sim_mode {
1362 quote!(CuSimTasks)
1363 } else {
1364 quote!(CuTasks)
1365 };
1366
1367 let tasks_instanciator_fn = if sim_mode {
1368 quote!(tasks_instanciator_sim)
1369 } else {
1370 quote!(tasks_instanciator)
1371 };
1372
1373 let app_impl_decl = if sim_mode {
1374 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
1375 } else {
1376 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
1377 };
1378
1379 let simstep_type_decl = if sim_mode {
1380 quote!(
1381 type Step<'z> = SimStep<'z>;
1382 )
1383 } else {
1384 quote!()
1385 };
1386
1387 #[cfg(feature = "std")]
1388 #[cfg(feature = "macro_debug")]
1389 eprintln!("[build result]");
1390 let application_impl = quote! {
1391 #app_impl_decl {
1392 #simstep_type_decl
1393
1394 #new {
1395 let config_filename = #config_file;
1396
1397 #config_load_stmt
1398
1399 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
1402 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
1404 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
1406 }
1407 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
1408 unified_logger.clone(),
1409 UnifiedLogType::CopperList,
1410 default_section_size,
1411 )?;
1415
1416 let keyframes_stream = stream_write::<KeyFrame, S>(
1417 unified_logger.clone(),
1418 UnifiedLogType::FrozenTasks,
1419 1024 * 1024 * 10, )?;
1421
1422
1423 let application = Ok(#application_name {
1424 copper_runtime: CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new(
1425 clock,
1426 &config,
1427 Some(#mission),
1428 #mission_mod::#tasks_instanciator_fn,
1429 #mission_mod::monitor_instanciator,
1430 #mission_mod::bridges_instanciator,
1431 copperlist_stream,
1432 keyframes_stream)?, });
1434
1435 #sim_callback_on_new
1436
1437 application
1438 }
1439
1440 fn get_original_config() -> String {
1441 #copper_config_content.to_string()
1442 }
1443
1444 #run_methods
1445 }
1446 };
1447
1448 let (
1449 builder_struct,
1450 builder_new,
1451 builder_impl,
1452 builder_sim_callback_method,
1453 builder_build_sim_callback_arg,
1454 ) = if sim_mode {
1455 (
1456 quote! {
1457 #[allow(dead_code)]
1458 pub struct #builder_name <'a, F> {
1459 clock: Option<RobotClock>,
1460 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1461 config_override: Option<CuConfig>,
1462 sim_callback: Option<&'a mut F>
1463 }
1464 },
1465 quote! {
1466 #[allow(dead_code)]
1467 pub fn new() -> Self {
1468 Self {
1469 clock: None,
1470 unified_logger: None,
1471 config_override: None,
1472 sim_callback: None,
1473 }
1474 }
1475 },
1476 quote! {
1477 impl<'a, F> #builder_name <'a, F>
1478 where
1479 F: FnMut(SimStep) -> SimOverride,
1480 },
1481 Some(quote! {
1482 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
1483 {
1484 self.sim_callback = Some(sim_callback);
1485 self
1486 }
1487 }),
1488 Some(quote! {
1489 self.sim_callback
1490 .ok_or(CuError::from("Sim callback missing from builder"))?,
1491 }),
1492 )
1493 } else {
1494 (
1495 quote! {
1496 #[allow(dead_code)]
1497 pub struct #builder_name {
1498 clock: Option<RobotClock>,
1499 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1500 config_override: Option<CuConfig>,
1501 }
1502 },
1503 quote! {
1504 #[allow(dead_code)]
1505 pub fn new() -> Self {
1506 Self {
1507 clock: None,
1508 unified_logger: None,
1509 config_override: None,
1510 }
1511 }
1512 },
1513 quote! {
1514 impl #builder_name
1515 },
1516 None,
1517 None,
1518 )
1519 };
1520
1521 let std_application_impl = if sim_mode {
1523 Some(quote! {
1525 impl #application_name {
1526 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1527 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
1528 }
1529 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1530 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
1531 }
1532 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1533 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
1534 }
1535 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1536 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
1537 }
1538 }
1539 })
1540 } else if std {
1541 Some(quote! {
1543 impl #application_name {
1544 pub fn start_all_tasks(&mut self) -> CuResult<()> {
1545 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
1546 }
1547 pub fn run_one_iteration(&mut self) -> CuResult<()> {
1548 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
1549 }
1550 pub fn run(&mut self) -> CuResult<()> {
1551 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
1552 }
1553 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
1554 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
1555 }
1556 }
1557 })
1558 } else {
1559 None };
1561
1562 let application_builder = if std {
1563 Some(quote! {
1564 #builder_struct
1565
1566 #builder_impl
1567 {
1568 #builder_new
1569
1570 #[allow(dead_code)]
1571 pub fn with_clock(mut self, clock: RobotClock) -> Self {
1572 self.clock = Some(clock);
1573 self
1574 }
1575
1576 #[allow(dead_code)]
1577 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
1578 self.unified_logger = Some(unified_logger);
1579 self
1580 }
1581
1582 #[allow(dead_code)]
1583 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
1584 self.clock = Some(copper_ctx.clock.clone());
1585 self.unified_logger = Some(copper_ctx.unified_logger.clone());
1586 self
1587 }
1588
1589 #[allow(dead_code)]
1590 pub fn with_config(mut self, config_override: CuConfig) -> Self {
1591 self.config_override = Some(config_override);
1592 self
1593 }
1594
1595 #builder_sim_callback_method
1596
1597 #[allow(dead_code)]
1598 pub fn build(self) -> CuResult<#application_name> {
1599 #application_name::new(
1600 self.clock
1601 .ok_or(CuError::from("Clock missing from builder"))?,
1602 self.unified_logger
1603 .ok_or(CuError::from("Unified logger missing from builder"))?,
1604 self.config_override,
1605 #builder_build_sim_callback_arg
1606 )
1607 }
1608 }
1609 })
1610 } else {
1611 None
1613 };
1614
1615 let sim_imports = if sim_mode {
1616 Some(quote! {
1617 use cu29::simulation::SimOverride;
1618 use cu29::simulation::CuTaskCallbackState;
1619 use cu29::simulation::CuSimSrcTask;
1620 use cu29::simulation::CuSimSinkTask;
1621 use cu29::prelude::app::CuSimApplication;
1622 })
1623 } else {
1624 None
1625 };
1626
1627 let sim_tasks = if sim_mode {
1628 Some(quote! {
1629 pub type CuSimTasks = #task_types_tuple_sim;
1632 })
1633 } else {
1634 None
1635 };
1636
1637 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
1638 quote! { Ok(()) }
1639 } else {
1640 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
1641 };
1642
1643 let sim_tasks_instanciator = if sim_mode {
1644 Some(quote! {
1645 pub fn tasks_instanciator_sim(all_instances_configs: Vec<Option<&ComponentConfig>>, _threadpool: Arc<ThreadPool>) -> CuResult<CuSimTasks> {
1646 #sim_inst_body
1647 }})
1648 } else {
1649 None
1650 };
1651
1652 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
1653 quote! {
1654 let _ = threadpool;
1655 Ok(())
1656 }
1657 } else {
1658 quote! { Ok(( #(#task_instances_init_code),*, )) }
1659 };
1660
1661 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
1662 quote! { Ok(()) }
1663 } else {
1664 quote! { Ok(( #(#task_instances_init_code),*, )) }
1665 };
1666
1667 let tasks_instanciator = if std {
1668 quote! {
1669 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>, threadpool: Arc<ThreadPool>) -> CuResult<CuTasks> {
1670 #tasks_inst_body_std
1671 }
1672 }
1673 } else {
1674 quote! {
1676 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>) -> CuResult<CuTasks> {
1677 #tasks_inst_body_nostd
1678 }
1679 }
1680 };
1681
1682 let imports = if std {
1683 quote! {
1684 use cu29::rayon::ThreadPool;
1685 use cu29::cuasynctask::CuAsyncTask;
1686 use cu29::curuntime::CopperContext;
1687 use cu29::prelude::UnifiedLoggerWrite;
1688 use cu29::prelude::memmap::MmapSectionStorage;
1689 use std::fmt::{Debug, Formatter};
1690 use std::fmt::Result as FmtResult;
1691 use std::mem::size_of;
1692 use std::sync::Arc;
1693 use std::sync::atomic::{AtomicBool, Ordering};
1694 use std::sync::Mutex;
1695 }
1696 } else {
1697 quote! {
1698 use alloc::sync::Arc;
1699 use alloc::string::String;
1700 use alloc::string::ToString;
1701 use core::sync::atomic::{AtomicBool, Ordering};
1702 use core::fmt::{Debug, Formatter};
1703 use core::fmt::Result as FmtResult;
1704 use core::mem::size_of;
1705 use spin::Mutex;
1706 }
1707 };
1708
1709 let mission_mod_tokens = quote! {
1711 mod #mission_mod {
1712 use super::*; use cu29::bincode::Encode;
1715 use cu29::bincode::enc::Encoder;
1716 use cu29::bincode::error::EncodeError;
1717 use cu29::bincode::Decode;
1718 use cu29::bincode::de::Decoder;
1719 use cu29::bincode::de::DecoderImpl;
1720 use cu29::bincode::error::DecodeError;
1721 use cu29::clock::RobotClock;
1722 use cu29::config::CuConfig;
1723 use cu29::config::ComponentConfig;
1724 use cu29::curuntime::CuRuntime;
1725 use cu29::curuntime::KeyFrame;
1726 use cu29::CuResult;
1727 use cu29::CuError;
1728 use cu29::cutask::CuSrcTask;
1729 use cu29::cutask::CuSinkTask;
1730 use cu29::cutask::CuTask;
1731 use cu29::cutask::CuMsg;
1732 use cu29::cutask::CuMsgMetadata;
1733 use cu29::copperlist::CopperList;
1734 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
1736 use cu29::monitoring::Decision;
1737 use cu29::prelude::app::CuApplication;
1738 use cu29::prelude::debug;
1739 use cu29::prelude::stream_write;
1740 use cu29::prelude::UnifiedLogType;
1741 use cu29::prelude::UnifiedLogWrite;
1742
1743 #imports
1744
1745 #sim_imports
1746
1747 #[allow(unused_imports)]
1749 use cu29::monitoring::NoMonitor;
1750
1751 pub type CuTasks = #task_types_tuple;
1755 pub type CuBridges = #bridges_type_tokens;
1756
1757 #sim_tasks
1758 #sim_support
1759 #sim_tasks_instanciator
1760
1761 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
1762
1763 #culist_support
1764 #tasks_instanciator
1765 #bridges_instanciator
1766
1767 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
1768 #monitor_type::new(config, #mission_mod::TASKS_IDS).expect("Failed to create the given monitor.")
1769 }
1770
1771 pub #application_struct
1773
1774 #application_impl
1775
1776 #std_application_impl
1777
1778 #application_builder
1779 }
1780
1781 };
1782 all_missions_tokens.push(mission_mod_tokens);
1783 }
1784
1785 let default_application_tokens = if all_missions.contains_key("default") {
1786 let default_builder = if std {
1787 Some(quote! {
1788 #[allow(unused_imports)]
1790 use default::#builder_name;
1791 })
1792 } else {
1793 None
1794 };
1795 quote! {
1796 #default_builder
1797
1798 #[allow(unused_imports)]
1799 use default::#application_name;
1800 }
1801 } else {
1802 quote!() };
1804
1805 let result: proc_macro2::TokenStream = quote! {
1806 #(#all_missions_tokens)*
1807 #default_application_tokens
1808 };
1809
1810 #[cfg(feature = "macro_debug")]
1812 {
1813 let formatted_code = rustfmt_generated_code(result.to_string());
1814 eprintln!("\n === Gen. Runtime ===\n");
1815 eprintln!("{formatted_code}");
1816 eprintln!("\n === === === === === ===\n");
1819 }
1820 result.into()
1821}
1822
1823fn read_config(config_file: &str) -> CuResult<CuConfig> {
1824 let filename = config_full_path(config_file);
1825
1826 read_configuration(filename.as_str())
1827}
1828
1829fn config_full_path(config_file: &str) -> String {
1830 let mut config_full_path = utils::caller_crate_root();
1831 config_full_path.push(config_file);
1832 let filename = config_full_path
1833 .as_os_str()
1834 .to_str()
1835 .expect("Could not interpret the config file name");
1836 filename.to_string()
1837}
1838
1839fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
1840 let result = graph
1841 .get_all_nodes()
1842 .iter()
1843 .map(|(_, node)| {
1844 let id = node.get_id();
1845 let type_str = graph.get_node_output_msg_type(id.as_str());
1846 let result = type_str.map(|type_str| {
1847 let result = parse_str::<Type>(type_str.as_str())
1848 .expect("Could not parse output message type.");
1849 result
1850 });
1851 result
1852 })
1853 .collect();
1854 result
1855}
1856
1857struct CuTaskSpecSet {
1858 pub ids: Vec<String>,
1859 pub cutypes: Vec<CuTaskType>,
1860 pub background_flags: Vec<bool>,
1861 pub logging_enabled: Vec<bool>,
1862 pub type_names: Vec<String>,
1863 pub task_types: Vec<Type>,
1864 pub instantiation_types: Vec<Type>,
1865 pub sim_task_types: Vec<Type>,
1866 pub run_in_sim_flags: Vec<bool>,
1867 #[allow(dead_code)]
1868 pub output_types: Vec<Option<Type>>,
1869 pub node_id_to_task_index: Vec<Option<usize>>,
1870}
1871
1872impl CuTaskSpecSet {
1873 pub fn from_graph(graph: &CuGraph) -> Self {
1874 let all_id_nodes: Vec<(NodeId, &Node)> = graph
1875 .get_all_nodes()
1876 .into_iter()
1877 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
1878 .collect();
1879
1880 let ids = all_id_nodes
1881 .iter()
1882 .map(|(_, node)| node.get_id().to_string())
1883 .collect();
1884
1885 let cutypes = all_id_nodes
1886 .iter()
1887 .map(|(id, _)| find_task_type_for_id(graph, *id))
1888 .collect();
1889
1890 let background_flags: Vec<bool> = all_id_nodes
1891 .iter()
1892 .map(|(_, node)| node.is_background())
1893 .collect();
1894
1895 let logging_enabled: Vec<bool> = all_id_nodes
1896 .iter()
1897 .map(|(_, node)| node.is_logging_enabled())
1898 .collect();
1899
1900 let type_names: Vec<String> = all_id_nodes
1901 .iter()
1902 .map(|(_, node)| node.get_type().to_string())
1903 .collect();
1904
1905 let output_types = extract_tasks_output_types(graph);
1906
1907 let task_types = type_names
1908 .iter()
1909 .zip(background_flags.iter())
1910 .zip(output_types.iter())
1911 .map(|((name, &background), output_type)| {
1912 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1913 panic!("Could not transform {name} into a Task Rust type: {error}");
1914 });
1915 if background {
1916 if let Some(output_type) = output_type {
1917 parse_quote!(CuAsyncTask<#name_type, #output_type>)
1918 } else {
1919 panic!("{name}: If a task is background, it has to have an output");
1920 }
1921 } else {
1922 name_type
1923 }
1924 })
1925 .collect();
1926
1927 let instantiation_types = type_names
1928 .iter()
1929 .zip(background_flags.iter())
1930 .zip(output_types.iter())
1931 .map(|((name, &background), output_type)| {
1932 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1933 panic!("Could not transform {name} into a Task Rust type: {error}");
1934 });
1935 if background {
1936 if let Some(output_type) = output_type {
1937 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
1938 } else {
1939 panic!("{name}: If a task is background, it has to have an output");
1940 }
1941 } else {
1942 name_type
1943 }
1944 })
1945 .collect();
1946
1947 let sim_task_types = type_names
1948 .iter()
1949 .map(|name| {
1950 parse_str::<Type>(name).unwrap_or_else(|err| {
1951 eprintln!("Could not transform {name} into a Task Rust type.");
1952 panic!("{err}")
1953 })
1954 })
1955 .collect();
1956
1957 let run_in_sim_flags = all_id_nodes
1958 .iter()
1959 .map(|(_, node)| node.is_run_in_sim())
1960 .collect();
1961
1962 let mut node_id_to_task_index = vec![None; graph.node_count()];
1963 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
1964 node_id_to_task_index[*node_id as usize] = Some(index);
1965 }
1966
1967 Self {
1968 ids,
1969 cutypes,
1970 background_flags,
1971 logging_enabled,
1972 type_names,
1973 task_types,
1974 instantiation_types,
1975 sim_task_types,
1976 run_in_sim_flags,
1977 output_types,
1978 node_id_to_task_index,
1979 }
1980 }
1981}
1982
1983fn extract_msg_types(runtime_plan: &CuExecutionLoop) -> Vec<Type> {
1984 runtime_plan
1985 .steps
1986 .iter()
1987 .filter_map(|unit| match unit {
1988 CuExecutionUnit::Step(step) => {
1989 if let Some((_, output_msg_type)) = &step.output_msg_index_type {
1990 Some(
1991 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
1992 panic!(
1993 "Could not transform {output_msg_type} into a message Rust type."
1994 )
1995 }),
1996 )
1997 } else {
1998 None
1999 }
2000 }
2001 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
2002 })
2003 .collect()
2004}
2005
2006fn build_culist_tuple(all_msgs_types_in_culist_order: &[Type]) -> TypeTuple {
2008 if all_msgs_types_in_culist_order.is_empty() {
2009 parse_quote! { () }
2010 } else {
2011 parse_quote! {
2012 ( #( CuMsg<#all_msgs_types_in_culist_order> ),* )
2013 }
2014 }
2015}
2016
2017fn build_culist_tuple_encode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2019 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2020
2021 let encode_fields: Vec<_> = indices
2023 .iter()
2024 .map(|i| {
2025 let idx = syn::Index::from(*i);
2026 quote! { self.0.#idx.encode(encoder)?; }
2027 })
2028 .collect();
2029
2030 parse_quote! {
2031 impl Encode for CuStampedDataSet {
2032 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
2033 #(#encode_fields)*
2034 Ok(())
2035 }
2036 }
2037 }
2038}
2039
2040fn build_culist_tuple_decode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2042 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2043
2044 let decode_fields: Vec<_> = indices
2046 .iter()
2047 .map(|i| {
2048 let t = &all_msgs_types_in_culist_order[*i];
2049 quote! { CuMsg::<#t>::decode(decoder)? }
2050 })
2051 .collect();
2052
2053 parse_quote! {
2054 impl Decode<()> for CuStampedDataSet {
2055 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
2056 Ok(CuStampedDataSet ((
2057 #(#decode_fields),*
2058 )))
2059 }
2060 }
2061 }
2062}
2063
2064fn build_culist_erasedcumsgs(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2065 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2066 let casted_fields: Vec<_> = indices
2067 .iter()
2068 .map(|i| {
2069 let idx = syn::Index::from(*i);
2070 quote! { &self.0.#idx as &dyn ErasedCuStampedData }
2071 })
2072 .collect();
2073 parse_quote! {
2074 impl ErasedCuStampedDataSet for CuStampedDataSet {
2075 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
2076 vec![
2077 #(#casted_fields),*
2078 ]
2079 }
2080 }
2081 }
2082}
2083
2084fn build_culist_tuple_debug(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2085 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2086
2087 let debug_fields: Vec<_> = indices
2088 .iter()
2089 .map(|i| {
2090 let idx = syn::Index::from(*i);
2091 quote! { .field(&self.0.#idx) }
2092 })
2093 .collect();
2094
2095 parse_quote! {
2096 impl Debug for CuStampedDataSet {
2097 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
2098 f.debug_tuple("CuStampedDataSet")
2099 #(#debug_fields)*
2100 .finish()
2101 }
2102 }
2103 }
2104}
2105
2106fn build_culist_tuple_serialize(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2108 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2109 let tuple_len = all_msgs_types_in_culist_order.len();
2110
2111 let serialize_fields: Vec<_> = indices
2113 .iter()
2114 .map(|i| {
2115 let idx = syn::Index::from(*i);
2116 quote! { &self.0.#idx }
2117 })
2118 .collect();
2119
2120 parse_quote! {
2121 impl Serialize for CuStampedDataSet {
2122 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2123 where
2124 S: serde::Serializer,
2125 {
2126 use serde::ser::SerializeTuple;
2127 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
2128 #(tuple.serialize_element(#serialize_fields)?;)*
2129 tuple.end()
2130 }
2131 }
2132 }
2133}
2134
2135fn build_culist_tuple_default(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2137 let default_fields: Vec<_> = all_msgs_types_in_culist_order
2139 .iter()
2140 .map(|msg_type| quote! { CuStampedData::<#msg_type, CuMsgMetadata>::default() })
2141 .collect();
2142
2143 parse_quote! {
2144 impl Default for CuStampedDataSet {
2145 fn default() -> CuStampedDataSet
2146 {
2147 CuStampedDataSet((
2148 #(#default_fields),*
2149 ))
2150 }
2151 }
2152 }
2153}
2154
2155fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
2156 let mut usage = HashMap::new();
2157 for edge_idx in graph.0.edge_indices() {
2158 let cnx = graph
2159 .0
2160 .edge_weight(edge_idx)
2161 .expect("Edge should exist while collecting bridge usage")
2162 .clone();
2163 if let Some(channel) = &cnx.src_channel {
2164 let key = BridgeChannelKey {
2165 bridge_id: cnx.src.clone(),
2166 channel_id: channel.clone(),
2167 direction: BridgeChannelDirection::Rx,
2168 };
2169 usage
2170 .entry(key)
2171 .and_modify(|msg| {
2172 if msg != &cnx.msg {
2173 panic!(
2174 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2175 cnx.src, channel, msg, cnx.msg
2176 );
2177 }
2178 })
2179 .or_insert(cnx.msg.clone());
2180 }
2181 if let Some(channel) = &cnx.dst_channel {
2182 let key = BridgeChannelKey {
2183 bridge_id: cnx.dst.clone(),
2184 channel_id: channel.clone(),
2185 direction: BridgeChannelDirection::Tx,
2186 };
2187 usage
2188 .entry(key)
2189 .and_modify(|msg| {
2190 if msg != &cnx.msg {
2191 panic!(
2192 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2193 cnx.dst, channel, msg, cnx.msg
2194 );
2195 }
2196 })
2197 .or_insert(cnx.msg.clone());
2198 }
2199 }
2200 usage
2201}
2202
2203fn build_bridge_specs(
2204 config: &CuConfig,
2205 graph: &CuGraph,
2206 channel_usage: &HashMap<BridgeChannelKey, String>,
2207) -> Vec<BridgeSpec> {
2208 let mut specs = Vec::new();
2209 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
2210 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
2211 continue;
2212 }
2213
2214 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
2215 panic!(
2216 "Could not parse bridge type '{}' for '{}': {err}",
2217 bridge_cfg.type_, bridge_cfg.id
2218 )
2219 });
2220
2221 let mut rx_channels = Vec::new();
2222 let mut tx_channels = Vec::new();
2223
2224 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
2225 match channel {
2226 BridgeChannelConfigRepresentation::Rx { id, .. } => {
2227 let key = BridgeChannelKey {
2228 bridge_id: bridge_cfg.id.clone(),
2229 channel_id: id.clone(),
2230 direction: BridgeChannelDirection::Rx,
2231 };
2232 if let Some(msg_type) = channel_usage.get(&key) {
2233 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2234 panic!(
2235 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2236 bridge_cfg.id, id
2237 )
2238 });
2239 let const_ident =
2240 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2241 rx_channels.push(BridgeChannelSpec {
2242 id: id.clone(),
2243 const_ident,
2244 msg_type,
2245 config_index: channel_index,
2246 plan_node_id: None,
2247 culist_index: None,
2248 monitor_index: None,
2249 });
2250 }
2251 }
2252 BridgeChannelConfigRepresentation::Tx { id, .. } => {
2253 let key = BridgeChannelKey {
2254 bridge_id: bridge_cfg.id.clone(),
2255 channel_id: id.clone(),
2256 direction: BridgeChannelDirection::Tx,
2257 };
2258 if let Some(msg_type) = channel_usage.get(&key) {
2259 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2260 panic!(
2261 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2262 bridge_cfg.id, id
2263 )
2264 });
2265 let const_ident =
2266 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2267 tx_channels.push(BridgeChannelSpec {
2268 id: id.clone(),
2269 const_ident,
2270 msg_type,
2271 config_index: channel_index,
2272 plan_node_id: None,
2273 culist_index: None,
2274 monitor_index: None,
2275 });
2276 }
2277 }
2278 }
2279 }
2280
2281 if rx_channels.is_empty() && tx_channels.is_empty() {
2282 continue;
2283 }
2284
2285 specs.push(BridgeSpec {
2286 id: bridge_cfg.id.clone(),
2287 type_path,
2288 config_index: bridge_index,
2289 tuple_index: 0,
2290 monitor_index: None,
2291 rx_channels,
2292 tx_channels,
2293 });
2294 }
2295
2296 for (tuple_index, spec) in specs.iter_mut().enumerate() {
2297 spec.tuple_index = tuple_index;
2298 }
2299
2300 specs
2301}
2302
2303fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
2304 graph
2305 .get_all_nodes()
2306 .iter()
2307 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2308 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
2309 .collect()
2310}
2311
2312fn build_execution_plan(
2313 graph: &CuGraph,
2314 task_specs: &CuTaskSpecSet,
2315 bridge_specs: &mut [BridgeSpec],
2316) -> CuResult<(
2317 CuExecutionLoop,
2318 Vec<ExecutionEntity>,
2319 HashMap<NodeId, NodeId>,
2320)> {
2321 let mut plan_graph = CuGraph::default();
2322 let mut exec_entities = Vec::new();
2323 let mut original_to_plan = HashMap::new();
2324 let mut plan_to_original = HashMap::new();
2325 let mut name_to_original = HashMap::new();
2326 let mut channel_nodes = HashMap::new();
2327
2328 for (node_id, node) in graph.get_all_nodes() {
2329 name_to_original.insert(node.get_id(), node_id);
2330 if node.get_flavor() != Flavor::Task {
2331 continue;
2332 }
2333 let plan_node_id = plan_graph.add_node(node.clone())?;
2334 let task_index = task_specs.node_id_to_task_index[node_id as usize]
2335 .expect("Task missing from specifications");
2336 plan_to_original.insert(plan_node_id, node_id);
2337 original_to_plan.insert(node_id, plan_node_id);
2338 if plan_node_id as usize != exec_entities.len() {
2339 panic!("Unexpected node ordering while mirroring tasks in plan graph");
2340 }
2341 exec_entities.push(ExecutionEntity {
2342 kind: ExecutionEntityKind::Task { task_index },
2343 });
2344 }
2345
2346 for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
2347 for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
2348 let mut node = Node::new(
2349 format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
2350 "__CuBridgeRxChannel",
2351 );
2352 node.set_flavor(Flavor::Bridge);
2353 let plan_node_id = plan_graph.add_node(node)?;
2354 if plan_node_id as usize != exec_entities.len() {
2355 panic!("Unexpected node ordering while inserting bridge rx channel");
2356 }
2357 channel_spec.plan_node_id = Some(plan_node_id);
2358 exec_entities.push(ExecutionEntity {
2359 kind: ExecutionEntityKind::BridgeRx {
2360 bridge_index,
2361 channel_index,
2362 },
2363 });
2364 channel_nodes.insert(
2365 BridgeChannelKey {
2366 bridge_id: spec.id.clone(),
2367 channel_id: channel_spec.id.clone(),
2368 direction: BridgeChannelDirection::Rx,
2369 },
2370 plan_node_id,
2371 );
2372 }
2373
2374 for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
2375 let mut node = Node::new(
2376 format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
2377 "__CuBridgeTxChannel",
2378 );
2379 node.set_flavor(Flavor::Bridge);
2380 let plan_node_id = plan_graph.add_node(node)?;
2381 if plan_node_id as usize != exec_entities.len() {
2382 panic!("Unexpected node ordering while inserting bridge tx channel");
2383 }
2384 channel_spec.plan_node_id = Some(plan_node_id);
2385 exec_entities.push(ExecutionEntity {
2386 kind: ExecutionEntityKind::BridgeTx {
2387 bridge_index,
2388 channel_index,
2389 },
2390 });
2391 channel_nodes.insert(
2392 BridgeChannelKey {
2393 bridge_id: spec.id.clone(),
2394 channel_id: channel_spec.id.clone(),
2395 direction: BridgeChannelDirection::Tx,
2396 },
2397 plan_node_id,
2398 );
2399 }
2400 }
2401
2402 for edge_idx in graph.0.edge_indices() {
2403 let cnx = graph
2404 .0
2405 .edge_weight(edge_idx)
2406 .expect("Edge should exist while building plan")
2407 .clone();
2408
2409 let src_plan = if let Some(channel) = &cnx.src_channel {
2410 let key = BridgeChannelKey {
2411 bridge_id: cnx.src.clone(),
2412 channel_id: channel.clone(),
2413 direction: BridgeChannelDirection::Rx,
2414 };
2415 *channel_nodes
2416 .get(&key)
2417 .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
2418 } else {
2419 let node_id = name_to_original
2420 .get(&cnx.src)
2421 .copied()
2422 .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
2423 *original_to_plan
2424 .get(&node_id)
2425 .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
2426 };
2427
2428 let dst_plan = if let Some(channel) = &cnx.dst_channel {
2429 let key = BridgeChannelKey {
2430 bridge_id: cnx.dst.clone(),
2431 channel_id: channel.clone(),
2432 direction: BridgeChannelDirection::Tx,
2433 };
2434 *channel_nodes
2435 .get(&key)
2436 .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
2437 } else {
2438 let node_id = name_to_original
2439 .get(&cnx.dst)
2440 .copied()
2441 .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
2442 *original_to_plan
2443 .get(&node_id)
2444 .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
2445 };
2446
2447 plan_graph
2448 .connect_ext(
2449 src_plan,
2450 dst_plan,
2451 &cnx.msg,
2452 cnx.missions.clone(),
2453 None,
2454 None,
2455 )
2456 .map_err(|e| CuError::from(e.to_string()))?;
2457 }
2458
2459 let runtime_plan = compute_runtime_plan(&plan_graph)?;
2460 Ok((runtime_plan, exec_entities, plan_to_original))
2461}
2462
2463fn collect_culist_metadata(
2464 runtime_plan: &CuExecutionLoop,
2465 exec_entities: &[ExecutionEntity],
2466 bridge_specs: &mut [BridgeSpec],
2467 plan_to_original: &HashMap<NodeId, NodeId>,
2468) -> (Vec<usize>, HashMap<NodeId, usize>) {
2469 let mut culist_order = Vec::new();
2470 let mut node_output_positions = HashMap::new();
2471
2472 for unit in &runtime_plan.steps {
2473 if let CuExecutionUnit::Step(step) = unit {
2474 if let Some((output_idx, _)) = &step.output_msg_index_type {
2475 culist_order.push(*output_idx as usize);
2476 match &exec_entities[step.node_id as usize].kind {
2477 ExecutionEntityKind::Task { .. } => {
2478 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
2479 node_output_positions.insert(*original_node_id, *output_idx as usize);
2480 }
2481 }
2482 ExecutionEntityKind::BridgeRx {
2483 bridge_index,
2484 channel_index,
2485 } => {
2486 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
2487 Some(*output_idx as usize);
2488 }
2489 ExecutionEntityKind::BridgeTx { .. } => {}
2490 }
2491 }
2492 }
2493 }
2494
2495 (culist_order, node_output_positions)
2496}
2497
2498#[allow(dead_code)]
2499fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
2500 let mut names = task_ids.to_vec();
2501 for spec in bridge_specs.iter_mut() {
2502 spec.monitor_index = Some(names.len());
2503 names.push(format!("bridge::{}", spec.id));
2504 for channel in spec.rx_channels.iter_mut() {
2505 channel.monitor_index = Some(names.len());
2506 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
2507 }
2508 for channel in spec.tx_channels.iter_mut() {
2509 channel.monitor_index = Some(names.len());
2510 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
2511 }
2512 }
2513 names
2514}
2515
2516fn generate_task_execution_tokens(
2517 step: &CuExecutionStep,
2518 task_index: usize,
2519 task_specs: &CuTaskSpecSet,
2520 sim_mode: bool,
2521 mission_mod: &Ident,
2522) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2523 let node_index = int2sliceindex(task_index as u32);
2524 let task_instance = quote! { tasks.#node_index };
2525 let comment_str = format!(
2526 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
2527 step.node.get_id(),
2528 step.task_type,
2529 step.node_id,
2530 step.input_msg_indices_types,
2531 step.output_msg_index_type
2532 );
2533 let comment_tokens = quote! {{
2534 let _ = stringify!(#comment_str);
2535 }};
2536 let tid = task_index;
2537 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
2538 let enum_name = Ident::new(&task_enum_name, Span::call_site());
2539
2540 match step.task_type {
2541 CuTaskType::Source => {
2542 if let Some((output_index, _)) = &step.output_msg_index_type {
2543 let output_culist_index = int2sliceindex(*output_index);
2544
2545 let monitoring_action = quote! {
2546 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2547 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2548 match decision {
2549 Decision::Abort => {
2550 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2551 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2552 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2553 cl_manager.end_of_processing(clid)?;
2554 return Ok(());
2555 }
2556 Decision::Ignore => {
2557 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2558 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2559 let cumsg_output = &mut msgs.#output_culist_index;
2560 cumsg_output.clear_payload();
2561 }
2562 Decision::Shutdown => {
2563 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2564 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2565 return Err(CuError::new_with_cause("Task errored out during process.", error));
2566 }
2567 }
2568 };
2569
2570 let call_sim_callback = if sim_mode {
2571 quote! {
2572 let doit = {
2573 let cumsg_output = &mut msgs.#output_culist_index;
2574 let state = CuTaskCallbackState::Process((), cumsg_output);
2575 let ovr = sim_callback(SimStep::#enum_name(state));
2576
2577 if let SimOverride::Errored(reason) = ovr {
2578 let error: CuError = reason.into();
2579 #monitoring_action
2580 false
2581 } else {
2582 ovr == SimOverride::ExecuteByRuntime
2583 }
2584 };
2585 }
2586 } else {
2587 quote! { let doit = true; }
2588 };
2589
2590 let logging_tokens = if !task_specs.logging_enabled[tid] {
2591 let output_culist_index = int2sliceindex(*output_index);
2592 quote! {
2593 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2594 cumsg_output.clear_payload();
2595 }
2596 } else {
2597 quote!()
2598 };
2599
2600 (
2601 quote! {
2602 {
2603 #comment_tokens
2604 kf_manager.freeze_task(clid, &#task_instance)?;
2605 #call_sim_callback
2606 let cumsg_output = &mut msgs.#output_culist_index;
2607 cumsg_output.metadata.process_time.start = clock.now().into();
2608 let maybe_error = if doit { #task_instance.process(clock, cumsg_output) } else { Ok(()) };
2609 cumsg_output.metadata.process_time.end = clock.now().into();
2610 if let Err(error) = maybe_error {
2611 #monitoring_action
2612 }
2613 }
2614 },
2615 logging_tokens,
2616 )
2617 } else {
2618 panic!("Source task should have an output message index.");
2619 }
2620 }
2621 CuTaskType::Sink => {
2622 if let Some((output_index, _)) = &step.output_msg_index_type {
2623 let output_culist_index = int2sliceindex(*output_index);
2624 let indices = step
2625 .input_msg_indices_types
2626 .iter()
2627 .map(|(index, _)| int2sliceindex(*index));
2628
2629 let monitoring_action = quote! {
2630 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2631 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2632 match decision {
2633 Decision::Abort => {
2634 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2635 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2636 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2637 cl_manager.end_of_processing(clid)?;
2638 return Ok(());
2639 }
2640 Decision::Ignore => {
2641 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2642 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2643 let cumsg_output = &mut msgs.#output_culist_index;
2644 cumsg_output.clear_payload();
2645 }
2646 Decision::Shutdown => {
2647 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2648 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2649 return Err(CuError::new_with_cause("Task errored out during process.", error));
2650 }
2651 }
2652 };
2653
2654 let inputs_type = if indices.len() == 1 {
2655 quote! { #(msgs.#indices)* }
2656 } else {
2657 quote! { (#(&msgs.#indices),*) }
2658 };
2659
2660 let call_sim_callback = if sim_mode {
2661 quote! {
2662 let doit = {
2663 let cumsg_input = &#inputs_type;
2664 let cumsg_output = &mut msgs.#output_culist_index;
2665 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2666 let ovr = sim_callback(SimStep::#enum_name(state));
2667
2668 if let SimOverride::Errored(reason) = ovr {
2669 let error: CuError = reason.into();
2670 #monitoring_action
2671 false
2672 } else {
2673 ovr == SimOverride::ExecuteByRuntime
2674 }
2675 };
2676 }
2677 } else {
2678 quote! { let doit = true; }
2679 };
2680
2681 (
2682 quote! {
2683 {
2684 #comment_tokens
2685 kf_manager.freeze_task(clid, &#task_instance)?;
2686 #call_sim_callback
2687 let cumsg_input = &#inputs_type;
2688 let cumsg_output = &mut msgs.#output_culist_index;
2689 cumsg_output.metadata.process_time.start = clock.now().into();
2690 let maybe_error = if doit { #task_instance.process(clock, cumsg_input) } else { Ok(()) };
2691 cumsg_output.metadata.process_time.end = clock.now().into();
2692 if let Err(error) = maybe_error {
2693 #monitoring_action
2694 }
2695 }
2696 },
2697 quote! {},
2698 )
2699 } else {
2700 panic!("Sink tasks should have a virtual output message index.");
2701 }
2702 }
2703 CuTaskType::Regular => {
2704 if let Some((output_index, _)) = &step.output_msg_index_type {
2705 let output_culist_index = int2sliceindex(*output_index);
2706 let indices = step
2707 .input_msg_indices_types
2708 .iter()
2709 .map(|(index, _)| int2sliceindex(*index));
2710
2711 let monitoring_action = quote! {
2712 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2713 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2714 match decision {
2715 Decision::Abort => {
2716 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2717 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2718 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2719 cl_manager.end_of_processing(clid)?;
2720 return Ok(());
2721 }
2722 Decision::Ignore => {
2723 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2724 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2725 let cumsg_output = &mut msgs.#output_culist_index;
2726 cumsg_output.clear_payload();
2727 }
2728 Decision::Shutdown => {
2729 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2730 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2731 return Err(CuError::new_with_cause("Task errored out during process.", error));
2732 }
2733 }
2734 };
2735
2736 let inputs_type = if indices.len() == 1 {
2737 quote! { #(msgs.#indices)* }
2738 } else {
2739 quote! { (#(&msgs.#indices),*) }
2740 };
2741
2742 let call_sim_callback = if sim_mode {
2743 quote! {
2744 let doit = {
2745 let cumsg_input = &#inputs_type;
2746 let cumsg_output = &mut msgs.#output_culist_index;
2747 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2748 let ovr = sim_callback(SimStep::#enum_name(state));
2749
2750 if let SimOverride::Errored(reason) = ovr {
2751 let error: CuError = reason.into();
2752 #monitoring_action
2753 false
2754 }
2755 else {
2756 ovr == SimOverride::ExecuteByRuntime
2757 }
2758 };
2759 }
2760 } else {
2761 quote! { let doit = true; }
2762 };
2763
2764 let logging_tokens = if !task_specs.logging_enabled[tid] {
2765 let output_culist_index = int2sliceindex(*output_index);
2766 quote! {
2767 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2768 cumsg_output.clear_payload();
2769 }
2770 } else {
2771 quote!()
2772 };
2773
2774 (
2775 quote! {
2776 {
2777 #comment_tokens
2778 kf_manager.freeze_task(clid, &#task_instance)?;
2779 #call_sim_callback
2780 let cumsg_input = &#inputs_type;
2781 let cumsg_output = &mut msgs.#output_culist_index;
2782 cumsg_output.metadata.process_time.start = clock.now().into();
2783 let maybe_error = if doit { #task_instance.process(clock, cumsg_input, cumsg_output) } else { Ok(()) };
2784 cumsg_output.metadata.process_time.end = clock.now().into();
2785 if let Err(error) = maybe_error {
2786 #monitoring_action
2787 }
2788 }
2789 },
2790 logging_tokens,
2791 )
2792 } else {
2793 panic!("Regular task should have an output message index.");
2794 }
2795 }
2796 }
2797}
2798
2799fn generate_bridge_rx_execution_tokens(
2800 bridge_spec: &BridgeSpec,
2801 channel_index: usize,
2802 mission_mod: &Ident,
2803) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2804 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2805 let channel = &bridge_spec.rx_channels[channel_index];
2806 let culist_index = channel
2807 .culist_index
2808 .unwrap_or_else(|| panic!("Bridge Rx channel missing output index"));
2809 let culist_index_ts = int2sliceindex(culist_index as u32);
2810 let monitor_index = syn::Index::from(
2811 channel
2812 .monitor_index
2813 .expect("Bridge Rx channel missing monitor index"),
2814 );
2815 let bridge_type = &bridge_spec.type_path;
2816 let const_ident = &channel.const_ident;
2817 (
2818 quote! {
2819 {
2820 let bridge = &mut bridges.#bridge_tuple_index;
2821 let cumsg_output = &mut msgs.#culist_index_ts;
2822 cumsg_output.metadata.process_time.start = clock.now().into();
2823 let maybe_error = bridge.receive(
2824 clock,
2825 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
2826 cumsg_output,
2827 );
2828 cumsg_output.metadata.process_time.end = clock.now().into();
2829 if let Err(error) = maybe_error {
2830 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2831 match decision {
2832 Decision::Abort => {
2833 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2834 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2835 cl_manager.end_of_processing(clid)?;
2836 return Ok(());
2837 }
2838 Decision::Ignore => {
2839 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2840 let cumsg_output = &mut msgs.#culist_index_ts;
2841 cumsg_output.clear_payload();
2842 }
2843 Decision::Shutdown => {
2844 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2845 return Err(CuError::new_with_cause("Task errored out during process.", error));
2846 }
2847 }
2848 }
2849 }
2850 },
2851 quote! {},
2852 )
2853}
2854
2855fn generate_bridge_tx_execution_tokens(
2856 step: &CuExecutionStep,
2857 bridge_spec: &BridgeSpec,
2858 channel_index: usize,
2859 mission_mod: &Ident,
2860) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2861 let channel = &bridge_spec.tx_channels[channel_index];
2862 let monitor_index = syn::Index::from(
2863 channel
2864 .monitor_index
2865 .expect("Bridge Tx channel missing monitor index"),
2866 );
2867 let input_index = step
2868 .input_msg_indices_types
2869 .first()
2870 .map(|(idx, _)| int2sliceindex(*idx))
2871 .expect("Bridge Tx channel should have exactly one input");
2872 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2873 let bridge_type = &bridge_spec.type_path;
2874 let const_ident = &channel.const_ident;
2875 (
2876 quote! {
2877 {
2878 let bridge = &mut bridges.#bridge_tuple_index;
2879 let cumsg_input = &mut msgs.#input_index;
2880 cumsg_input.metadata.process_time.start = clock.now().into();
2882 if let Err(error) = bridge.send(
2883 clock,
2884 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
2885 &*cumsg_input,
2886 ) {
2887 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2888 match decision {
2889 Decision::Abort => {
2890 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2891 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2892 cl_manager.end_of_processing(clid)?;
2893 return Ok(());
2894 }
2895 Decision::Ignore => {
2896 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2897 }
2898 Decision::Shutdown => {
2899 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2900 return Err(CuError::new_with_cause("Task errored out during process.", error));
2901 }
2902 }
2903 }
2904 cumsg_input.metadata.process_time.end = clock.now().into();
2905 }
2906 },
2907 quote! {},
2908 )
2909}
2910
2911#[cfg(test)]
2912mod tests {
2913 #[test]
2915 fn test_compile_fail() {
2916 let t = trybuild::TestCases::new();
2917 t.compile_fail("tests/compile_fail/*/*.rs");
2918 }
2919}
2920#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
2921enum BridgeChannelDirection {
2922 Rx,
2923 Tx,
2924}
2925
2926#[derive(Clone, Debug, PartialEq, Eq, Hash)]
2927struct BridgeChannelKey {
2928 bridge_id: String,
2929 channel_id: String,
2930 direction: BridgeChannelDirection,
2931}
2932
2933#[derive(Clone)]
2934struct BridgeChannelSpec {
2935 id: String,
2936 const_ident: Ident,
2937 #[allow(dead_code)]
2938 msg_type: Type,
2939 config_index: usize,
2940 plan_node_id: Option<NodeId>,
2941 culist_index: Option<usize>,
2942 monitor_index: Option<usize>,
2943}
2944
2945#[derive(Clone)]
2946struct BridgeSpec {
2947 id: String,
2948 type_path: Type,
2949 config_index: usize,
2950 tuple_index: usize,
2951 monitor_index: Option<usize>,
2952 rx_channels: Vec<BridgeChannelSpec>,
2953 tx_channels: Vec<BridgeChannelSpec>,
2954}
2955
2956#[derive(Clone)]
2957struct ExecutionEntity {
2958 kind: ExecutionEntityKind,
2959}
2960
2961#[derive(Clone)]
2962enum ExecutionEntityKind {
2963 Task {
2964 task_index: usize,
2965 },
2966 BridgeRx {
2967 bridge_index: usize,
2968 channel_index: usize,
2969 },
2970 BridgeTx {
2971 bridge_index: usize,
2972 channel_index: usize,
2973 },
2974}