1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::HashMap;
4use std::fs::read_to_string;
5use syn::meta::parser;
6use syn::Fields::{Named, Unnamed};
7use syn::{
8 parse_macro_input, parse_quote, parse_str, Field, Fields, ItemImpl, ItemStruct, LitStr, Type,
9 TypeTuple,
10};
11
12#[cfg(feature = "macro_debug")]
13use crate::format::rustfmt_generated_code;
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{
17 read_configuration, BridgeChannelConfigRepresentation, CuGraph, Flavor, Node, NodeId,
18};
19use cu29_runtime::curuntime::{
20 compute_runtime_plan, find_task_type_for_id, CuExecutionLoop, CuExecutionStep, CuExecutionUnit,
21 CuTaskType,
22};
23use cu29_traits::{CuError, CuResult};
24use proc_macro2::{Ident, Span};
25
26mod format;
27mod utils;
28
/// Value spliced into the generated `copper_runtime` field as the last generic
/// argument of `cu29::curuntime::CuRuntime<...>`.
// NOTE(review): presumably the number of copper lists the runtime keeps around — confirm.
const DEFAULT_CLNB: usize = 10;
31
32#[inline]
33fn int2sliceindex(i: u32) -> syn::Index {
34 syn::Index::from(i as usize)
35}
36
37#[inline(always)]
38fn return_error(msg: String) -> TokenStream {
39 syn::Error::new(Span::call_site(), msg)
40 .to_compile_error()
41 .into()
42}
43
44#[proc_macro]
48pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
49 #[cfg(feature = "std")]
50 let std = true;
51
52 #[cfg(not(feature = "std"))]
53 let std = false;
54
55 let config = parse_macro_input!(config_path_lit as LitStr).value();
56 if !std::path::Path::new(&config_full_path(&config)).exists() {
57 return return_error(format!(
58 "The configuration file `{config}` does not exist. Please provide a valid path."
59 ));
60 }
61 #[cfg(feature = "macro_debug")]
62 eprintln!("[gen culist support with {config:?}]");
63 let cuconfig = match read_config(&config) {
64 Ok(cuconfig) => cuconfig,
65 Err(e) => return return_error(e.to_string()),
66 };
67 let graph = cuconfig
68 .get_graph(None) .expect("Could not find the specified mission for gen_cumsgs");
70 let task_specs = CuTaskSpecSet::from_graph(graph);
71 let channel_usage = collect_bridge_channel_usage(graph);
72 let mut bridge_specs = build_bridge_specs(&cuconfig, graph, &channel_usage);
73 let (culist_plan, exec_entities, plan_to_original) =
74 match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
75 Ok(plan) => plan,
76 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
77 };
78 let task_member_names = collect_task_member_names(graph);
79 let (culist_order, node_output_positions) = collect_culist_metadata(
80 &culist_plan,
81 &exec_entities,
82 &mut bridge_specs,
83 &plan_to_original,
84 );
85
86 #[cfg(feature = "macro_debug")]
87 eprintln!(
88 "[The CuStampedDataSet matching tasks ids are {:?}]",
89 culist_order
90 );
91
92 let support = gen_culist_support(
93 &culist_plan,
94 &culist_order,
95 &node_output_positions,
96 &task_member_names,
97 );
98
99 let extra_imports = if !std {
100 quote! {
101 use core::fmt::Debug;
102 use core::fmt::Formatter;
103 use core::fmt::Result as FmtResult;
104 use alloc::vec;
105 }
106 } else {
107 quote! {
108 use std::fmt::Debug;
109 use std::fmt::Formatter;
110 use std::fmt::Result as FmtResult;
111 }
112 };
113
114 let with_uses = quote! {
115 mod cumsgs {
116 use cu29::bincode::Encode;
117 use cu29::bincode::enc::Encoder;
118 use cu29::bincode::error::EncodeError;
119 use cu29::bincode::Decode;
120 use cu29::bincode::de::Decoder;
121 use cu29::bincode::error::DecodeError;
122 use cu29::copperlist::CopperList;
123 use cu29::prelude::CuStampedData;
124 use cu29::prelude::ErasedCuStampedData;
125 use cu29::prelude::ErasedCuStampedDataSet;
126 use cu29::prelude::MatchingTasks;
127 use cu29::prelude::Serialize;
128 use cu29::prelude::CuMsg;
129 use cu29::prelude::CuMsgMetadata;
130 use cu29::prelude::CuListZeroedInit;
131 use cu29::prelude::CuCompactString;
132 #extra_imports
133 #support
134 }
135 use cumsgs::CuStampedDataSet;
136 type CuMsgs=CuStampedDataSet;
137 };
138 with_uses.into()
139}
140
141fn gen_culist_support(
143 runtime_plan: &CuExecutionLoop,
144 culist_indices_in_plan_order: &[usize],
145 node_output_positions: &HashMap<NodeId, usize>,
146 task_member_names: &[(NodeId, String)],
147) -> proc_macro2::TokenStream {
148 #[cfg(feature = "macro_debug")]
149 eprintln!("[Extract msgs types]");
150 let all_msgs_types_in_culist_order = extract_msg_types(runtime_plan);
151
152 let culist_size = all_msgs_types_in_culist_order.len();
153 let task_indices: Vec<_> = culist_indices_in_plan_order
154 .iter()
155 .map(|i| syn::Index::from(*i))
156 .collect();
157
158 #[cfg(feature = "macro_debug")]
159 eprintln!("[build the copperlist struct]");
160 let msgs_types_tuple: TypeTuple = build_culist_tuple(&all_msgs_types_in_culist_order);
161
162 #[cfg(feature = "macro_debug")]
163 eprintln!("[build the copperlist tuple bincode support]");
164 let msgs_types_tuple_encode = build_culist_tuple_encode(&all_msgs_types_in_culist_order);
165 let msgs_types_tuple_decode = build_culist_tuple_decode(&all_msgs_types_in_culist_order);
166
167 #[cfg(feature = "macro_debug")]
168 eprintln!("[build the copperlist tuple debug support]");
169 let msgs_types_tuple_debug = build_culist_tuple_debug(&all_msgs_types_in_culist_order);
170
171 #[cfg(feature = "macro_debug")]
172 eprintln!("[build the copperlist tuple serialize support]");
173 let msgs_types_tuple_serialize = build_culist_tuple_serialize(&all_msgs_types_in_culist_order);
174
175 #[cfg(feature = "macro_debug")]
176 eprintln!("[build the default tuple support]");
177 let msgs_types_tuple_default = build_culist_tuple_default(&all_msgs_types_in_culist_order);
178
179 #[cfg(feature = "macro_debug")]
180 eprintln!("[build erasedcumsgs]");
181
182 let erasedmsg_trait_impl = build_culist_erasedcumsgs(&all_msgs_types_in_culist_order);
183
184 let collect_metadata_function = quote! {
185 pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
186 [#( &culist.msgs.0.#task_indices.metadata, )*]
187 }
188 };
189
190 let task_name_literals: Vec<String> = task_member_names
191 .iter()
192 .map(|(_, name)| name.clone())
193 .collect();
194
195 let methods = task_member_names.iter().map(|(node_id, name)| {
196 let output_position = node_output_positions
197 .get(node_id)
198 .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
199
200 let fn_name = format_ident!("get_{}_output", name);
201 let payload_type = all_msgs_types_in_culist_order[*output_position].clone();
202 let index = syn::Index::from(*output_position);
203 quote! {
204 #[allow(dead_code)]
205 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
206 &self.0.#index
207 }
208 }
209 });
210
211 quote! {
213 #collect_metadata_function
214
215 pub struct CuStampedDataSet(pub #msgs_types_tuple);
216
217 pub type CuList = CopperList<CuStampedDataSet>;
218
219 impl CuStampedDataSet {
220 #(#methods)*
221
222 #[allow(dead_code)]
223 fn get_tuple(&self) -> &#msgs_types_tuple {
224 &self.0
225 }
226
227 #[allow(dead_code)]
228 fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
229 &mut self.0
230 }
231 }
232
233 impl MatchingTasks for CuStampedDataSet {
234 #[allow(dead_code)]
235 fn get_all_task_ids() -> &'static [&'static str] {
236 &[#(#task_name_literals),*]
237 }
238 }
239
240 #msgs_types_tuple_encode
242 #msgs_types_tuple_decode
243
244 #msgs_types_tuple_debug
246
247 #msgs_types_tuple_serialize
249
250 #msgs_types_tuple_default
252
253 #erasedmsg_trait_impl
255
256 impl CuListZeroedInit for CuStampedDataSet {
257 fn init_zeroed(&mut self) {
258 #(self.0.#task_indices.metadata.status_txt = CuCompactString::default();)*
259 }
260 }
261 }
262}
263
264fn gen_sim_support(
265 runtime_plan: &CuExecutionLoop,
266 exec_entities: &[ExecutionEntity],
267) -> proc_macro2::TokenStream {
268 #[cfg(feature = "macro_debug")]
269 eprintln!("[Sim: Build SimEnum]");
270 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
271 .steps
272 .iter()
273 .filter_map(|unit| match unit {
274 CuExecutionUnit::Step(step) => {
275 if !matches!(
276 exec_entities[step.node_id as usize].kind,
277 ExecutionEntityKind::Task { .. }
278 ) {
279 return None;
280 }
281 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
282 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
283 let inputs: Vec<Type> = step
284 .input_msg_indices_types
285 .iter()
286 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap())
287 .collect();
288 let output: Option<Type> = step
289 .output_msg_index_type
290 .as_ref()
291 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap());
292 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
293 let output = output.as_ref().unwrap_or(&no_output);
294
295 let inputs_type = if inputs.is_empty() {
296 quote! { () }
297 } else if inputs.len() == 1 {
298 let input = inputs.first().unwrap();
299 quote! { &'a #input }
300 } else {
301 quote! { &'a (#(&'a #inputs),*) }
302 };
303
304 Some(quote! {
305 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
306 })
307 }
308 CuExecutionUnit::Loop(_) => {
309 todo!("Needs to be implemented")
310 }
311 })
312 .collect();
313 quote! {
314 #[allow(dead_code)]
316 pub enum SimStep<'a> {
317 #(#plan_enum),*
318 }
319 }
320}
321
322#[proc_macro_attribute]
326pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
327 #[cfg(feature = "macro_debug")]
328 eprintln!("[entry]");
329 let mut application_struct = parse_macro_input!(input as ItemStruct);
330
331 let application_name = &application_struct.ident;
332 let builder_name = format_ident!("{}Builder", application_name);
333
334 let mut config_file: Option<LitStr> = None;
335 let mut sim_mode = false;
336
337 #[cfg(feature = "std")]
338 let std = true;
339
340 #[cfg(not(feature = "std"))]
341 let std = false;
342
343 let attribute_config_parser = parser(|meta| {
345 if meta.path.is_ident("config") {
346 config_file = Some(meta.value()?.parse()?);
347 Ok(())
348 } else if meta.path.is_ident("sim_mode") {
349 if meta.input.peek(syn::Token![=]) {
351 meta.input.parse::<syn::Token![=]>()?;
352 let value: syn::LitBool = meta.input.parse()?;
353 sim_mode = value.value();
354 Ok(())
355 } else {
356 sim_mode = true;
358 Ok(())
359 }
360 } else {
361 Err(meta.error("unsupported property"))
362 }
363 });
364
365 #[cfg(feature = "macro_debug")]
366 eprintln!("[parse]");
367 parse_macro_input!(args with attribute_config_parser);
369
370 let config_file = match config_file {
381 Some(file) => file.value(),
382 None => {
383 return return_error(
384 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
385 .to_string(),
386 )
387 }
388 };
389
390 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
391 return return_error(format!(
392 "The configuration file `{config_file}` does not exist. Please provide a valid path."
393 ));
394 }
395
396 let copper_config = match read_config(&config_file) {
397 Ok(cuconfig) => cuconfig,
398 Err(e) => return return_error(e.to_string()),
399 };
400 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
401 Ok(ok) => ok,
402 Err(e) => return return_error(format!("Could not read the config file (should not happen because we just succeeded just before). {e}"))
403 };
404
405 #[cfg(feature = "macro_debug")]
406 eprintln!("[build monitor type]");
407 let monitor_type = if let Some(monitor_config) = copper_config.get_monitor_config() {
408 let monitor_type = parse_str::<Type>(monitor_config.get_type())
409 .expect("Could not transform the monitor type name into a Rust type.");
410 quote! { #monitor_type }
411 } else {
412 quote! { NoMonitor }
413 };
414
415 #[cfg(feature = "macro_debug")]
417 eprintln!("[build runtime field]");
418 let runtime_field: Field = if sim_mode {
420 parse_quote! {
421 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
422 }
423 } else {
424 parse_quote! {
425 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
426 }
427 };
428
429 #[cfg(feature = "macro_debug")]
430 eprintln!("[match struct anonymity]");
431 match &mut application_struct.fields {
432 Named(fields_named) => {
433 fields_named.named.push(runtime_field);
434 }
435 Unnamed(fields_unnamed) => {
436 fields_unnamed.unnamed.push(runtime_field);
437 }
438 Fields::Unit => {
439 panic!("This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;")
440 }
441 };
442
443 let all_missions = copper_config.graphs.get_all_missions_graphs();
444 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
445 for (mission, graph) in &all_missions {
446 let mission_mod = parse_str::<Ident>(mission.as_str())
447 .expect("Could not make an identifier of the mission name");
448
449 #[cfg(feature = "macro_debug")]
450 eprintln!("[extract tasks ids & types]");
451 let task_specs = CuTaskSpecSet::from_graph(graph);
452
453 let culist_channel_usage = collect_bridge_channel_usage(graph);
454 let mut culist_bridge_specs =
455 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
456 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
457 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
458 Ok(plan) => plan,
459 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
460 };
461 let task_member_names = collect_task_member_names(graph);
462 let (culist_call_order, node_output_positions) = collect_culist_metadata(
463 &culist_plan,
464 &culist_exec_entities,
465 &mut culist_bridge_specs,
466 &culist_plan_to_original,
467 );
468
469 #[cfg(feature = "macro_debug")]
470 {
471 eprintln!("[runtime plan for mission {mission}]");
472 eprintln!("{culist_plan:?}");
473 }
474
475 let culist_support: proc_macro2::TokenStream = gen_culist_support(
476 &culist_plan,
477 &culist_call_order,
478 &node_output_positions,
479 &task_member_names,
480 );
481
482 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
483
484 let bridge_types: Vec<Type> = culist_bridge_specs
485 .iter()
486 .map(|spec| spec.type_path.clone())
487 .collect();
488 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
489 quote! { () }
490 } else {
491 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
492 quote! { #tuple }
493 };
494
495 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
496 .iter()
497 .enumerate()
498 .map(|(idx, _)| format_ident!("bridge_{idx}"))
499 .collect();
500
501 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
502 .iter()
503 .enumerate()
504 .map(|(idx, spec)| {
505 let binding_ident = &bridge_binding_idents[idx];
506 let bridge_type = &spec.type_path;
507 let bridge_name = spec.id.clone();
508 let config_index = syn::Index::from(spec.config_index);
509 let tx_configs: Vec<proc_macro2::TokenStream> = spec
510 .tx_channels
511 .iter()
512 .map(|channel| {
513 let const_ident = &channel.const_ident;
514 let channel_name = channel.id.clone();
515 let channel_config_index = syn::Index::from(channel.config_index);
516 quote! {
517 {
518 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
519 cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
520 (route.clone(), config.clone())
521 }
522 _ => panic!(
523 "Bridge '{}' channel '{}' expected to be Tx",
524 #bridge_name,
525 #channel_name
526 ),
527 };
528 cu29::cubridge::BridgeChannelConfig::from_static(
529 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
530 channel_route,
531 channel_config,
532 )
533 }
534 }
535 })
536 .collect();
537 let rx_configs: Vec<proc_macro2::TokenStream> = spec
538 .rx_channels
539 .iter()
540 .map(|channel| {
541 let const_ident = &channel.const_ident;
542 let channel_name = channel.id.clone();
543 let channel_config_index = syn::Index::from(channel.config_index);
544 quote! {
545 {
546 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
547 cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
548 (route.clone(), config.clone())
549 }
550 _ => panic!(
551 "Bridge '{}' channel '{}' expected to be Rx",
552 #bridge_name,
553 #channel_name
554 ),
555 };
556 cu29::cubridge::BridgeChannelConfig::from_static(
557 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
558 channel_route,
559 channel_config,
560 )
561 }
562 }
563 })
564 .collect();
565 quote! {
566 let #binding_ident = {
567 let bridge_cfg = config
568 .bridges
569 .get(#config_index)
570 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
571 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
572 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
573 >] = &[#(#tx_configs),*];
574 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
575 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
576 >] = &[#(#rx_configs),*];
577 <#bridge_type as cu29::cubridge::CuBridge>::new(
578 bridge_cfg.config.as_ref(),
579 tx_channels,
580 rx_channels,
581 )?
582 };
583 }
584 })
585 .collect();
586
587 let bridges_instanciator = if culist_bridge_specs.is_empty() {
588 quote! {
589 pub fn bridges_instanciator(_config: &CuConfig) -> CuResult<CuBridges> {
590 Ok(())
591 }
592 }
593 } else {
594 let bridge_bindings = bridge_binding_idents.clone();
595 quote! {
596 pub fn bridges_instanciator(config: &CuConfig) -> CuResult<CuBridges> {
597 #(#bridge_init_statements)*
598 Ok((#(#bridge_bindings),*,))
599 }
600 }
601 };
602
603 let all_sim_tasks_types: Vec<Type> = task_specs.ids
604 .iter()
605 .zip(&task_specs.cutypes)
606 .zip(&task_specs.sim_task_types)
607 .zip(&task_specs.background_flags)
608 .zip(&task_specs.run_in_sim_flags)
609 .map(|((((task_id, task_type), sim_type), background), run_in_sim)| {
610 match task_type {
611 CuTaskType::Source => {
612 if *background {
613 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
614 }
615 if *run_in_sim {
616 sim_type.clone()
617 } else {
618 let msg_type = graph
619 .get_node_output_msg_type(task_id.as_str())
620 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
621 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
622 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
623 }
624 }
625 CuTaskType::Regular => {
626 sim_type.clone()
629 },
630 CuTaskType::Sink => {
631 if *background {
632 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
633 }
634
635 if *run_in_sim {
636 sim_type.clone()
638 }
639 else {
640 let msg_type = graph
642 .get_node_input_msg_type(task_id.as_str())
643 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
644 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
645 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
646 }
647 }
648 }
649 })
650 .collect();
651
652 #[cfg(feature = "macro_debug")]
653 eprintln!("[build task tuples]");
654
655 let task_types = &task_specs.task_types;
656 let task_types_tuple: TypeTuple = if task_types.is_empty() {
659 parse_quote! { () }
660 } else {
661 parse_quote! { (#(#task_types),*,) }
662 };
663
664 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
665 parse_quote! { () }
666 } else {
667 parse_quote! { (#(#all_sim_tasks_types),*,) }
668 };
669
670 #[cfg(feature = "macro_debug")]
671 eprintln!("[gen instances]");
672 let task_sim_instances_init_code = all_sim_tasks_types.iter().enumerate().map(|(index, ty)| {
674 let additional_error_info = format!(
675 "Failed to get create instance for {}, instance index {}.",
676 task_specs.type_names[index], index
677 );
678
679 quote! {
680 <#ty>::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
681 }
682 }).collect::<Vec<_>>();
683
684 let task_instances_init_code = task_specs.instantiation_types.iter().zip(&task_specs.background_flags).enumerate().map(|(index, (task_type, background))| {
685 let additional_error_info = format!(
686 "Failed to get create instance for {}, instance index {}.",
687 task_specs.type_names[index], index
688 );
689 if *background {
690 quote! {
691 #task_type::new(all_instances_configs[#index], threadpool.clone()).map_err(|e| e.add_cause(#additional_error_info))?
692 }
693 } else {
694 quote! {
695 #task_type::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
696 }
697 }
698 }).collect::<Vec<_>>();
699
700 let (
703 task_restore_code,
704 task_start_calls,
705 task_stop_calls,
706 task_preprocess_calls,
707 task_postprocess_calls,
708 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
709 (0..task_specs.task_types.len())
710 .map(|index| {
711 let task_index = int2sliceindex(index as u32);
712 let task_tuple_index = syn::Index::from(index);
713 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
714 let enum_name = Ident::new(&task_enum_name, Span::call_site());
715 (
716 quote! {
718 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::new_with_cause("Failed to thaw", e))?
719 },
720 { let monitoring_action = quote! {
722 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
723 match decision {
724 Decision::Abort => {
725 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
726 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
727 return Ok(());
728
729 }
730 Decision::Ignore => {
731 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
732 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
733 }
734 Decision::Shutdown => {
735 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
736 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
737 return Err(CuError::new_with_cause("Task errored out during start.", error));
738 }
739 }
740 };
741
742 let call_sim_callback = if sim_mode {
743 quote! {
744 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
746
747 let doit = if let SimOverride::Errored(reason) = ovr {
748 let error: CuError = reason.into();
749 #monitoring_action
750 false
751 }
752 else {
753 ovr == SimOverride::ExecuteByRuntime
754 };
755 }
756 } else {
757 quote! {
758 let doit = true; }
760 };
761
762
763 quote! {
764 #call_sim_callback
765 if doit {
766 let task = &mut self.copper_runtime.tasks.#task_index;
767 if let Err(error) = task.start(&self.copper_runtime.clock) {
768 #monitoring_action
769 }
770 }
771 }
772 },
773 { let monitoring_action = quote! {
775 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
776 match decision {
777 Decision::Abort => {
778 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
779 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
780 return Ok(());
781
782 }
783 Decision::Ignore => {
784 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
785 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
786 }
787 Decision::Shutdown => {
788 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
789 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
790 return Err(CuError::new_with_cause("Task errored out during stop.", error));
791 }
792 }
793 };
794 let call_sim_callback = if sim_mode {
795 quote! {
796 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
798
799 let doit = if let SimOverride::Errored(reason) = ovr {
800 let error: CuError = reason.into();
801 #monitoring_action
802 false
803 }
804 else {
805 ovr == SimOverride::ExecuteByRuntime
806 };
807 }
808 } else {
809 quote! {
810 let doit = true; }
812 };
813 quote! {
814 #call_sim_callback
815 if doit {
816 let task = &mut self.copper_runtime.tasks.#task_index;
817 if let Err(error) = task.stop(&self.copper_runtime.clock) {
818 #monitoring_action
819 }
820 }
821 }
822 },
823 { let monitoring_action = quote! {
825 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
826 match decision {
827 Decision::Abort => {
828 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
829 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
830 return Ok(());
831
832 }
833 Decision::Ignore => {
834 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
835 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
836 }
837 Decision::Shutdown => {
838 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
839 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
840 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
841 }
842 }
843 };
844 let call_sim_callback = if sim_mode {
845 quote! {
846 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
848
849 let doit = if let SimOverride::Errored(reason) = ovr {
850 let error: CuError = reason.into();
851 #monitoring_action
852 false
853 } else {
854 ovr == SimOverride::ExecuteByRuntime
855 };
856 }
857 } else {
858 quote! {
859 let doit = true; }
861 };
862 quote! {
863 #call_sim_callback
864 if doit {
865 if let Err(error) = tasks.#task_index.preprocess(clock) {
866 #monitoring_action
867 }
868 }
869 }
870 },
871 { let monitoring_action = quote! {
873 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
874 match decision {
875 Decision::Abort => {
876 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
877 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
878 return Ok(());
879
880 }
881 Decision::Ignore => {
882 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
883 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
884 }
885 Decision::Shutdown => {
886 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
887 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
888 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
889 }
890 }
891 };
892 let call_sim_callback = if sim_mode {
893 quote! {
894 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
896
897 let doit = if let SimOverride::Errored(reason) = ovr {
898 let error: CuError = reason.into();
899 #monitoring_action
900 false
901 } else {
902 ovr == SimOverride::ExecuteByRuntime
903 };
904 }
905 } else {
906 quote! {
907 let doit = true; }
909 };
910 quote! {
911 #call_sim_callback
912 if doit {
913 if let Err(error) = tasks.#task_index.postprocess(clock) {
914 #monitoring_action
915 }
916 }
917 }
918 }
919 )
920 })
921 );
922
923 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
924 .iter()
925 .map(|spec| {
926 let bridge_index = int2sliceindex(spec.tuple_index as u32);
927 let monitor_index = syn::Index::from(
928 spec.monitor_index
929 .expect("Bridge missing monitor index for start"),
930 );
931 quote! {
932 {
933 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
934 if let Err(error) = bridge.start(&self.copper_runtime.clock) {
935 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
936 match decision {
937 Decision::Abort => {
938 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
939 return Ok(());
940 }
941 Decision::Ignore => {
942 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
943 }
944 Decision::Shutdown => {
945 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
946 return Err(CuError::new_with_cause("Task errored out during start.", error));
947 }
948 }
949 }
950 }
951 }
952 })
953 .collect();
954
955 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
956 .iter()
957 .map(|spec| {
958 let bridge_index = int2sliceindex(spec.tuple_index as u32);
959 let monitor_index = syn::Index::from(
960 spec.monitor_index
961 .expect("Bridge missing monitor index for stop"),
962 );
963 quote! {
964 {
965 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
966 if let Err(error) = bridge.stop(&self.copper_runtime.clock) {
967 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
968 match decision {
969 Decision::Abort => {
970 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
971 return Ok(());
972 }
973 Decision::Ignore => {
974 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
975 }
976 Decision::Shutdown => {
977 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
978 return Err(CuError::new_with_cause("Task errored out during stop.", error));
979 }
980 }
981 }
982 }
983 }
984 })
985 .collect();
986
987 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
988 .iter()
989 .map(|spec| {
990 let bridge_index = int2sliceindex(spec.tuple_index as u32);
991 let monitor_index = syn::Index::from(
992 spec.monitor_index
993 .expect("Bridge missing monitor index for preprocess"),
994 );
995 quote! {
996 {
997 let bridge = &mut bridges.#bridge_index;
998 if let Err(error) = bridge.preprocess(clock) {
999 let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
1000 match decision {
1001 Decision::Abort => {
1002 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1003 return Ok(());
1004 }
1005 Decision::Ignore => {
1006 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1007 }
1008 Decision::Shutdown => {
1009 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1010 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1011 }
1012 }
1013 }
1014 }
1015 }
1016 })
1017 .collect();
1018
1019 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1020 .iter()
1021 .map(|spec| {
1022 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1023 let monitor_index = syn::Index::from(
1024 spec.monitor_index
1025 .expect("Bridge missing monitor index for postprocess"),
1026 );
1027 quote! {
1028 {
1029 let bridge = &mut bridges.#bridge_index;
1030 if let Err(error) = bridge.postprocess(clock) {
1031 let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
1032 match decision {
1033 Decision::Abort => {
1034 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1035 return Ok(());
1036 }
1037 Decision::Ignore => {
1038 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1039 }
1040 Decision::Shutdown => {
1041 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1042 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1043 }
1044 }
1045 }
1046 }
1047 }
1048 })
1049 .collect();
1050
1051 let mut start_calls = bridge_start_calls;
1052 start_calls.extend(task_start_calls);
1053 let mut stop_calls = task_stop_calls;
1054 stop_calls.extend(bridge_stop_calls);
1055 let mut preprocess_calls = bridge_preprocess_calls;
1056 preprocess_calls.extend(task_preprocess_calls);
1057 let mut postprocess_calls = task_postprocess_calls;
1058 postprocess_calls.extend(bridge_postprocess_calls);
1059
1060 let runtime_plan_code_and_logging: Vec<(
1061 proc_macro2::TokenStream,
1062 proc_macro2::TokenStream,
1063 )> = culist_plan
1064 .steps
1065 .iter()
1066 .map(|unit| match unit {
1067 CuExecutionUnit::Step(step) => {
1068 #[cfg(feature = "macro_debug")]
1069 eprintln!(
1070 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
1071 step.node.get_id(),
1072 step.node.get_type(),
1073 step.task_type,
1074 step.node_id,
1075 step.input_msg_indices_types,
1076 step.output_msg_index_type
1077 );
1078
1079 match &culist_exec_entities[step.node_id as usize].kind {
1080 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
1081 step,
1082 *task_index,
1083 &task_specs,
1084 sim_mode,
1085 &mission_mod,
1086 ),
1087 ExecutionEntityKind::BridgeRx {
1088 bridge_index,
1089 channel_index,
1090 } => {
1091 let spec = &culist_bridge_specs[*bridge_index];
1092 generate_bridge_rx_execution_tokens(spec, *channel_index, &mission_mod)
1093 }
1094 ExecutionEntityKind::BridgeTx {
1095 bridge_index,
1096 channel_index,
1097 } => {
1098 let spec = &culist_bridge_specs[*bridge_index];
1099 generate_bridge_tx_execution_tokens(
1100 step,
1101 spec,
1102 *channel_index,
1103 &mission_mod,
1104 )
1105 }
1106 }
1107 }
1108 CuExecutionUnit::Loop(_) => {
1109 panic!("Execution loops are not supported in runtime generation");
1110 }
1111 })
1112 .collect();
1113
1114 let sim_support = if sim_mode {
1115 Some(gen_sim_support(&culist_plan, &culist_exec_entities))
1116 } else {
1117 None
1118 };
1119
1120 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
1121 (
1122 quote! {
1123 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
1124 },
1125 quote! {
1126 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1127 },
1128 quote! {
1129 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1130 },
1131 quote! {
1132 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1133 },
1134 quote! {
1135 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1136 },
1137 )
1138 } else {
1139 (
1140 if std {
1141 quote! {
1142 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
1143 }
1144 } else {
1145 quote! {
1146 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
1148 }
1149 },
1150 quote! {
1151 fn run_one_iteration(&mut self) -> CuResult<()>
1152 },
1153 quote! {
1154 fn start_all_tasks(&mut self) -> CuResult<()>
1155 },
1156 quote! {
1157 fn stop_all_tasks(&mut self) -> CuResult<()>
1158 },
1159 quote! {
1160 fn run(&mut self) -> CuResult<()>
1161 },
1162 )
1163 };
1164
1165 let sim_callback_arg = if sim_mode {
1166 Some(quote!(sim_callback))
1167 } else {
1168 None
1169 };
1170
1171 let app_trait = if sim_mode {
1172 quote!(CuSimApplication)
1173 } else {
1174 quote!(CuApplication)
1175 };
1176
1177 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
1178 let enum_name = config_id_to_enum(id);
1179 let enum_ident = Ident::new(&enum_name, Span::call_site());
1180 quote! {
1181 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
1183 }
1184 });
1185
1186 let sim_callback_on_new = if sim_mode {
1187 Some(quote! {
1188 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
1189 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1190 .get_all_nodes()
1191 .iter()
1192 .map(|(_, node)| node.get_instance_config())
1193 .collect();
1194 #(#sim_callback_on_new_calls)*
1195 })
1196 } else {
1197 None
1198 };
1199
1200 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
1201 itertools::multiunzip(runtime_plan_code_and_logging);
1202
1203 let config_load_stmt = if std {
1204 quote! {
1205 let config = if let Some(overridden_config) = config_override {
1206 debug!("CuConfig: Overridden programmatically: {}", overridden_config.serialize_ron());
1207 overridden_config
1208 } else if ::std::path::Path::new(config_filename).exists() {
1209 debug!("CuConfig: Reading configuration from file: {}", config_filename);
1210 cu29::config::read_configuration(config_filename)?
1211 } else {
1212 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1213 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1214 cu29::config::read_configuration_str(original_config, None)?
1215 };
1216 }
1217 } else {
1218 quote! {
1219 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1221 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1222 let config = cu29::config::read_configuration_str(original_config, None)?;
1223 }
1224 };
1225
1226 let kill_handler = if std {
1227 Some(quote! {
1228 ctrlc::set_handler(move || {
1229 STOP_FLAG.store(true, Ordering::SeqCst);
1230 }).expect("Error setting Ctrl-C handler");
1231 })
1232 } else {
1233 None
1234 };
1235
1236 let run_loop = if std {
1237 quote! {
1238 loop {
1239 let iter_start = self.copper_runtime.clock.now();
1240 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1241
1242 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1243 let period: CuDuration = (1_000_000_000u64 / rate).into();
1244 let elapsed = self.copper_runtime.clock.now() - iter_start;
1245 if elapsed < period {
1246 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
1247 }
1248 }
1249
1250 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1251 break result;
1252 }
1253 }
1254 }
1255 } else {
1256 quote! {
1257 loop {
1258 let iter_start = self.copper_runtime.clock.now();
1259 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1260 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1261 let period: CuDuration = (1_000_000_000u64 / rate).into();
1262 let elapsed = self.copper_runtime.clock.now() - iter_start;
1263 if elapsed < period {
1264 busy_wait_for(period - elapsed);
1265 }
1266 }
1267
1268 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1269 break result;
1270 }
1271 }
1272 }
1273 };
1274
1275 #[cfg(feature = "macro_debug")]
1276 eprintln!("[build the run methods]");
1277 let run_methods = quote! {
1278
1279 #run_one_iteration {
1280
1281 let runtime = &mut self.copper_runtime;
1283 let clock = &runtime.clock;
1284 let monitor = &mut runtime.monitor;
1285 let tasks = &mut runtime.tasks;
1286 let bridges = &mut runtime.bridges;
1287 let cl_manager = &mut runtime.copperlists_manager;
1288 let kf_manager = &mut runtime.keyframes_manager;
1289
1290 #(#preprocess_calls)*
1292
1293 let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
1295 kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
1297 culist.msgs.init_zeroed();
1298 {
1299 let msgs = &mut culist.msgs.0;
1300 #(#runtime_plan_code)*
1301 } monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
1303
1304 #(#preprocess_logging_calls)*
1306
1307 cl_manager.end_of_processing(clid)?;
1308 kf_manager.end_of_processing(clid)?;
1309
1310 #(#postprocess_calls)*
1312 Ok(())
1313 }
1314
1315 fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
1316 let runtime = &mut self.copper_runtime;
1317 let clock = &runtime.clock;
1318 let tasks = &mut runtime.tasks;
1319 let config = cu29::bincode::config::standard();
1320 let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
1321 let mut decoder = DecoderImpl::new(reader, config, ());
1322 #(#task_restore_code);*;
1323 Ok(())
1324 }
1325
1326 #start_all_tasks {
1327 #(#start_calls)*
1328 self.copper_runtime.monitor.start(&self.copper_runtime.clock)?;
1329 Ok(())
1330 }
1331
1332 #stop_all_tasks {
1333 #(#stop_calls)*
1334 self.copper_runtime.monitor.stop(&self.copper_runtime.clock)?;
1335 Ok(())
1336 }
1337
1338 #run {
1339 static STOP_FLAG: AtomicBool = AtomicBool::new(false);
1340
1341 #kill_handler
1342
1343 <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
1344 let result = #run_loop;
1345
1346 if result.is_err() {
1347 error!("A task errored out: {}", &result);
1348 }
1349 <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
1350 result
1351 }
1352 };
1353
1354 let tasks_type = if sim_mode {
1355 quote!(CuSimTasks)
1356 } else {
1357 quote!(CuTasks)
1358 };
1359
1360 let tasks_instanciator_fn = if sim_mode {
1361 quote!(tasks_instanciator_sim)
1362 } else {
1363 quote!(tasks_instanciator)
1364 };
1365
1366 let app_impl_decl = if sim_mode {
1367 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
1368 } else {
1369 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
1370 };
1371
1372 let simstep_type_decl = if sim_mode {
1373 quote!(
1374 type Step<'z> = SimStep<'z>;
1375 )
1376 } else {
1377 quote!()
1378 };
1379
1380 #[cfg(feature = "std")]
1381 #[cfg(feature = "macro_debug")]
1382 eprintln!("[build result]");
1383 let application_impl = quote! {
1384 #app_impl_decl {
1385 #simstep_type_decl
1386
1387 #new {
1388 let config_filename = #config_file;
1389
1390 #config_load_stmt
1391
1392 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
1395 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
1397 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
1399 }
1400 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
1401 unified_logger.clone(),
1402 UnifiedLogType::CopperList,
1403 default_section_size,
1404 )?;
1408
1409 let keyframes_stream = stream_write::<KeyFrame, S>(
1410 unified_logger.clone(),
1411 UnifiedLogType::FrozenTasks,
1412 1024 * 1024 * 10, )?;
1414
1415
1416 let application = Ok(#application_name {
1417 copper_runtime: CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new(
1418 clock,
1419 &config,
1420 Some(#mission),
1421 #mission_mod::#tasks_instanciator_fn,
1422 #mission_mod::monitor_instanciator,
1423 #mission_mod::bridges_instanciator,
1424 copperlist_stream,
1425 keyframes_stream)?, });
1427
1428 #sim_callback_on_new
1429
1430 application
1431 }
1432
1433 fn get_original_config() -> String {
1434 #copper_config_content.to_string()
1435 }
1436
1437 #run_methods
1438 }
1439 };
1440
1441 let (
1442 builder_struct,
1443 builder_new,
1444 builder_impl,
1445 builder_sim_callback_method,
1446 builder_build_sim_callback_arg,
1447 ) = if sim_mode {
1448 (
1449 quote! {
1450 #[allow(dead_code)]
1451 pub struct #builder_name <'a, F> {
1452 clock: Option<RobotClock>,
1453 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1454 config_override: Option<CuConfig>,
1455 sim_callback: Option<&'a mut F>
1456 }
1457 },
1458 quote! {
1459 #[allow(dead_code)]
1460 pub fn new() -> Self {
1461 Self {
1462 clock: None,
1463 unified_logger: None,
1464 config_override: None,
1465 sim_callback: None,
1466 }
1467 }
1468 },
1469 quote! {
1470 impl<'a, F> #builder_name <'a, F>
1471 where
1472 F: FnMut(SimStep) -> SimOverride,
1473 },
1474 Some(quote! {
1475 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
1476 {
1477 self.sim_callback = Some(sim_callback);
1478 self
1479 }
1480 }),
1481 Some(quote! {
1482 self.sim_callback
1483 .ok_or(CuError::from("Sim callback missing from builder"))?,
1484 }),
1485 )
1486 } else {
1487 (
1488 quote! {
1489 #[allow(dead_code)]
1490 pub struct #builder_name {
1491 clock: Option<RobotClock>,
1492 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1493 config_override: Option<CuConfig>,
1494 }
1495 },
1496 quote! {
1497 #[allow(dead_code)]
1498 pub fn new() -> Self {
1499 Self {
1500 clock: None,
1501 unified_logger: None,
1502 config_override: None,
1503 }
1504 }
1505 },
1506 quote! {
1507 impl #builder_name
1508 },
1509 None,
1510 None,
1511 )
1512 };
1513
1514 let std_application_impl = if sim_mode {
1516 Some(quote! {
1518 impl #application_name {
1519 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1520 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
1521 }
1522 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1523 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
1524 }
1525 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1526 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
1527 }
1528 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1529 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
1530 }
1531 }
1532 })
1533 } else if std {
1534 Some(quote! {
1536 impl #application_name {
1537 pub fn start_all_tasks(&mut self) -> CuResult<()> {
1538 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
1539 }
1540 pub fn run_one_iteration(&mut self) -> CuResult<()> {
1541 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
1542 }
1543 pub fn run(&mut self) -> CuResult<()> {
1544 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
1545 }
1546 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
1547 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
1548 }
1549 }
1550 })
1551 } else {
1552 None };
1554
1555 let application_builder = if std {
1556 Some(quote! {
1557 #builder_struct
1558
1559 #builder_impl
1560 {
1561 #builder_new
1562
1563 #[allow(dead_code)]
1564 pub fn with_clock(mut self, clock: RobotClock) -> Self {
1565 self.clock = Some(clock);
1566 self
1567 }
1568
1569 #[allow(dead_code)]
1570 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
1571 self.unified_logger = Some(unified_logger);
1572 self
1573 }
1574
1575 #[allow(dead_code)]
1576 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
1577 self.clock = Some(copper_ctx.clock.clone());
1578 self.unified_logger = Some(copper_ctx.unified_logger.clone());
1579 self
1580 }
1581
1582 #[allow(dead_code)]
1583 pub fn with_config(mut self, config_override: CuConfig) -> Self {
1584 self.config_override = Some(config_override);
1585 self
1586 }
1587
1588 #builder_sim_callback_method
1589
1590 #[allow(dead_code)]
1591 pub fn build(self) -> CuResult<#application_name> {
1592 #application_name::new(
1593 self.clock
1594 .ok_or(CuError::from("Clock missing from builder"))?,
1595 self.unified_logger
1596 .ok_or(CuError::from("Unified logger missing from builder"))?,
1597 self.config_override,
1598 #builder_build_sim_callback_arg
1599 )
1600 }
1601 }
1602 })
1603 } else {
1604 None
1606 };
1607
1608 let sim_imports = if sim_mode {
1609 Some(quote! {
1610 use cu29::simulation::SimOverride;
1611 use cu29::simulation::CuTaskCallbackState;
1612 use cu29::simulation::CuSimSrcTask;
1613 use cu29::simulation::CuSimSinkTask;
1614 use cu29::prelude::app::CuSimApplication;
1615 })
1616 } else {
1617 None
1618 };
1619
1620 let sim_tasks = if sim_mode {
1621 Some(quote! {
1622 pub type CuSimTasks = #task_types_tuple_sim;
1625 })
1626 } else {
1627 None
1628 };
1629
1630 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
1631 quote! { Ok(()) }
1632 } else {
1633 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
1634 };
1635
1636 let sim_tasks_instanciator = if sim_mode {
1637 Some(quote! {
1638 pub fn tasks_instanciator_sim(all_instances_configs: Vec<Option<&ComponentConfig>>, _threadpool: Arc<ThreadPool>) -> CuResult<CuSimTasks> {
1639 #sim_inst_body
1640 }})
1641 } else {
1642 None
1643 };
1644
1645 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
1646 quote! {
1647 let _ = threadpool;
1648 Ok(())
1649 }
1650 } else {
1651 quote! { Ok(( #(#task_instances_init_code),*, )) }
1652 };
1653
1654 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
1655 quote! { Ok(()) }
1656 } else {
1657 quote! { Ok(( #(#task_instances_init_code),*, )) }
1658 };
1659
1660 let tasks_instanciator = if std {
1661 quote! {
1662 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>, threadpool: Arc<ThreadPool>) -> CuResult<CuTasks> {
1663 #tasks_inst_body_std
1664 }
1665 }
1666 } else {
1667 quote! {
1669 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>) -> CuResult<CuTasks> {
1670 #tasks_inst_body_nostd
1671 }
1672 }
1673 };
1674
1675 let imports = if std {
1676 quote! {
1677 use cu29::rayon::ThreadPool;
1678 use cu29::cuasynctask::CuAsyncTask;
1679 use cu29::curuntime::CopperContext;
1680 use cu29::prelude::UnifiedLoggerWrite;
1681 use cu29::prelude::memmap::MmapSectionStorage;
1682 use std::fmt::{Debug, Formatter};
1683 use std::fmt::Result as FmtResult;
1684 use std::mem::size_of;
1685 use std::sync::Arc;
1686 use std::sync::atomic::{AtomicBool, Ordering};
1687 use std::sync::Mutex;
1688 }
1689 } else {
1690 quote! {
1691 use alloc::sync::Arc;
1692 use alloc::string::String;
1693 use alloc::string::ToString;
1694 use core::sync::atomic::{AtomicBool, Ordering};
1695 use core::fmt::{Debug, Formatter};
1696 use core::fmt::Result as FmtResult;
1697 use core::mem::size_of;
1698 use spin::Mutex;
1699 }
1700 };
1701
1702 let mission_mod_tokens = quote! {
1704 mod #mission_mod {
1705 use super::*; use cu29::bincode::Encode;
1708 use cu29::bincode::enc::Encoder;
1709 use cu29::bincode::error::EncodeError;
1710 use cu29::bincode::Decode;
1711 use cu29::bincode::de::Decoder;
1712 use cu29::bincode::de::DecoderImpl;
1713 use cu29::bincode::error::DecodeError;
1714 use cu29::clock::RobotClock;
1715 use cu29::config::CuConfig;
1716 use cu29::config::ComponentConfig;
1717 use cu29::curuntime::CuRuntime;
1718 use cu29::curuntime::KeyFrame;
1719 use cu29::CuResult;
1720 use cu29::CuError;
1721 use cu29::cutask::CuSrcTask;
1722 use cu29::cutask::CuSinkTask;
1723 use cu29::cutask::CuTask;
1724 use cu29::cutask::CuMsg;
1725 use cu29::cutask::CuMsgMetadata;
1726 use cu29::copperlist::CopperList;
1727 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
1729 use cu29::monitoring::Decision;
1730 use cu29::prelude::app::CuApplication;
1731 use cu29::prelude::debug;
1732 use cu29::prelude::stream_write;
1733 use cu29::prelude::UnifiedLogType;
1734 use cu29::prelude::UnifiedLogWrite;
1735
1736 #imports
1737
1738 #sim_imports
1739
1740 #[allow(unused_imports)]
1742 use cu29::monitoring::NoMonitor;
1743
1744 pub type CuTasks = #task_types_tuple;
1748 pub type CuBridges = #bridges_type_tokens;
1749
1750 #sim_tasks
1751 #sim_support
1752 #sim_tasks_instanciator
1753
1754 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
1755
1756 #culist_support
1757 #tasks_instanciator
1758 #bridges_instanciator
1759
1760 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
1761 #monitor_type::new(config, #mission_mod::TASKS_IDS).expect("Failed to create the given monitor.")
1762 }
1763
1764 pub #application_struct
1766
1767 #application_impl
1768
1769 #std_application_impl
1770
1771 #application_builder
1772 }
1773
1774 };
1775 all_missions_tokens.push(mission_mod_tokens);
1776 }
1777
1778 let default_application_tokens = if all_missions.contains_key("default") {
1779 let default_builder = if std {
1780 Some(quote! {
1781 #[allow(unused_imports)]
1783 use default::#builder_name;
1784 })
1785 } else {
1786 None
1787 };
1788 quote! {
1789 #default_builder
1790
1791 #[allow(unused_imports)]
1792 use default::#application_name;
1793 }
1794 } else {
1795 quote!() };
1797
1798 let result: proc_macro2::TokenStream = quote! {
1799 #(#all_missions_tokens)*
1800 #default_application_tokens
1801 };
1802
1803 #[cfg(feature = "macro_debug")]
1805 {
1806 let formatted_code = rustfmt_generated_code(result.to_string());
1807 eprintln!("\n === Gen. Runtime ===\n");
1808 eprintln!("{formatted_code}");
1809 eprintln!("\n === === === === === ===\n");
1812 }
1813 result.into()
1814}
1815
1816fn read_config(config_file: &str) -> CuResult<CuConfig> {
1817 let filename = config_full_path(config_file);
1818
1819 read_configuration(filename.as_str())
1820}
1821
1822fn config_full_path(config_file: &str) -> String {
1823 let mut config_full_path = utils::caller_crate_root();
1824 config_full_path.push(config_file);
1825 let filename = config_full_path
1826 .as_os_str()
1827 .to_str()
1828 .expect("Could not interpret the config file name");
1829 filename.to_string()
1830}
1831
1832fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
1833 let result = graph
1834 .get_all_nodes()
1835 .iter()
1836 .map(|(_, node)| {
1837 let id = node.get_id();
1838 let type_str = graph.get_node_output_msg_type(id.as_str());
1839 let result = type_str.map(|type_str| {
1840 let result = parse_str::<Type>(type_str.as_str())
1841 .expect("Could not parse output message type.");
1842 result
1843 });
1844 result
1845 })
1846 .collect();
1847 result
1848}
1849
/// Per-task tables extracted from a configuration graph by `CuTaskSpecSet::from_graph`.
///
/// Only nodes whose flavor is `Flavor::Task` contribute entries.
struct CuTaskSpecSet {
    /// Configuration id of each task (from `Node::get_id`).
    pub ids: Vec<String>,
    /// Task kind of each task, as computed by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// Whether each task is flagged as a background task (`Node::is_background`).
    pub background_flags: Vec<bool>,
    /// Whether logging is enabled for each task (`Node::is_logging_enabled`).
    pub logging_enabled: Vec<bool>,
    /// Type name of each task exactly as written in the configuration (`Node::get_type`).
    pub type_names: Vec<String>,
    /// Parsed Rust type of each task; background tasks are wrapped as
    /// `CuAsyncTask<Task, Output>`.
    pub task_types: Vec<Type>,
    /// Same as `task_types`, but background tasks use the turbofish form
    /// `CuAsyncTask::<Task, Output>`.
    pub instantiation_types: Vec<Type>,
    /// Parsed Rust type of each task without any background wrapping.
    pub sim_task_types: Vec<Type>,
    /// Value of `Node::is_run_in_sim` for each task.
    pub run_in_sim_flags: Vec<bool>,
    /// Parsed output message types; `None` where no output message type is reported.
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Indexed by graph node id: `Some(i)` when that node is the `i`-th task of this set,
    /// `None` for nodes that are not tasks.
    pub node_id_to_task_index: Vec<Option<usize>>,
}
1864
1865impl CuTaskSpecSet {
1866 pub fn from_graph(graph: &CuGraph) -> Self {
1867 let all_id_nodes: Vec<(NodeId, &Node)> = graph
1868 .get_all_nodes()
1869 .into_iter()
1870 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
1871 .collect();
1872
1873 let ids = all_id_nodes
1874 .iter()
1875 .map(|(_, node)| node.get_id().to_string())
1876 .collect();
1877
1878 let cutypes = all_id_nodes
1879 .iter()
1880 .map(|(id, _)| find_task_type_for_id(graph, *id))
1881 .collect();
1882
1883 let background_flags: Vec<bool> = all_id_nodes
1884 .iter()
1885 .map(|(_, node)| node.is_background())
1886 .collect();
1887
1888 let logging_enabled: Vec<bool> = all_id_nodes
1889 .iter()
1890 .map(|(_, node)| node.is_logging_enabled())
1891 .collect();
1892
1893 let type_names: Vec<String> = all_id_nodes
1894 .iter()
1895 .map(|(_, node)| node.get_type().to_string())
1896 .collect();
1897
1898 let output_types = extract_tasks_output_types(graph);
1899
1900 let task_types = type_names
1901 .iter()
1902 .zip(background_flags.iter())
1903 .zip(output_types.iter())
1904 .map(|((name, &background), output_type)| {
1905 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1906 panic!("Could not transform {name} into a Task Rust type: {error}");
1907 });
1908 if background {
1909 if let Some(output_type) = output_type {
1910 parse_quote!(CuAsyncTask<#name_type, #output_type>)
1911 } else {
1912 panic!("{name}: If a task is background, it has to have an output");
1913 }
1914 } else {
1915 name_type
1916 }
1917 })
1918 .collect();
1919
1920 let instantiation_types = type_names
1921 .iter()
1922 .zip(background_flags.iter())
1923 .zip(output_types.iter())
1924 .map(|((name, &background), output_type)| {
1925 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1926 panic!("Could not transform {name} into a Task Rust type: {error}");
1927 });
1928 if background {
1929 if let Some(output_type) = output_type {
1930 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
1931 } else {
1932 panic!("{name}: If a task is background, it has to have an output");
1933 }
1934 } else {
1935 name_type
1936 }
1937 })
1938 .collect();
1939
1940 let sim_task_types = type_names
1941 .iter()
1942 .map(|name| {
1943 parse_str::<Type>(name).unwrap_or_else(|err| {
1944 eprintln!("Could not transform {name} into a Task Rust type.");
1945 panic!("{err}")
1946 })
1947 })
1948 .collect();
1949
1950 let run_in_sim_flags = all_id_nodes
1951 .iter()
1952 .map(|(_, node)| node.is_run_in_sim())
1953 .collect();
1954
1955 let mut node_id_to_task_index = vec![None; graph.node_count()];
1956 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
1957 node_id_to_task_index[*node_id as usize] = Some(index);
1958 }
1959
1960 Self {
1961 ids,
1962 cutypes,
1963 background_flags,
1964 logging_enabled,
1965 type_names,
1966 task_types,
1967 instantiation_types,
1968 sim_task_types,
1969 run_in_sim_flags,
1970 output_types,
1971 node_id_to_task_index,
1972 }
1973 }
1974}
1975
1976fn extract_msg_types(runtime_plan: &CuExecutionLoop) -> Vec<Type> {
1977 runtime_plan
1978 .steps
1979 .iter()
1980 .filter_map(|unit| match unit {
1981 CuExecutionUnit::Step(step) => {
1982 if let Some((_, output_msg_type)) = &step.output_msg_index_type {
1983 Some(
1984 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
1985 panic!(
1986 "Could not transform {output_msg_type} into a message Rust type."
1987 )
1988 }),
1989 )
1990 } else {
1991 None
1992 }
1993 }
1994 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
1995 })
1996 .collect()
1997}
1998
1999fn build_culist_tuple(all_msgs_types_in_culist_order: &[Type]) -> TypeTuple {
2001 if all_msgs_types_in_culist_order.is_empty() {
2002 parse_quote! { () }
2003 } else {
2004 parse_quote! {
2005 ( #( CuMsg<#all_msgs_types_in_culist_order> ),* )
2006 }
2007 }
2008}
2009
2010fn build_culist_tuple_encode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2012 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2013
2014 let encode_fields: Vec<_> = indices
2016 .iter()
2017 .map(|i| {
2018 let idx = syn::Index::from(*i);
2019 quote! { self.0.#idx.encode(encoder)?; }
2020 })
2021 .collect();
2022
2023 parse_quote! {
2024 impl Encode for CuStampedDataSet {
2025 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
2026 #(#encode_fields)*
2027 Ok(())
2028 }
2029 }
2030 }
2031}
2032
2033fn build_culist_tuple_decode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2035 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2036
2037 let decode_fields: Vec<_> = indices
2039 .iter()
2040 .map(|i| {
2041 let t = &all_msgs_types_in_culist_order[*i];
2042 quote! { CuMsg::<#t>::decode(decoder)? }
2043 })
2044 .collect();
2045
2046 parse_quote! {
2047 impl Decode<()> for CuStampedDataSet {
2048 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
2049 Ok(CuStampedDataSet ((
2050 #(#decode_fields),*
2051 )))
2052 }
2053 }
2054 }
2055}
2056
2057fn build_culist_erasedcumsgs(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2058 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2059 let casted_fields: Vec<_> = indices
2060 .iter()
2061 .map(|i| {
2062 let idx = syn::Index::from(*i);
2063 quote! { &self.0.#idx as &dyn ErasedCuStampedData }
2064 })
2065 .collect();
2066 parse_quote! {
2067 impl ErasedCuStampedDataSet for CuStampedDataSet {
2068 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
2069 vec![
2070 #(#casted_fields),*
2071 ]
2072 }
2073 }
2074 }
2075}
2076
2077fn build_culist_tuple_debug(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2078 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2079
2080 let debug_fields: Vec<_> = indices
2081 .iter()
2082 .map(|i| {
2083 let idx = syn::Index::from(*i);
2084 quote! { .field(&self.0.#idx) }
2085 })
2086 .collect();
2087
2088 parse_quote! {
2089 impl Debug for CuStampedDataSet {
2090 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
2091 f.debug_tuple("CuStampedDataSet")
2092 #(#debug_fields)*
2093 .finish()
2094 }
2095 }
2096 }
2097}
2098
2099fn build_culist_tuple_serialize(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2101 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2102 let tuple_len = all_msgs_types_in_culist_order.len();
2103
2104 let serialize_fields: Vec<_> = indices
2106 .iter()
2107 .map(|i| {
2108 let idx = syn::Index::from(*i);
2109 quote! { &self.0.#idx }
2110 })
2111 .collect();
2112
2113 parse_quote! {
2114 impl Serialize for CuStampedDataSet {
2115 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2116 where
2117 S: serde::Serializer,
2118 {
2119 use serde::ser::SerializeTuple;
2120 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
2121 #(tuple.serialize_element(#serialize_fields)?;)*
2122 tuple.end()
2123 }
2124 }
2125 }
2126}
2127
2128fn build_culist_tuple_default(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2130 let default_fields: Vec<_> = all_msgs_types_in_culist_order
2132 .iter()
2133 .map(|msg_type| quote! { CuStampedData::<#msg_type, CuMsgMetadata>::default() })
2134 .collect();
2135
2136 parse_quote! {
2137 impl Default for CuStampedDataSet {
2138 fn default() -> CuStampedDataSet
2139 {
2140 CuStampedDataSet((
2141 #(#default_fields),*
2142 ))
2143 }
2144 }
2145 }
2146}
2147
2148fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
2149 let mut usage = HashMap::new();
2150 for edge_idx in graph.0.edge_indices() {
2151 let cnx = graph
2152 .0
2153 .edge_weight(edge_idx)
2154 .expect("Edge should exist while collecting bridge usage")
2155 .clone();
2156 if let Some(channel) = &cnx.src_channel {
2157 let key = BridgeChannelKey {
2158 bridge_id: cnx.src.clone(),
2159 channel_id: channel.clone(),
2160 direction: BridgeChannelDirection::Rx,
2161 };
2162 usage
2163 .entry(key)
2164 .and_modify(|msg| {
2165 if msg != &cnx.msg {
2166 panic!(
2167 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2168 cnx.src, channel, msg, cnx.msg
2169 );
2170 }
2171 })
2172 .or_insert(cnx.msg.clone());
2173 }
2174 if let Some(channel) = &cnx.dst_channel {
2175 let key = BridgeChannelKey {
2176 bridge_id: cnx.dst.clone(),
2177 channel_id: channel.clone(),
2178 direction: BridgeChannelDirection::Tx,
2179 };
2180 usage
2181 .entry(key)
2182 .and_modify(|msg| {
2183 if msg != &cnx.msg {
2184 panic!(
2185 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2186 cnx.dst, channel, msg, cnx.msg
2187 );
2188 }
2189 })
2190 .or_insert(cnx.msg.clone());
2191 }
2192 }
2193 usage
2194}
2195
2196fn build_bridge_specs(
2197 config: &CuConfig,
2198 graph: &CuGraph,
2199 channel_usage: &HashMap<BridgeChannelKey, String>,
2200) -> Vec<BridgeSpec> {
2201 let mut specs = Vec::new();
2202 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
2203 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
2204 continue;
2205 }
2206
2207 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
2208 panic!(
2209 "Could not parse bridge type '{}' for '{}': {err}",
2210 bridge_cfg.type_, bridge_cfg.id
2211 )
2212 });
2213
2214 let mut rx_channels = Vec::new();
2215 let mut tx_channels = Vec::new();
2216
2217 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
2218 match channel {
2219 BridgeChannelConfigRepresentation::Rx { id, .. } => {
2220 let key = BridgeChannelKey {
2221 bridge_id: bridge_cfg.id.clone(),
2222 channel_id: id.clone(),
2223 direction: BridgeChannelDirection::Rx,
2224 };
2225 if let Some(msg_type) = channel_usage.get(&key) {
2226 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2227 panic!(
2228 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2229 bridge_cfg.id, id
2230 )
2231 });
2232 let const_ident =
2233 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2234 rx_channels.push(BridgeChannelSpec {
2235 id: id.clone(),
2236 const_ident,
2237 msg_type,
2238 config_index: channel_index,
2239 plan_node_id: None,
2240 culist_index: None,
2241 monitor_index: None,
2242 });
2243 }
2244 }
2245 BridgeChannelConfigRepresentation::Tx { id, .. } => {
2246 let key = BridgeChannelKey {
2247 bridge_id: bridge_cfg.id.clone(),
2248 channel_id: id.clone(),
2249 direction: BridgeChannelDirection::Tx,
2250 };
2251 if let Some(msg_type) = channel_usage.get(&key) {
2252 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2253 panic!(
2254 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2255 bridge_cfg.id, id
2256 )
2257 });
2258 let const_ident =
2259 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2260 tx_channels.push(BridgeChannelSpec {
2261 id: id.clone(),
2262 const_ident,
2263 msg_type,
2264 config_index: channel_index,
2265 plan_node_id: None,
2266 culist_index: None,
2267 monitor_index: None,
2268 });
2269 }
2270 }
2271 }
2272 }
2273
2274 if rx_channels.is_empty() && tx_channels.is_empty() {
2275 continue;
2276 }
2277
2278 specs.push(BridgeSpec {
2279 id: bridge_cfg.id.clone(),
2280 type_path,
2281 config_index: bridge_index,
2282 tuple_index: 0,
2283 monitor_index: None,
2284 rx_channels,
2285 tx_channels,
2286 });
2287 }
2288
2289 for (tuple_index, spec) in specs.iter_mut().enumerate() {
2290 spec.tuple_index = tuple_index;
2291 }
2292
2293 specs
2294}
2295
2296fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
2297 graph
2298 .get_all_nodes()
2299 .iter()
2300 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2301 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
2302 .collect()
2303}
2304
2305fn build_execution_plan(
2306 graph: &CuGraph,
2307 task_specs: &CuTaskSpecSet,
2308 bridge_specs: &mut [BridgeSpec],
2309) -> CuResult<(
2310 CuExecutionLoop,
2311 Vec<ExecutionEntity>,
2312 HashMap<NodeId, NodeId>,
2313)> {
2314 let mut plan_graph = CuGraph::default();
2315 let mut exec_entities = Vec::new();
2316 let mut original_to_plan = HashMap::new();
2317 let mut plan_to_original = HashMap::new();
2318 let mut name_to_original = HashMap::new();
2319 let mut channel_nodes = HashMap::new();
2320
2321 for (node_id, node) in graph.get_all_nodes() {
2322 name_to_original.insert(node.get_id(), node_id);
2323 if node.get_flavor() != Flavor::Task {
2324 continue;
2325 }
2326 let plan_node_id = plan_graph.add_node(node.clone())?;
2327 let task_index = task_specs.node_id_to_task_index[node_id as usize]
2328 .expect("Task missing from specifications");
2329 plan_to_original.insert(plan_node_id, node_id);
2330 original_to_plan.insert(node_id, plan_node_id);
2331 if plan_node_id as usize != exec_entities.len() {
2332 panic!("Unexpected node ordering while mirroring tasks in plan graph");
2333 }
2334 exec_entities.push(ExecutionEntity {
2335 kind: ExecutionEntityKind::Task { task_index },
2336 });
2337 }
2338
2339 for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
2340 for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
2341 let mut node = Node::new(
2342 format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
2343 "__CuBridgeRxChannel",
2344 );
2345 node.set_flavor(Flavor::Bridge);
2346 let plan_node_id = plan_graph.add_node(node)?;
2347 if plan_node_id as usize != exec_entities.len() {
2348 panic!("Unexpected node ordering while inserting bridge rx channel");
2349 }
2350 channel_spec.plan_node_id = Some(plan_node_id);
2351 exec_entities.push(ExecutionEntity {
2352 kind: ExecutionEntityKind::BridgeRx {
2353 bridge_index,
2354 channel_index,
2355 },
2356 });
2357 channel_nodes.insert(
2358 BridgeChannelKey {
2359 bridge_id: spec.id.clone(),
2360 channel_id: channel_spec.id.clone(),
2361 direction: BridgeChannelDirection::Rx,
2362 },
2363 plan_node_id,
2364 );
2365 }
2366
2367 for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
2368 let mut node = Node::new(
2369 format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
2370 "__CuBridgeTxChannel",
2371 );
2372 node.set_flavor(Flavor::Bridge);
2373 let plan_node_id = plan_graph.add_node(node)?;
2374 if plan_node_id as usize != exec_entities.len() {
2375 panic!("Unexpected node ordering while inserting bridge tx channel");
2376 }
2377 channel_spec.plan_node_id = Some(plan_node_id);
2378 exec_entities.push(ExecutionEntity {
2379 kind: ExecutionEntityKind::BridgeTx {
2380 bridge_index,
2381 channel_index,
2382 },
2383 });
2384 channel_nodes.insert(
2385 BridgeChannelKey {
2386 bridge_id: spec.id.clone(),
2387 channel_id: channel_spec.id.clone(),
2388 direction: BridgeChannelDirection::Tx,
2389 },
2390 plan_node_id,
2391 );
2392 }
2393 }
2394
2395 for edge_idx in graph.0.edge_indices() {
2396 let cnx = graph
2397 .0
2398 .edge_weight(edge_idx)
2399 .expect("Edge should exist while building plan")
2400 .clone();
2401
2402 let src_plan = if let Some(channel) = &cnx.src_channel {
2403 let key = BridgeChannelKey {
2404 bridge_id: cnx.src.clone(),
2405 channel_id: channel.clone(),
2406 direction: BridgeChannelDirection::Rx,
2407 };
2408 *channel_nodes
2409 .get(&key)
2410 .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
2411 } else {
2412 let node_id = name_to_original
2413 .get(&cnx.src)
2414 .copied()
2415 .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
2416 *original_to_plan
2417 .get(&node_id)
2418 .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
2419 };
2420
2421 let dst_plan = if let Some(channel) = &cnx.dst_channel {
2422 let key = BridgeChannelKey {
2423 bridge_id: cnx.dst.clone(),
2424 channel_id: channel.clone(),
2425 direction: BridgeChannelDirection::Tx,
2426 };
2427 *channel_nodes
2428 .get(&key)
2429 .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
2430 } else {
2431 let node_id = name_to_original
2432 .get(&cnx.dst)
2433 .copied()
2434 .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
2435 *original_to_plan
2436 .get(&node_id)
2437 .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
2438 };
2439
2440 plan_graph
2441 .connect_ext(
2442 src_plan,
2443 dst_plan,
2444 &cnx.msg,
2445 cnx.missions.clone(),
2446 None,
2447 None,
2448 )
2449 .map_err(|e| CuError::from(e.to_string()))?;
2450 }
2451
2452 let runtime_plan = compute_runtime_plan(&plan_graph)?;
2453 Ok((runtime_plan, exec_entities, plan_to_original))
2454}
2455
2456fn collect_culist_metadata(
2457 runtime_plan: &CuExecutionLoop,
2458 exec_entities: &[ExecutionEntity],
2459 bridge_specs: &mut [BridgeSpec],
2460 plan_to_original: &HashMap<NodeId, NodeId>,
2461) -> (Vec<usize>, HashMap<NodeId, usize>) {
2462 let mut culist_order = Vec::new();
2463 let mut node_output_positions = HashMap::new();
2464
2465 for unit in &runtime_plan.steps {
2466 if let CuExecutionUnit::Step(step) = unit {
2467 if let Some((output_idx, _)) = &step.output_msg_index_type {
2468 culist_order.push(*output_idx as usize);
2469 match &exec_entities[step.node_id as usize].kind {
2470 ExecutionEntityKind::Task { .. } => {
2471 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
2472 node_output_positions.insert(*original_node_id, *output_idx as usize);
2473 }
2474 }
2475 ExecutionEntityKind::BridgeRx {
2476 bridge_index,
2477 channel_index,
2478 } => {
2479 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
2480 Some(*output_idx as usize);
2481 }
2482 ExecutionEntityKind::BridgeTx { .. } => {}
2483 }
2484 }
2485 }
2486 }
2487
2488 (culist_order, node_output_positions)
2489}
2490
2491#[allow(dead_code)]
2492fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
2493 let mut names = task_ids.to_vec();
2494 for spec in bridge_specs.iter_mut() {
2495 spec.monitor_index = Some(names.len());
2496 names.push(format!("bridge::{}", spec.id));
2497 for channel in spec.rx_channels.iter_mut() {
2498 channel.monitor_index = Some(names.len());
2499 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
2500 }
2501 for channel in spec.tx_channels.iter_mut() {
2502 channel.monitor_index = Some(names.len());
2503 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
2504 }
2505 }
2506 names
2507}
2508
2509fn generate_task_execution_tokens(
2510 step: &CuExecutionStep,
2511 task_index: usize,
2512 task_specs: &CuTaskSpecSet,
2513 sim_mode: bool,
2514 mission_mod: &Ident,
2515) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2516 let node_index = int2sliceindex(task_index as u32);
2517 let task_instance = quote! { tasks.#node_index };
2518 let comment_str = format!(
2519 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
2520 step.node.get_id(),
2521 step.task_type,
2522 step.node_id,
2523 step.input_msg_indices_types,
2524 step.output_msg_index_type
2525 );
2526 let comment_tokens = quote! {{
2527 let _ = stringify!(#comment_str);
2528 }};
2529 let tid = task_index;
2530 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
2531 let enum_name = Ident::new(&task_enum_name, Span::call_site());
2532
2533 match step.task_type {
2534 CuTaskType::Source => {
2535 if let Some((output_index, _)) = &step.output_msg_index_type {
2536 let output_culist_index = int2sliceindex(*output_index);
2537
2538 let monitoring_action = quote! {
2539 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2540 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2541 match decision {
2542 Decision::Abort => {
2543 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2544 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2545 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2546 cl_manager.end_of_processing(clid)?;
2547 return Ok(());
2548 }
2549 Decision::Ignore => {
2550 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2551 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2552 let cumsg_output = &mut msgs.#output_culist_index;
2553 cumsg_output.clear_payload();
2554 }
2555 Decision::Shutdown => {
2556 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2557 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2558 return Err(CuError::new_with_cause("Task errored out during process.", error));
2559 }
2560 }
2561 };
2562
2563 let call_sim_callback = if sim_mode {
2564 quote! {
2565 let doit = {
2566 let cumsg_output = &mut msgs.#output_culist_index;
2567 let state = CuTaskCallbackState::Process((), cumsg_output);
2568 let ovr = sim_callback(SimStep::#enum_name(state));
2569
2570 if let SimOverride::Errored(reason) = ovr {
2571 let error: CuError = reason.into();
2572 #monitoring_action
2573 false
2574 } else {
2575 ovr == SimOverride::ExecuteByRuntime
2576 }
2577 };
2578 }
2579 } else {
2580 quote! { let doit = true; }
2581 };
2582
2583 let logging_tokens = if !task_specs.logging_enabled[tid] {
2584 let output_culist_index = int2sliceindex(*output_index);
2585 quote! {
2586 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2587 cumsg_output.clear_payload();
2588 }
2589 } else {
2590 quote!()
2591 };
2592
2593 (
2594 quote! {
2595 {
2596 #comment_tokens
2597 kf_manager.freeze_task(clid, &#task_instance)?;
2598 #call_sim_callback
2599 let cumsg_output = &mut msgs.#output_culist_index;
2600 cumsg_output.metadata.process_time.start = clock.now().into();
2601 let maybe_error = if doit { #task_instance.process(clock, cumsg_output) } else { Ok(()) };
2602 cumsg_output.metadata.process_time.end = clock.now().into();
2603 if let Err(error) = maybe_error {
2604 #monitoring_action
2605 }
2606 }
2607 },
2608 logging_tokens,
2609 )
2610 } else {
2611 panic!("Source task should have an output message index.");
2612 }
2613 }
2614 CuTaskType::Sink => {
2615 if let Some((output_index, _)) = &step.output_msg_index_type {
2616 let output_culist_index = int2sliceindex(*output_index);
2617 let indices = step
2618 .input_msg_indices_types
2619 .iter()
2620 .map(|(index, _)| int2sliceindex(*index));
2621
2622 let monitoring_action = quote! {
2623 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2624 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2625 match decision {
2626 Decision::Abort => {
2627 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2628 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2629 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2630 cl_manager.end_of_processing(clid)?;
2631 return Ok(());
2632 }
2633 Decision::Ignore => {
2634 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2635 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2636 let cumsg_output = &mut msgs.#output_culist_index;
2637 cumsg_output.clear_payload();
2638 }
2639 Decision::Shutdown => {
2640 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2641 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2642 return Err(CuError::new_with_cause("Task errored out during process.", error));
2643 }
2644 }
2645 };
2646
2647 let inputs_type = if indices.len() == 1 {
2648 quote! { #(msgs.#indices)* }
2649 } else {
2650 quote! { (#(&msgs.#indices),*) }
2651 };
2652
2653 let call_sim_callback = if sim_mode {
2654 quote! {
2655 let doit = {
2656 let cumsg_input = &#inputs_type;
2657 let cumsg_output = &mut msgs.#output_culist_index;
2658 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2659 let ovr = sim_callback(SimStep::#enum_name(state));
2660
2661 if let SimOverride::Errored(reason) = ovr {
2662 let error: CuError = reason.into();
2663 #monitoring_action
2664 false
2665 } else {
2666 ovr == SimOverride::ExecuteByRuntime
2667 }
2668 };
2669 }
2670 } else {
2671 quote! { let doit = true; }
2672 };
2673
2674 (
2675 quote! {
2676 {
2677 #comment_tokens
2678 kf_manager.freeze_task(clid, &#task_instance)?;
2679 #call_sim_callback
2680 let cumsg_input = &#inputs_type;
2681 let cumsg_output = &mut msgs.#output_culist_index;
2682 cumsg_output.metadata.process_time.start = clock.now().into();
2683 let maybe_error = if doit { #task_instance.process(clock, cumsg_input) } else { Ok(()) };
2684 cumsg_output.metadata.process_time.end = clock.now().into();
2685 if let Err(error) = maybe_error {
2686 #monitoring_action
2687 }
2688 }
2689 },
2690 quote! {},
2691 )
2692 } else {
2693 panic!("Sink tasks should have a virtual output message index.");
2694 }
2695 }
2696 CuTaskType::Regular => {
2697 if let Some((output_index, _)) = &step.output_msg_index_type {
2698 let output_culist_index = int2sliceindex(*output_index);
2699 let indices = step
2700 .input_msg_indices_types
2701 .iter()
2702 .map(|(index, _)| int2sliceindex(*index));
2703
2704 let monitoring_action = quote! {
2705 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2706 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2707 match decision {
2708 Decision::Abort => {
2709 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2710 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2711 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2712 cl_manager.end_of_processing(clid)?;
2713 return Ok(());
2714 }
2715 Decision::Ignore => {
2716 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2717 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2718 let cumsg_output = &mut msgs.#output_culist_index;
2719 cumsg_output.clear_payload();
2720 }
2721 Decision::Shutdown => {
2722 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2723 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2724 return Err(CuError::new_with_cause("Task errored out during process.", error));
2725 }
2726 }
2727 };
2728
2729 let inputs_type = if indices.len() == 1 {
2730 quote! { #(msgs.#indices)* }
2731 } else {
2732 quote! { (#(&msgs.#indices),*) }
2733 };
2734
2735 let call_sim_callback = if sim_mode {
2736 quote! {
2737 let doit = {
2738 let cumsg_input = &#inputs_type;
2739 let cumsg_output = &mut msgs.#output_culist_index;
2740 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2741 let ovr = sim_callback(SimStep::#enum_name(state));
2742
2743 if let SimOverride::Errored(reason) = ovr {
2744 let error: CuError = reason.into();
2745 #monitoring_action
2746 false
2747 }
2748 else {
2749 ovr == SimOverride::ExecuteByRuntime
2750 }
2751 };
2752 }
2753 } else {
2754 quote! { let doit = true; }
2755 };
2756
2757 let logging_tokens = if !task_specs.logging_enabled[tid] {
2758 let output_culist_index = int2sliceindex(*output_index);
2759 quote! {
2760 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2761 cumsg_output.clear_payload();
2762 }
2763 } else {
2764 quote!()
2765 };
2766
2767 (
2768 quote! {
2769 {
2770 #comment_tokens
2771 kf_manager.freeze_task(clid, &#task_instance)?;
2772 #call_sim_callback
2773 let cumsg_input = &#inputs_type;
2774 let cumsg_output = &mut msgs.#output_culist_index;
2775 cumsg_output.metadata.process_time.start = clock.now().into();
2776 let maybe_error = if doit { #task_instance.process(clock, cumsg_input, cumsg_output) } else { Ok(()) };
2777 cumsg_output.metadata.process_time.end = clock.now().into();
2778 if let Err(error) = maybe_error {
2779 #monitoring_action
2780 }
2781 }
2782 },
2783 logging_tokens,
2784 )
2785 } else {
2786 panic!("Regular task should have an output message index.");
2787 }
2788 }
2789 }
2790}
2791
2792fn generate_bridge_rx_execution_tokens(
2793 bridge_spec: &BridgeSpec,
2794 channel_index: usize,
2795 mission_mod: &Ident,
2796) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2797 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2798 let channel = &bridge_spec.rx_channels[channel_index];
2799 let culist_index = channel
2800 .culist_index
2801 .unwrap_or_else(|| panic!("Bridge Rx channel missing output index"));
2802 let culist_index_ts = int2sliceindex(culist_index as u32);
2803 let monitor_index = syn::Index::from(
2804 channel
2805 .monitor_index
2806 .expect("Bridge Rx channel missing monitor index"),
2807 );
2808 let bridge_type = &bridge_spec.type_path;
2809 let const_ident = &channel.const_ident;
2810 (
2811 quote! {
2812 {
2813 let bridge = &mut bridges.#bridge_tuple_index;
2814 let cumsg_output = &mut msgs.#culist_index_ts;
2815 cumsg_output.metadata.process_time.start = clock.now().into();
2816 let maybe_error = bridge.receive(
2817 clock,
2818 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
2819 cumsg_output,
2820 );
2821 cumsg_output.metadata.process_time.end = clock.now().into();
2822 if let Err(error) = maybe_error {
2823 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2824 match decision {
2825 Decision::Abort => {
2826 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2827 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2828 cl_manager.end_of_processing(clid)?;
2829 return Ok(());
2830 }
2831 Decision::Ignore => {
2832 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2833 let cumsg_output = &mut msgs.#culist_index_ts;
2834 cumsg_output.clear_payload();
2835 }
2836 Decision::Shutdown => {
2837 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2838 return Err(CuError::new_with_cause("Task errored out during process.", error));
2839 }
2840 }
2841 }
2842 }
2843 },
2844 quote! {},
2845 )
2846}
2847
2848fn generate_bridge_tx_execution_tokens(
2849 step: &CuExecutionStep,
2850 bridge_spec: &BridgeSpec,
2851 channel_index: usize,
2852 mission_mod: &Ident,
2853) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2854 let channel = &bridge_spec.tx_channels[channel_index];
2855 let monitor_index = syn::Index::from(
2856 channel
2857 .monitor_index
2858 .expect("Bridge Tx channel missing monitor index"),
2859 );
2860 let input_index = step
2861 .input_msg_indices_types
2862 .first()
2863 .map(|(idx, _)| int2sliceindex(*idx))
2864 .expect("Bridge Tx channel should have exactly one input");
2865 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2866 let bridge_type = &bridge_spec.type_path;
2867 let const_ident = &channel.const_ident;
2868 (
2869 quote! {
2870 {
2871 let bridge = &mut bridges.#bridge_tuple_index;
2872 let cumsg_input = &msgs.#input_index;
2873 if let Err(error) = bridge.send(
2874 clock,
2875 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
2876 cumsg_input,
2877 ) {
2878 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2879 match decision {
2880 Decision::Abort => {
2881 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2882 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2883 cl_manager.end_of_processing(clid)?;
2884 return Ok(());
2885 }
2886 Decision::Ignore => {
2887 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2888 }
2889 Decision::Shutdown => {
2890 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2891 return Err(CuError::new_with_cause("Task errored out during process.", error));
2892 }
2893 }
2894 }
2895 }
2896 },
2897 quote! {},
2898 )
2899}
2900
#[cfg(test)]
mod tests {
    /// Feeds every file matching `tests/compile_fail/*/*.rs` to trybuild as a case that
    /// is expected to fail compilation.
    #[test]
    fn test_compile_fail() {
        let cases = trybuild::TestCases::new();
        cases.compile_fail("tests/compile_fail/*/*.rs");
    }
}
/// Side of a bridge that a channel belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// Receive side: used for connections whose source is a bridge channel
    /// (`src_channel` is set).
    Rx,
    /// Transmit side: used for connections whose destination is a bridge channel
    /// (`dst_channel` is set).
    Tx,
}
2915
/// Hashable identity of one bridge channel, used as a map key when collecting channel
/// usage and when resolving connections to plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BridgeChannelKey {
    /// Id of the bridge owning the channel.
    bridge_id: String,
    /// Id of the channel within that bridge.
    channel_id: String,
    /// Whether this is the receive or the transmit side.
    direction: BridgeChannelDirection,
}
2922
/// Code-generation description of one bridge channel that is used by a connection.
#[derive(Clone)]
struct BridgeChannelSpec {
    /// Channel id as written in the bridge configuration.
    id: String,
    /// Identifier derived from `id` through `config_id_to_bridge_const`; the generated
    /// code uses it to name the channel on the bridge's `Rx`/`Tx` associated type.
    const_ident: Ident,
    /// Message type carried by the channel, parsed from the connection's message string.
    #[allow(dead_code)]
    msg_type: Type,
    /// Position of the channel in the bridge configuration's channel list.
    config_index: usize,
    /// Node id of the channel in the plan graph; `None` until `build_execution_plan` runs.
    plan_node_id: Option<NodeId>,
    /// Copper-list slot written by the channel; filled in for Rx channels by
    /// `collect_culist_metadata`.
    culist_index: Option<usize>,
    /// Position of the channel in the monitored-names list; filled in by
    /// `build_monitored_ids`.
    monitor_index: Option<usize>,
}
2934
/// Code-generation description of one bridge that takes part in the graph.
#[derive(Clone)]
struct BridgeSpec {
    /// Bridge id as written in the configuration.
    id: String,
    /// Rust type of the bridge, parsed from the configuration's type string.
    type_path: Type,
    /// Position of the bridge in the configuration's bridge list.
    config_index: usize,
    /// Position of the bridge among the specs that were kept; the generated code uses it
    /// to index the `bridges` tuple.
    tuple_index: usize,
    /// Position of the bridge in the monitored-names list; filled in by
    /// `build_monitored_ids`.
    monitor_index: Option<usize>,
    /// Receive channels of this bridge that are used by at least one connection.
    rx_channels: Vec<BridgeChannelSpec>,
    /// Transmit channels of this bridge that are used by at least one connection.
    tx_channels: Vec<BridgeChannelSpec>,
}
2945
/// What a node of the plan graph stands for. `build_execution_plan` creates one entity
/// per plan node, stored at the index equal to the plan node id.
#[derive(Clone)]
struct ExecutionEntity {
    /// The kind of component behind the plan node.
    kind: ExecutionEntityKind,
}
2950
/// The component a plan node maps back to.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A regular task.
    Task {
        /// Index of the task in the task specifications.
        task_index: usize,
    },
    /// The receive side of a bridge channel.
    BridgeRx {
        /// Index of the bridge in the bridge spec slice.
        bridge_index: usize,
        /// Index of the channel in that bridge's `rx_channels`.
        channel_index: usize,
    },
    /// The transmit side of a bridge channel.
    BridgeTx {
        /// Index of the bridge in the bridge spec slice.
        bridge_index: usize,
        /// Index of the channel in that bridge's `tx_channels`.
        channel_index: usize,
    },
}