1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::HashMap;
4use std::fs::read_to_string;
5use syn::meta::parser;
6use syn::Fields::{Named, Unnamed};
7use syn::{
8 parse_macro_input, parse_quote, parse_str, Field, Fields, ItemImpl, ItemStruct, LitStr, Type,
9 TypeTuple,
10};
11
12#[cfg(feature = "macro_debug")]
13use crate::format::rustfmt_generated_code;
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{
17 read_configuration, BridgeChannelConfigRepresentation, CuGraph, Flavor, Node, NodeId,
18};
19use cu29_runtime::curuntime::{
20 compute_runtime_plan, find_task_type_for_id, CuExecutionLoop, CuExecutionStep, CuExecutionUnit,
21 CuTaskType,
22};
23use cu29_traits::{CuError, CuResult};
24use proc_macro2::{Ident, Span};
25
26mod format;
27mod utils;
28
/// Value spliced as the last const generic argument of the `CuRuntime` field type that
/// `copper_runtime` adds to the application struct.
/// NOTE(review): "CLNB" presumably stands for the number of copper lists — confirm.
const DEFAULT_CLNB: usize = 10;
31
32#[inline]
33fn int2sliceindex(i: u32) -> syn::Index {
34 syn::Index::from(i as usize)
35}
36
37#[inline(always)]
38fn return_error(msg: String) -> TokenStream {
39 syn::Error::new(Span::call_site(), msg)
40 .to_compile_error()
41 .into()
42}
43
44#[proc_macro]
48pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
49 #[cfg(feature = "std")]
50 let std = true;
51
52 #[cfg(not(feature = "std"))]
53 let std = false;
54
55 let config = parse_macro_input!(config_path_lit as LitStr).value();
56 if !std::path::Path::new(&config_full_path(&config)).exists() {
57 return return_error(format!(
58 "The configuration file `{config}` does not exist. Please provide a valid path."
59 ));
60 }
61 #[cfg(feature = "macro_debug")]
62 eprintln!("[gen culist support with {config:?}]");
63 let cuconfig = match read_config(&config) {
64 Ok(cuconfig) => cuconfig,
65 Err(e) => return return_error(e.to_string()),
66 };
67 let graph = cuconfig
68 .get_graph(None) .expect("Could not find the specified mission for gen_cumsgs");
70 let task_specs = CuTaskSpecSet::from_graph(graph);
71 let channel_usage = collect_bridge_channel_usage(graph);
72 let mut bridge_specs = build_bridge_specs(&cuconfig, graph, &channel_usage);
73 let (culist_plan, exec_entities, plan_to_original) =
74 match build_execution_plan(graph, &task_specs, &mut bridge_specs) {
75 Ok(plan) => plan,
76 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
77 };
78 let task_member_names = collect_task_member_names(graph);
79 let (culist_order, node_output_positions) = collect_culist_metadata(
80 &culist_plan,
81 &exec_entities,
82 &mut bridge_specs,
83 &plan_to_original,
84 );
85
86 #[cfg(feature = "macro_debug")]
87 eprintln!(
88 "[The CuStampedDataSet matching tasks ids are {:?}]",
89 culist_order
90 );
91
92 let support = gen_culist_support(
93 &culist_plan,
94 &culist_order,
95 &node_output_positions,
96 &task_member_names,
97 );
98
99 let extra_imports = if !std {
100 quote! {
101 use core::fmt::Debug;
102 use core::fmt::Formatter;
103 use core::fmt::Result as FmtResult;
104 use alloc::vec;
105 }
106 } else {
107 quote! {
108 use std::fmt::Debug;
109 use std::fmt::Formatter;
110 use std::fmt::Result as FmtResult;
111 }
112 };
113
114 let with_uses = quote! {
115 mod cumsgs {
116 use cu29::bincode::Encode;
117 use cu29::bincode::enc::Encoder;
118 use cu29::bincode::error::EncodeError;
119 use cu29::bincode::Decode;
120 use cu29::bincode::de::Decoder;
121 use cu29::bincode::error::DecodeError;
122 use cu29::copperlist::CopperList;
123 use cu29::prelude::CuStampedData;
124 use cu29::prelude::ErasedCuStampedData;
125 use cu29::prelude::ErasedCuStampedDataSet;
126 use cu29::prelude::MatchingTasks;
127 use cu29::prelude::Serialize;
128 use cu29::prelude::CuMsg;
129 use cu29::prelude::CuMsgMetadata;
130 use cu29::prelude::CuListZeroedInit;
131 use cu29::prelude::CuCompactString;
132 #extra_imports
133 #support
134 }
135 use cumsgs::CuStampedDataSet;
136 type CuMsgs=CuStampedDataSet;
137 };
138 with_uses.into()
139}
140
141fn gen_culist_support(
143 runtime_plan: &CuExecutionLoop,
144 culist_indices_in_plan_order: &[usize],
145 node_output_positions: &HashMap<NodeId, usize>,
146 task_member_names: &[(NodeId, String)],
147) -> proc_macro2::TokenStream {
148 #[cfg(feature = "macro_debug")]
149 eprintln!("[Extract msgs types]");
150 let all_msgs_types_in_culist_order = extract_msg_types(runtime_plan);
151
152 let culist_size = all_msgs_types_in_culist_order.len();
153 let task_indices: Vec<_> = culist_indices_in_plan_order
154 .iter()
155 .map(|i| syn::Index::from(*i))
156 .collect();
157
158 #[cfg(feature = "macro_debug")]
159 eprintln!("[build the copperlist struct]");
160 let msgs_types_tuple: TypeTuple = build_culist_tuple(&all_msgs_types_in_culist_order);
161
162 #[cfg(feature = "macro_debug")]
163 eprintln!("[build the copperlist tuple bincode support]");
164 let msgs_types_tuple_encode = build_culist_tuple_encode(&all_msgs_types_in_culist_order);
165 let msgs_types_tuple_decode = build_culist_tuple_decode(&all_msgs_types_in_culist_order);
166
167 #[cfg(feature = "macro_debug")]
168 eprintln!("[build the copperlist tuple debug support]");
169 let msgs_types_tuple_debug = build_culist_tuple_debug(&all_msgs_types_in_culist_order);
170
171 #[cfg(feature = "macro_debug")]
172 eprintln!("[build the copperlist tuple serialize support]");
173 let msgs_types_tuple_serialize = build_culist_tuple_serialize(&all_msgs_types_in_culist_order);
174
175 #[cfg(feature = "macro_debug")]
176 eprintln!("[build the default tuple support]");
177 let msgs_types_tuple_default = build_culist_tuple_default(&all_msgs_types_in_culist_order);
178
179 #[cfg(feature = "macro_debug")]
180 eprintln!("[build erasedcumsgs]");
181
182 let erasedmsg_trait_impl = build_culist_erasedcumsgs(&all_msgs_types_in_culist_order);
183
184 let collect_metadata_function = quote! {
185 pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
186 [#( &culist.msgs.0.#task_indices.metadata, )*]
187 }
188 };
189
190 let task_name_literals: Vec<String> = task_member_names
191 .iter()
192 .map(|(_, name)| name.clone())
193 .collect();
194
195 let methods = task_member_names.iter().map(|(node_id, name)| {
196 let output_position = node_output_positions
197 .get(node_id)
198 .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
199
200 let fn_name = format_ident!("get_{}_output", name);
201 let payload_type = all_msgs_types_in_culist_order[*output_position].clone();
202 let index = syn::Index::from(*output_position);
203 quote! {
204 #[allow(dead_code)]
205 pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
206 &self.0.#index
207 }
208 }
209 });
210
211 quote! {
213 #collect_metadata_function
214
215 pub struct CuStampedDataSet(pub #msgs_types_tuple);
216
217 pub type CuList = CopperList<CuStampedDataSet>;
218
219 impl CuStampedDataSet {
220 #(#methods)*
221
222 #[allow(dead_code)]
223 fn get_tuple(&self) -> &#msgs_types_tuple {
224 &self.0
225 }
226
227 #[allow(dead_code)]
228 fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
229 &mut self.0
230 }
231 }
232
233 impl MatchingTasks for CuStampedDataSet {
234 #[allow(dead_code)]
235 fn get_all_task_ids() -> &'static [&'static str] {
236 &[#(#task_name_literals),*]
237 }
238 }
239
240 #msgs_types_tuple_encode
242 #msgs_types_tuple_decode
243
244 #msgs_types_tuple_debug
246
247 #msgs_types_tuple_serialize
249
250 #msgs_types_tuple_default
252
253 #erasedmsg_trait_impl
255
256 impl CuListZeroedInit for CuStampedDataSet {
257 fn init_zeroed(&mut self) {
258 #(self.0.#task_indices.metadata.status_txt = CuCompactString::default();)*
259 }
260 }
261 }
262}
263
264fn gen_sim_support(
265 runtime_plan: &CuExecutionLoop,
266 exec_entities: &[ExecutionEntity],
267) -> proc_macro2::TokenStream {
268 #[cfg(feature = "macro_debug")]
269 eprintln!("[Sim: Build SimEnum]");
270 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
271 .steps
272 .iter()
273 .filter_map(|unit| match unit {
274 CuExecutionUnit::Step(step) => {
275 if !matches!(
276 exec_entities[step.node_id as usize].kind,
277 ExecutionEntityKind::Task { .. }
278 ) {
279 return None;
280 }
281 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
282 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
283 let inputs: Vec<Type> = step
284 .input_msg_indices_types
285 .iter()
286 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap())
287 .collect();
288 let output: Option<Type> = step
289 .output_msg_index_type
290 .as_ref()
291 .map(|(_, t)| parse_str::<Type>(format!("CuMsg<{t}>").as_str()).unwrap());
292 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
293 let output = output.as_ref().unwrap_or(&no_output);
294
295 let inputs_type = if inputs.is_empty() {
296 quote! { () }
297 } else if inputs.len() == 1 {
298 let input = inputs.first().unwrap();
299 quote! { &'a #input }
300 } else {
301 quote! { &'a (#(&'a #inputs),*) }
302 };
303
304 Some(quote! {
305 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
306 })
307 }
308 CuExecutionUnit::Loop(_) => {
309 todo!("Needs to be implemented")
310 }
311 })
312 .collect();
313 quote! {
314 #[allow(dead_code)]
316 pub enum SimStep<'a> {
317 #(#plan_enum),*
318 }
319 }
320}
321
322#[proc_macro_attribute]
326pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
327 #[cfg(feature = "macro_debug")]
328 eprintln!("[entry]");
329 let mut application_struct = parse_macro_input!(input as ItemStruct);
330
331 let application_name = &application_struct.ident;
332 let builder_name = format_ident!("{}Builder", application_name);
333
334 let mut config_file: Option<LitStr> = None;
335 let mut sim_mode = false;
336
337 #[cfg(feature = "std")]
338 let std = true;
339
340 #[cfg(not(feature = "std"))]
341 let std = false;
342
343 let attribute_config_parser = parser(|meta| {
345 if meta.path.is_ident("config") {
346 config_file = Some(meta.value()?.parse()?);
347 Ok(())
348 } else if meta.path.is_ident("sim_mode") {
349 if meta.input.peek(syn::Token![=]) {
351 meta.input.parse::<syn::Token![=]>()?;
352 let value: syn::LitBool = meta.input.parse()?;
353 sim_mode = value.value();
354 Ok(())
355 } else {
356 sim_mode = true;
358 Ok(())
359 }
360 } else {
361 Err(meta.error("unsupported property"))
362 }
363 });
364
365 #[cfg(feature = "macro_debug")]
366 eprintln!("[parse]");
367 parse_macro_input!(args with attribute_config_parser);
369
370 let config_file = match config_file {
381 Some(file) => file.value(),
382 None => {
383 return return_error(
384 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
385 .to_string(),
386 )
387 }
388 };
389
390 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
391 return return_error(format!(
392 "The configuration file `{config_file}` does not exist. Please provide a valid path."
393 ));
394 }
395
396 let copper_config = match read_config(&config_file) {
397 Ok(cuconfig) => cuconfig,
398 Err(e) => return return_error(e.to_string()),
399 };
400 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
401 Ok(ok) => ok,
402 Err(e) => return return_error(format!("Could not read the config file (should not happen because we just succeeded just before). {e}"))
403 };
404
405 #[cfg(feature = "macro_debug")]
406 eprintln!("[build monitor type]");
407 let monitor_type = if let Some(monitor_config) = copper_config.get_monitor_config() {
408 let monitor_type = parse_str::<Type>(monitor_config.get_type())
409 .expect("Could not transform the monitor type name into a Rust type.");
410 quote! { #monitor_type }
411 } else {
412 quote! { NoMonitor }
413 };
414
415 #[cfg(feature = "macro_debug")]
417 eprintln!("[build runtime field]");
418 let runtime_field: Field = if sim_mode {
420 parse_quote! {
421 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
422 }
423 } else {
424 parse_quote! {
425 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
426 }
427 };
428
429 #[cfg(feature = "macro_debug")]
430 eprintln!("[match struct anonymity]");
431 match &mut application_struct.fields {
432 Named(fields_named) => {
433 fields_named.named.push(runtime_field);
434 }
435 Unnamed(fields_unnamed) => {
436 fields_unnamed.unnamed.push(runtime_field);
437 }
438 Fields::Unit => {
439 panic!("This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;")
440 }
441 };
442
443 let all_missions = copper_config.graphs.get_all_missions_graphs();
444 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
445 for (mission, graph) in &all_missions {
446 let mission_mod = parse_str::<Ident>(mission.as_str())
447 .expect("Could not make an identifier of the mission name");
448
449 #[cfg(feature = "macro_debug")]
450 eprintln!("[extract tasks ids & types]");
451 let task_specs = CuTaskSpecSet::from_graph(graph);
452
453 let culist_channel_usage = collect_bridge_channel_usage(graph);
454 let mut culist_bridge_specs =
455 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
456 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
457 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
458 Ok(plan) => plan,
459 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
460 };
461 let task_member_names = collect_task_member_names(graph);
462 let (culist_call_order, node_output_positions) = collect_culist_metadata(
463 &culist_plan,
464 &culist_exec_entities,
465 &mut culist_bridge_specs,
466 &culist_plan_to_original,
467 );
468
469 #[cfg(feature = "macro_debug")]
470 {
471 eprintln!("[runtime plan for mission {mission}]");
472 eprintln!("{culist_plan:?}");
473 }
474
475 let culist_support: proc_macro2::TokenStream = gen_culist_support(
476 &culist_plan,
477 &culist_call_order,
478 &node_output_positions,
479 &task_member_names,
480 );
481
482 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
483
484 let bridge_types: Vec<Type> = culist_bridge_specs
485 .iter()
486 .map(|spec| spec.type_path.clone())
487 .collect();
488 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
489 quote! { () }
490 } else {
491 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
492 quote! { #tuple }
493 };
494
495 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
496 .iter()
497 .enumerate()
498 .map(|(idx, _)| format_ident!("bridge_{idx}"))
499 .collect();
500
501 let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
502 .iter()
503 .enumerate()
504 .map(|(idx, spec)| {
505 let binding_ident = &bridge_binding_idents[idx];
506 let bridge_type = &spec.type_path;
507 let bridge_name = spec.id.clone();
508 let config_index = syn::Index::from(spec.config_index);
509 let tx_configs: Vec<proc_macro2::TokenStream> = spec
510 .tx_channels
511 .iter()
512 .map(|channel| {
513 let const_ident = &channel.const_ident;
514 let channel_name = channel.id.clone();
515 let channel_config_index = syn::Index::from(channel.config_index);
516 quote! {
517 {
518 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
519 cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
520 (route.clone(), config.clone())
521 }
522 _ => panic!(
523 "Bridge '{}' channel '{}' expected to be Tx",
524 #bridge_name,
525 #channel_name
526 ),
527 };
528 cu29::cubridge::BridgeChannelConfig::from_static(
529 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
530 channel_route,
531 channel_config,
532 )
533 }
534 }
535 })
536 .collect();
537 let rx_configs: Vec<proc_macro2::TokenStream> = spec
538 .rx_channels
539 .iter()
540 .map(|channel| {
541 let const_ident = &channel.const_ident;
542 let channel_name = channel.id.clone();
543 let channel_config_index = syn::Index::from(channel.config_index);
544 quote! {
545 {
546 let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
547 cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
548 (route.clone(), config.clone())
549 }
550 _ => panic!(
551 "Bridge '{}' channel '{}' expected to be Rx",
552 #bridge_name,
553 #channel_name
554 ),
555 };
556 cu29::cubridge::BridgeChannelConfig::from_static(
557 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
558 channel_route,
559 channel_config,
560 )
561 }
562 }
563 })
564 .collect();
565 quote! {
566 let #binding_ident = {
567 let bridge_cfg = config
568 .bridges
569 .get(#config_index)
570 .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
571 let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
572 <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
573 >] = &[#(#tx_configs),*];
574 let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
575 <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
576 >] = &[#(#rx_configs),*];
577 <#bridge_type as cu29::cubridge::CuBridge>::new(
578 bridge_cfg.config.as_ref(),
579 tx_channels,
580 rx_channels,
581 )?
582 };
583 }
584 })
585 .collect();
586
587 let bridges_instanciator = if culist_bridge_specs.is_empty() {
588 quote! {
589 pub fn bridges_instanciator(_config: &CuConfig) -> CuResult<CuBridges> {
590 Ok(())
591 }
592 }
593 } else {
594 let bridge_bindings = bridge_binding_idents.clone();
595 quote! {
596 pub fn bridges_instanciator(config: &CuConfig) -> CuResult<CuBridges> {
597 #(#bridge_init_statements)*
598 Ok((#(#bridge_bindings),*,))
599 }
600 }
601 };
602
603 let all_sim_tasks_types: Vec<Type> = task_specs.ids
604 .iter()
605 .zip(&task_specs.cutypes)
606 .zip(&task_specs.sim_task_types)
607 .zip(&task_specs.background_flags)
608 .zip(&task_specs.run_in_sim_flags)
609 .map(|((((task_id, task_type), sim_type), background), run_in_sim)| {
610 match task_type {
611 CuTaskType::Source => {
612 if *background {
613 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
614 }
615 if *run_in_sim {
616 sim_type.clone()
617 } else {
618 let msg_type = graph
619 .get_node_output_msg_type(task_id.as_str())
620 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
621 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
622 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
623 }
624 }
625 CuTaskType::Regular => {
626 sim_type.clone()
629 },
630 CuTaskType::Sink => {
631 if *background {
632 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
633 }
634
635 if *run_in_sim {
636 sim_type.clone()
638 }
639 else {
640 let msg_types = graph
642 .get_node_input_msg_types(task_id.as_str())
643 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
644 let msg_type = if msg_types.len() == 1 {
645 format!("({},)", msg_types[0])
646 } else {
647 format!("({})", msg_types.join(", "))
648 };
649 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
650 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
651 }
652 }
653 }
654 })
655 .collect();
656
657 #[cfg(feature = "macro_debug")]
658 eprintln!("[build task tuples]");
659
660 let task_types = &task_specs.task_types;
661 let task_types_tuple: TypeTuple = if task_types.is_empty() {
664 parse_quote! { () }
665 } else {
666 parse_quote! { (#(#task_types),*,) }
667 };
668
669 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
670 parse_quote! { () }
671 } else {
672 parse_quote! { (#(#all_sim_tasks_types),*,) }
673 };
674
675 #[cfg(feature = "macro_debug")]
676 eprintln!("[gen instances]");
677 let task_sim_instances_init_code = all_sim_tasks_types.iter().enumerate().map(|(index, ty)| {
679 let additional_error_info = format!(
680 "Failed to get create instance for {}, instance index {}.",
681 task_specs.type_names[index], index
682 );
683
684 quote! {
685 <#ty>::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
686 }
687 }).collect::<Vec<_>>();
688
689 let task_instances_init_code = task_specs.instantiation_types.iter().zip(&task_specs.background_flags).enumerate().map(|(index, (task_type, background))| {
690 let additional_error_info = format!(
691 "Failed to get create instance for {}, instance index {}.",
692 task_specs.type_names[index], index
693 );
694 if *background {
695 quote! {
696 #task_type::new(all_instances_configs[#index], threadpool.clone()).map_err(|e| e.add_cause(#additional_error_info))?
697 }
698 } else {
699 quote! {
700 #task_type::new(all_instances_configs[#index]).map_err(|e| e.add_cause(#additional_error_info))?
701 }
702 }
703 }).collect::<Vec<_>>();
704
705 let (
708 task_restore_code,
709 task_start_calls,
710 task_stop_calls,
711 task_preprocess_calls,
712 task_postprocess_calls,
713 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
714 (0..task_specs.task_types.len())
715 .map(|index| {
716 let task_index = int2sliceindex(index as u32);
717 let task_tuple_index = syn::Index::from(index);
718 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
719 let enum_name = Ident::new(&task_enum_name, Span::call_site());
720 (
721 quote! {
723 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::new_with_cause("Failed to thaw", e))?
724 },
725 { let monitoring_action = quote! {
727 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
728 match decision {
729 Decision::Abort => {
730 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
731 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
732 return Ok(());
733
734 }
735 Decision::Ignore => {
736 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
737 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
738 }
739 Decision::Shutdown => {
740 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
741 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
742 return Err(CuError::new_with_cause("Task errored out during start.", error));
743 }
744 }
745 };
746
747 let call_sim_callback = if sim_mode {
748 quote! {
749 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
751
752 let doit = if let SimOverride::Errored(reason) = ovr {
753 let error: CuError = reason.into();
754 #monitoring_action
755 false
756 }
757 else {
758 ovr == SimOverride::ExecuteByRuntime
759 };
760 }
761 } else {
762 quote! {
763 let doit = true; }
765 };
766
767
768 quote! {
769 #call_sim_callback
770 if doit {
771 let task = &mut self.copper_runtime.tasks.#task_index;
772 if let Err(error) = task.start(&self.copper_runtime.clock) {
773 #monitoring_action
774 }
775 }
776 }
777 },
778 { let monitoring_action = quote! {
780 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
781 match decision {
782 Decision::Abort => {
783 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
784 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
785 return Ok(());
786
787 }
788 Decision::Ignore => {
789 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
790 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
791 }
792 Decision::Shutdown => {
793 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
794 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
795 return Err(CuError::new_with_cause("Task errored out during stop.", error));
796 }
797 }
798 };
799 let call_sim_callback = if sim_mode {
800 quote! {
801 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
803
804 let doit = if let SimOverride::Errored(reason) = ovr {
805 let error: CuError = reason.into();
806 #monitoring_action
807 false
808 }
809 else {
810 ovr == SimOverride::ExecuteByRuntime
811 };
812 }
813 } else {
814 quote! {
815 let doit = true; }
817 };
818 quote! {
819 #call_sim_callback
820 if doit {
821 let task = &mut self.copper_runtime.tasks.#task_index;
822 if let Err(error) = task.stop(&self.copper_runtime.clock) {
823 #monitoring_action
824 }
825 }
826 }
827 },
828 { let monitoring_action = quote! {
830 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
831 match decision {
832 Decision::Abort => {
833 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
834 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
835 return Ok(());
836
837 }
838 Decision::Ignore => {
839 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
840 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
841 }
842 Decision::Shutdown => {
843 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
844 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
845 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
846 }
847 }
848 };
849 let call_sim_callback = if sim_mode {
850 quote! {
851 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
853
854 let doit = if let SimOverride::Errored(reason) = ovr {
855 let error: CuError = reason.into();
856 #monitoring_action
857 false
858 } else {
859 ovr == SimOverride::ExecuteByRuntime
860 };
861 }
862 } else {
863 quote! {
864 let doit = true; }
866 };
867 quote! {
868 #call_sim_callback
869 if doit {
870 if let Err(error) = tasks.#task_index.preprocess(clock) {
871 #monitoring_action
872 }
873 }
874 }
875 },
876 { let monitoring_action = quote! {
878 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
879 match decision {
880 Decision::Abort => {
881 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
882 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
883 return Ok(());
884
885 }
886 Decision::Ignore => {
887 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
888 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
889 }
890 Decision::Shutdown => {
891 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
892 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
893 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
894 }
895 }
896 };
897 let call_sim_callback = if sim_mode {
898 quote! {
899 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
901
902 let doit = if let SimOverride::Errored(reason) = ovr {
903 let error: CuError = reason.into();
904 #monitoring_action
905 false
906 } else {
907 ovr == SimOverride::ExecuteByRuntime
908 };
909 }
910 } else {
911 quote! {
912 let doit = true; }
914 };
915 quote! {
916 #call_sim_callback
917 if doit {
918 if let Err(error) = tasks.#task_index.postprocess(clock) {
919 #monitoring_action
920 }
921 }
922 }
923 }
924 )
925 })
926 );
927
928 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
929 .iter()
930 .map(|spec| {
931 let bridge_index = int2sliceindex(spec.tuple_index as u32);
932 let monitor_index = syn::Index::from(
933 spec.monitor_index
934 .expect("Bridge missing monitor index for start"),
935 );
936 quote! {
937 {
938 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
939 if let Err(error) = bridge.start(&self.copper_runtime.clock) {
940 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
941 match decision {
942 Decision::Abort => {
943 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
944 return Ok(());
945 }
946 Decision::Ignore => {
947 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
948 }
949 Decision::Shutdown => {
950 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
951 return Err(CuError::new_with_cause("Task errored out during start.", error));
952 }
953 }
954 }
955 }
956 }
957 })
958 .collect();
959
960 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
961 .iter()
962 .map(|spec| {
963 let bridge_index = int2sliceindex(spec.tuple_index as u32);
964 let monitor_index = syn::Index::from(
965 spec.monitor_index
966 .expect("Bridge missing monitor index for stop"),
967 );
968 quote! {
969 {
970 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
971 if let Err(error) = bridge.stop(&self.copper_runtime.clock) {
972 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
973 match decision {
974 Decision::Abort => {
975 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
976 return Ok(());
977 }
978 Decision::Ignore => {
979 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
980 }
981 Decision::Shutdown => {
982 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
983 return Err(CuError::new_with_cause("Task errored out during stop.", error));
984 }
985 }
986 }
987 }
988 }
989 })
990 .collect();
991
992 let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
993 .iter()
994 .map(|spec| {
995 let bridge_index = int2sliceindex(spec.tuple_index as u32);
996 let monitor_index = syn::Index::from(
997 spec.monitor_index
998 .expect("Bridge missing monitor index for preprocess"),
999 );
1000 quote! {
1001 {
1002 let bridge = &mut bridges.#bridge_index;
1003 if let Err(error) = bridge.preprocess(clock) {
1004 let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
1005 match decision {
1006 Decision::Abort => {
1007 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1008 return Ok(());
1009 }
1010 Decision::Ignore => {
1011 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1012 }
1013 Decision::Shutdown => {
1014 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1015 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1016 }
1017 }
1018 }
1019 }
1020 }
1021 })
1022 .collect();
1023
1024 let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1025 .iter()
1026 .map(|spec| {
1027 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1028 let monitor_index = syn::Index::from(
1029 spec.monitor_index
1030 .expect("Bridge missing monitor index for postprocess"),
1031 );
1032 quote! {
1033 {
1034 let bridge = &mut bridges.#bridge_index;
1035 if let Err(error) = bridge.postprocess(clock) {
1036 let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
1037 match decision {
1038 Decision::Abort => {
1039 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1040 return Ok(());
1041 }
1042 Decision::Ignore => {
1043 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1044 }
1045 Decision::Shutdown => {
1046 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1047 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1048 }
1049 }
1050 }
1051 }
1052 }
1053 })
1054 .collect();
1055
1056 let mut start_calls = bridge_start_calls;
1057 start_calls.extend(task_start_calls);
1058 let mut stop_calls = task_stop_calls;
1059 stop_calls.extend(bridge_stop_calls);
1060 let mut preprocess_calls = bridge_preprocess_calls;
1061 preprocess_calls.extend(task_preprocess_calls);
1062 let mut postprocess_calls = task_postprocess_calls;
1063 postprocess_calls.extend(bridge_postprocess_calls);
1064
1065 let runtime_plan_code_and_logging: Vec<(
1066 proc_macro2::TokenStream,
1067 proc_macro2::TokenStream,
1068 )> = culist_plan
1069 .steps
1070 .iter()
1071 .map(|unit| match unit {
1072 CuExecutionUnit::Step(step) => {
1073 #[cfg(feature = "macro_debug")]
1074 eprintln!(
1075 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
1076 step.node.get_id(),
1077 step.node.get_type(),
1078 step.task_type,
1079 step.node_id,
1080 step.input_msg_indices_types,
1081 step.output_msg_index_type
1082 );
1083
1084 match &culist_exec_entities[step.node_id as usize].kind {
1085 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
1086 step,
1087 *task_index,
1088 &task_specs,
1089 sim_mode,
1090 &mission_mod,
1091 ),
1092 ExecutionEntityKind::BridgeRx {
1093 bridge_index,
1094 channel_index,
1095 } => {
1096 let spec = &culist_bridge_specs[*bridge_index];
1097 generate_bridge_rx_execution_tokens(spec, *channel_index, &mission_mod)
1098 }
1099 ExecutionEntityKind::BridgeTx {
1100 bridge_index,
1101 channel_index,
1102 } => {
1103 let spec = &culist_bridge_specs[*bridge_index];
1104 generate_bridge_tx_execution_tokens(
1105 step,
1106 spec,
1107 *channel_index,
1108 &mission_mod,
1109 )
1110 }
1111 }
1112 }
1113 CuExecutionUnit::Loop(_) => {
1114 panic!("Execution loops are not supported in runtime generation");
1115 }
1116 })
1117 .collect();
1118
1119 let sim_support = if sim_mode {
1120 Some(gen_sim_support(&culist_plan, &culist_exec_entities))
1121 } else {
1122 None
1123 };
1124
1125 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
1126 (
1127 quote! {
1128 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
1129 },
1130 quote! {
1131 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1132 },
1133 quote! {
1134 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1135 },
1136 quote! {
1137 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1138 },
1139 quote! {
1140 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
1141 },
1142 )
1143 } else {
1144 (
1145 if std {
1146 quote! {
1147 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
1148 }
1149 } else {
1150 quote! {
1151 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
1153 }
1154 },
1155 quote! {
1156 fn run_one_iteration(&mut self) -> CuResult<()>
1157 },
1158 quote! {
1159 fn start_all_tasks(&mut self) -> CuResult<()>
1160 },
1161 quote! {
1162 fn stop_all_tasks(&mut self) -> CuResult<()>
1163 },
1164 quote! {
1165 fn run(&mut self) -> CuResult<()>
1166 },
1167 )
1168 };
1169
1170 let sim_callback_arg = if sim_mode {
1171 Some(quote!(sim_callback))
1172 } else {
1173 None
1174 };
1175
1176 let app_trait = if sim_mode {
1177 quote!(CuSimApplication)
1178 } else {
1179 quote!(CuApplication)
1180 };
1181
1182 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
1183 let enum_name = config_id_to_enum(id);
1184 let enum_ident = Ident::new(&enum_name, Span::call_site());
1185 quote! {
1186 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
1188 }
1189 });
1190
1191 let sim_callback_on_new = if sim_mode {
1192 Some(quote! {
1193 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
1194 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1195 .get_all_nodes()
1196 .iter()
1197 .map(|(_, node)| node.get_instance_config())
1198 .collect();
1199 #(#sim_callback_on_new_calls)*
1200 })
1201 } else {
1202 None
1203 };
1204
1205 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
1206 itertools::multiunzip(runtime_plan_code_and_logging);
1207
1208 let config_load_stmt = if std {
1209 quote! {
1210 let config = if let Some(overridden_config) = config_override {
1211 debug!("CuConfig: Overridden programmatically: {}", overridden_config.serialize_ron());
1212 overridden_config
1213 } else if ::std::path::Path::new(config_filename).exists() {
1214 debug!("CuConfig: Reading configuration from file: {}", config_filename);
1215 cu29::config::read_configuration(config_filename)?
1216 } else {
1217 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1218 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1219 cu29::config::read_configuration_str(original_config, None)?
1220 };
1221 }
1222 } else {
1223 quote! {
1224 let original_config = <Self as #app_trait<S, L>>::get_original_config();
1226 debug!("CuConfig: Using the original configuration the project was compiled with: {}", &original_config);
1227 let config = cu29::config::read_configuration_str(original_config, None)?;
1228 }
1229 };
1230
1231 let kill_handler = if std {
1232 Some(quote! {
1233 ctrlc::set_handler(move || {
1234 STOP_FLAG.store(true, Ordering::SeqCst);
1235 }).expect("Error setting Ctrl-C handler");
1236 })
1237 } else {
1238 None
1239 };
1240
1241 let run_loop = if std {
1242 quote! {
1243 loop {
1244 let iter_start = self.copper_runtime.clock.now();
1245 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1246
1247 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1248 let period: CuDuration = (1_000_000_000u64 / rate).into();
1249 let elapsed = self.copper_runtime.clock.now() - iter_start;
1250 if elapsed < period {
1251 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
1252 }
1253 }
1254
1255 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1256 break result;
1257 }
1258 }
1259 }
1260 } else {
1261 quote! {
1262 loop {
1263 let iter_start = self.copper_runtime.clock.now();
1264 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
1265 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
1266 let period: CuDuration = (1_000_000_000u64 / rate).into();
1267 let elapsed = self.copper_runtime.clock.now() - iter_start;
1268 if elapsed < period {
1269 busy_wait_for(period - elapsed);
1270 }
1271 }
1272
1273 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
1274 break result;
1275 }
1276 }
1277 }
1278 };
1279
1280 #[cfg(feature = "macro_debug")]
1281 eprintln!("[build the run methods]");
1282 let run_methods = quote! {
1283
1284 #run_one_iteration {
1285
1286 let runtime = &mut self.copper_runtime;
1288 let clock = &runtime.clock;
1289 let monitor = &mut runtime.monitor;
1290 let tasks = &mut runtime.tasks;
1291 let bridges = &mut runtime.bridges;
1292 let cl_manager = &mut runtime.copperlists_manager;
1293 let kf_manager = &mut runtime.keyframes_manager;
1294
1295 #(#preprocess_calls)*
1297
1298 let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
1300 kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
1302 culist.msgs.init_zeroed();
1303 {
1304 let msgs = &mut culist.msgs.0;
1305 #(#runtime_plan_code)*
1306 } monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
1308
1309 #(#preprocess_logging_calls)*
1311
1312 cl_manager.end_of_processing(clid)?;
1313 kf_manager.end_of_processing(clid)?;
1314
1315 #(#postprocess_calls)*
1317 Ok(())
1318 }
1319
1320 fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
1321 let runtime = &mut self.copper_runtime;
1322 let clock = &runtime.clock;
1323 let tasks = &mut runtime.tasks;
1324 let config = cu29::bincode::config::standard();
1325 let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
1326 let mut decoder = DecoderImpl::new(reader, config, ());
1327 #(#task_restore_code);*;
1328 Ok(())
1329 }
1330
1331 #start_all_tasks {
1332 #(#start_calls)*
1333 self.copper_runtime.monitor.start(&self.copper_runtime.clock)?;
1334 Ok(())
1335 }
1336
1337 #stop_all_tasks {
1338 #(#stop_calls)*
1339 self.copper_runtime.monitor.stop(&self.copper_runtime.clock)?;
1340 Ok(())
1341 }
1342
1343 #run {
1344 static STOP_FLAG: AtomicBool = AtomicBool::new(false);
1345
1346 #kill_handler
1347
1348 <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
1349 let result = #run_loop;
1350
1351 if result.is_err() {
1352 error!("A task errored out: {}", &result);
1353 }
1354 <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
1355 result
1356 }
1357 };
1358
1359 let tasks_type = if sim_mode {
1360 quote!(CuSimTasks)
1361 } else {
1362 quote!(CuTasks)
1363 };
1364
1365 let tasks_instanciator_fn = if sim_mode {
1366 quote!(tasks_instanciator_sim)
1367 } else {
1368 quote!(tasks_instanciator)
1369 };
1370
1371 let app_impl_decl = if sim_mode {
1372 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
1373 } else {
1374 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
1375 };
1376
1377 let simstep_type_decl = if sim_mode {
1378 quote!(
1379 type Step<'z> = SimStep<'z>;
1380 )
1381 } else {
1382 quote!()
1383 };
1384
1385 #[cfg(feature = "std")]
1386 #[cfg(feature = "macro_debug")]
1387 eprintln!("[build result]");
1388 let application_impl = quote! {
1389 #app_impl_decl {
1390 #simstep_type_decl
1391
1392 #new {
1393 let config_filename = #config_file;
1394
1395 #config_load_stmt
1396
1397 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
1400 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
1402 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
1404 }
1405 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
1406 unified_logger.clone(),
1407 UnifiedLogType::CopperList,
1408 default_section_size,
1409 )?;
1413
1414 let keyframes_stream = stream_write::<KeyFrame, S>(
1415 unified_logger.clone(),
1416 UnifiedLogType::FrozenTasks,
1417 1024 * 1024 * 10, )?;
1419
1420
1421 let application = Ok(#application_name {
1422 copper_runtime: CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new(
1423 clock,
1424 &config,
1425 Some(#mission),
1426 #mission_mod::#tasks_instanciator_fn,
1427 #mission_mod::monitor_instanciator,
1428 #mission_mod::bridges_instanciator,
1429 copperlist_stream,
1430 keyframes_stream)?, });
1432
1433 #sim_callback_on_new
1434
1435 application
1436 }
1437
1438 fn get_original_config() -> String {
1439 #copper_config_content.to_string()
1440 }
1441
1442 #run_methods
1443 }
1444 };
1445
1446 let (
1447 builder_struct,
1448 builder_new,
1449 builder_impl,
1450 builder_sim_callback_method,
1451 builder_build_sim_callback_arg,
1452 ) = if sim_mode {
1453 (
1454 quote! {
1455 #[allow(dead_code)]
1456 pub struct #builder_name <'a, F> {
1457 clock: Option<RobotClock>,
1458 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1459 config_override: Option<CuConfig>,
1460 sim_callback: Option<&'a mut F>
1461 }
1462 },
1463 quote! {
1464 #[allow(dead_code)]
1465 pub fn new() -> Self {
1466 Self {
1467 clock: None,
1468 unified_logger: None,
1469 config_override: None,
1470 sim_callback: None,
1471 }
1472 }
1473 },
1474 quote! {
1475 impl<'a, F> #builder_name <'a, F>
1476 where
1477 F: FnMut(SimStep) -> SimOverride,
1478 },
1479 Some(quote! {
1480 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
1481 {
1482 self.sim_callback = Some(sim_callback);
1483 self
1484 }
1485 }),
1486 Some(quote! {
1487 self.sim_callback
1488 .ok_or(CuError::from("Sim callback missing from builder"))?,
1489 }),
1490 )
1491 } else {
1492 (
1493 quote! {
1494 #[allow(dead_code)]
1495 pub struct #builder_name {
1496 clock: Option<RobotClock>,
1497 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
1498 config_override: Option<CuConfig>,
1499 }
1500 },
1501 quote! {
1502 #[allow(dead_code)]
1503 pub fn new() -> Self {
1504 Self {
1505 clock: None,
1506 unified_logger: None,
1507 config_override: None,
1508 }
1509 }
1510 },
1511 quote! {
1512 impl #builder_name
1513 },
1514 None,
1515 None,
1516 )
1517 };
1518
1519 let std_application_impl = if sim_mode {
1521 Some(quote! {
1523 impl #application_name {
1524 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1525 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
1526 }
1527 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1528 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
1529 }
1530 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1531 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
1532 }
1533 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
1534 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
1535 }
1536 }
1537 })
1538 } else if std {
1539 Some(quote! {
1541 impl #application_name {
1542 pub fn start_all_tasks(&mut self) -> CuResult<()> {
1543 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
1544 }
1545 pub fn run_one_iteration(&mut self) -> CuResult<()> {
1546 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
1547 }
1548 pub fn run(&mut self) -> CuResult<()> {
1549 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
1550 }
1551 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
1552 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
1553 }
1554 }
1555 })
1556 } else {
1557 None };
1559
1560 let application_builder = if std {
1561 Some(quote! {
1562 #builder_struct
1563
1564 #builder_impl
1565 {
1566 #builder_new
1567
1568 #[allow(dead_code)]
1569 pub fn with_clock(mut self, clock: RobotClock) -> Self {
1570 self.clock = Some(clock);
1571 self
1572 }
1573
1574 #[allow(dead_code)]
1575 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
1576 self.unified_logger = Some(unified_logger);
1577 self
1578 }
1579
1580 #[allow(dead_code)]
1581 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
1582 self.clock = Some(copper_ctx.clock.clone());
1583 self.unified_logger = Some(copper_ctx.unified_logger.clone());
1584 self
1585 }
1586
1587 #[allow(dead_code)]
1588 pub fn with_config(mut self, config_override: CuConfig) -> Self {
1589 self.config_override = Some(config_override);
1590 self
1591 }
1592
1593 #builder_sim_callback_method
1594
1595 #[allow(dead_code)]
1596 pub fn build(self) -> CuResult<#application_name> {
1597 #application_name::new(
1598 self.clock
1599 .ok_or(CuError::from("Clock missing from builder"))?,
1600 self.unified_logger
1601 .ok_or(CuError::from("Unified logger missing from builder"))?,
1602 self.config_override,
1603 #builder_build_sim_callback_arg
1604 )
1605 }
1606 }
1607 })
1608 } else {
1609 None
1611 };
1612
1613 let sim_imports = if sim_mode {
1614 Some(quote! {
1615 use cu29::simulation::SimOverride;
1616 use cu29::simulation::CuTaskCallbackState;
1617 use cu29::simulation::CuSimSrcTask;
1618 use cu29::simulation::CuSimSinkTask;
1619 use cu29::prelude::app::CuSimApplication;
1620 })
1621 } else {
1622 None
1623 };
1624
1625 let sim_tasks = if sim_mode {
1626 Some(quote! {
1627 pub type CuSimTasks = #task_types_tuple_sim;
1630 })
1631 } else {
1632 None
1633 };
1634
1635 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
1636 quote! { Ok(()) }
1637 } else {
1638 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
1639 };
1640
1641 let sim_tasks_instanciator = if sim_mode {
1642 Some(quote! {
1643 pub fn tasks_instanciator_sim(all_instances_configs: Vec<Option<&ComponentConfig>>, _threadpool: Arc<ThreadPool>) -> CuResult<CuSimTasks> {
1644 #sim_inst_body
1645 }})
1646 } else {
1647 None
1648 };
1649
1650 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
1651 quote! {
1652 let _ = threadpool;
1653 Ok(())
1654 }
1655 } else {
1656 quote! { Ok(( #(#task_instances_init_code),*, )) }
1657 };
1658
1659 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
1660 quote! { Ok(()) }
1661 } else {
1662 quote! { Ok(( #(#task_instances_init_code),*, )) }
1663 };
1664
1665 let tasks_instanciator = if std {
1666 quote! {
1667 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>, threadpool: Arc<ThreadPool>) -> CuResult<CuTasks> {
1668 #tasks_inst_body_std
1669 }
1670 }
1671 } else {
1672 quote! {
1674 pub fn tasks_instanciator<'c>(all_instances_configs: Vec<Option<&'c ComponentConfig>>) -> CuResult<CuTasks> {
1675 #tasks_inst_body_nostd
1676 }
1677 }
1678 };
1679
1680 let imports = if std {
1681 quote! {
1682 use cu29::rayon::ThreadPool;
1683 use cu29::cuasynctask::CuAsyncTask;
1684 use cu29::curuntime::CopperContext;
1685 use cu29::prelude::UnifiedLoggerWrite;
1686 use cu29::prelude::memmap::MmapSectionStorage;
1687 use std::fmt::{Debug, Formatter};
1688 use std::fmt::Result as FmtResult;
1689 use std::mem::size_of;
1690 use std::sync::Arc;
1691 use std::sync::atomic::{AtomicBool, Ordering};
1692 use std::sync::Mutex;
1693 }
1694 } else {
1695 quote! {
1696 use alloc::sync::Arc;
1697 use alloc::string::String;
1698 use alloc::string::ToString;
1699 use core::sync::atomic::{AtomicBool, Ordering};
1700 use core::fmt::{Debug, Formatter};
1701 use core::fmt::Result as FmtResult;
1702 use core::mem::size_of;
1703 use spin::Mutex;
1704 }
1705 };
1706
1707 let mission_mod_tokens = quote! {
1709 mod #mission_mod {
1710 use super::*; use cu29::bincode::Encode;
1713 use cu29::bincode::enc::Encoder;
1714 use cu29::bincode::error::EncodeError;
1715 use cu29::bincode::Decode;
1716 use cu29::bincode::de::Decoder;
1717 use cu29::bincode::de::DecoderImpl;
1718 use cu29::bincode::error::DecodeError;
1719 use cu29::clock::RobotClock;
1720 use cu29::config::CuConfig;
1721 use cu29::config::ComponentConfig;
1722 use cu29::curuntime::CuRuntime;
1723 use cu29::curuntime::KeyFrame;
1724 use cu29::CuResult;
1725 use cu29::CuError;
1726 use cu29::cutask::CuSrcTask;
1727 use cu29::cutask::CuSinkTask;
1728 use cu29::cutask::CuTask;
1729 use cu29::cutask::CuMsg;
1730 use cu29::cutask::CuMsgMetadata;
1731 use cu29::copperlist::CopperList;
1732 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
1734 use cu29::monitoring::Decision;
1735 use cu29::prelude::app::CuApplication;
1736 use cu29::prelude::debug;
1737 use cu29::prelude::stream_write;
1738 use cu29::prelude::UnifiedLogType;
1739 use cu29::prelude::UnifiedLogWrite;
1740
1741 #imports
1742
1743 #sim_imports
1744
1745 #[allow(unused_imports)]
1747 use cu29::monitoring::NoMonitor;
1748
1749 pub type CuTasks = #task_types_tuple;
1753 pub type CuBridges = #bridges_type_tokens;
1754
1755 #sim_tasks
1756 #sim_support
1757 #sim_tasks_instanciator
1758
1759 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
1760
1761 #culist_support
1762 #tasks_instanciator
1763 #bridges_instanciator
1764
1765 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
1766 #monitor_type::new(config, #mission_mod::TASKS_IDS).expect("Failed to create the given monitor.")
1767 }
1768
1769 pub #application_struct
1771
1772 #application_impl
1773
1774 #std_application_impl
1775
1776 #application_builder
1777 }
1778
1779 };
1780 all_missions_tokens.push(mission_mod_tokens);
1781 }
1782
1783 let default_application_tokens = if all_missions.contains_key("default") {
1784 let default_builder = if std {
1785 Some(quote! {
1786 #[allow(unused_imports)]
1788 use default::#builder_name;
1789 })
1790 } else {
1791 None
1792 };
1793 quote! {
1794 #default_builder
1795
1796 #[allow(unused_imports)]
1797 use default::#application_name;
1798 }
1799 } else {
1800 quote!() };
1802
1803 let result: proc_macro2::TokenStream = quote! {
1804 #(#all_missions_tokens)*
1805 #default_application_tokens
1806 };
1807
1808 #[cfg(feature = "macro_debug")]
1810 {
1811 let formatted_code = rustfmt_generated_code(result.to_string());
1812 eprintln!("\n === Gen. Runtime ===\n");
1813 eprintln!("{formatted_code}");
1814 eprintln!("\n === === === === === ===\n");
1817 }
1818 result.into()
1819}
1820
1821fn read_config(config_file: &str) -> CuResult<CuConfig> {
1822 let filename = config_full_path(config_file);
1823
1824 read_configuration(filename.as_str())
1825}
1826
1827fn config_full_path(config_file: &str) -> String {
1828 let mut config_full_path = utils::caller_crate_root();
1829 config_full_path.push(config_file);
1830 let filename = config_full_path
1831 .as_os_str()
1832 .to_str()
1833 .expect("Could not interpret the config file name");
1834 filename.to_string()
1835}
1836
1837fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
1838 let result = graph
1839 .get_all_nodes()
1840 .iter()
1841 .map(|(_, node)| {
1842 let id = node.get_id();
1843 let type_str = graph.get_node_output_msg_type(id.as_str());
1844 let result = type_str.map(|type_str| {
1845 let result = parse_str::<Type>(type_str.as_str())
1846 .expect("Could not parse output message type.");
1847 result
1848 });
1849 result
1850 })
1851 .collect();
1852 result
1853}
1854
1855struct CuTaskSpecSet {
1856 pub ids: Vec<String>,
1857 pub cutypes: Vec<CuTaskType>,
1858 pub background_flags: Vec<bool>,
1859 pub logging_enabled: Vec<bool>,
1860 pub type_names: Vec<String>,
1861 pub task_types: Vec<Type>,
1862 pub instantiation_types: Vec<Type>,
1863 pub sim_task_types: Vec<Type>,
1864 pub run_in_sim_flags: Vec<bool>,
1865 #[allow(dead_code)]
1866 pub output_types: Vec<Option<Type>>,
1867 pub node_id_to_task_index: Vec<Option<usize>>,
1868}
1869
1870impl CuTaskSpecSet {
1871 pub fn from_graph(graph: &CuGraph) -> Self {
1872 let all_id_nodes: Vec<(NodeId, &Node)> = graph
1873 .get_all_nodes()
1874 .into_iter()
1875 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
1876 .collect();
1877
1878 let ids = all_id_nodes
1879 .iter()
1880 .map(|(_, node)| node.get_id().to_string())
1881 .collect();
1882
1883 let cutypes = all_id_nodes
1884 .iter()
1885 .map(|(id, _)| find_task_type_for_id(graph, *id))
1886 .collect();
1887
1888 let background_flags: Vec<bool> = all_id_nodes
1889 .iter()
1890 .map(|(_, node)| node.is_background())
1891 .collect();
1892
1893 let logging_enabled: Vec<bool> = all_id_nodes
1894 .iter()
1895 .map(|(_, node)| node.is_logging_enabled())
1896 .collect();
1897
1898 let type_names: Vec<String> = all_id_nodes
1899 .iter()
1900 .map(|(_, node)| node.get_type().to_string())
1901 .collect();
1902
1903 let output_types = extract_tasks_output_types(graph);
1904
1905 let task_types = type_names
1906 .iter()
1907 .zip(background_flags.iter())
1908 .zip(output_types.iter())
1909 .map(|((name, &background), output_type)| {
1910 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1911 panic!("Could not transform {name} into a Task Rust type: {error}");
1912 });
1913 if background {
1914 if let Some(output_type) = output_type {
1915 parse_quote!(CuAsyncTask<#name_type, #output_type>)
1916 } else {
1917 panic!("{name}: If a task is background, it has to have an output");
1918 }
1919 } else {
1920 name_type
1921 }
1922 })
1923 .collect();
1924
1925 let instantiation_types = type_names
1926 .iter()
1927 .zip(background_flags.iter())
1928 .zip(output_types.iter())
1929 .map(|((name, &background), output_type)| {
1930 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
1931 panic!("Could not transform {name} into a Task Rust type: {error}");
1932 });
1933 if background {
1934 if let Some(output_type) = output_type {
1935 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
1936 } else {
1937 panic!("{name}: If a task is background, it has to have an output");
1938 }
1939 } else {
1940 name_type
1941 }
1942 })
1943 .collect();
1944
1945 let sim_task_types = type_names
1946 .iter()
1947 .map(|name| {
1948 parse_str::<Type>(name).unwrap_or_else(|err| {
1949 eprintln!("Could not transform {name} into a Task Rust type.");
1950 panic!("{err}")
1951 })
1952 })
1953 .collect();
1954
1955 let run_in_sim_flags = all_id_nodes
1956 .iter()
1957 .map(|(_, node)| node.is_run_in_sim())
1958 .collect();
1959
1960 let mut node_id_to_task_index = vec![None; graph.node_count()];
1961 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
1962 node_id_to_task_index[*node_id as usize] = Some(index);
1963 }
1964
1965 Self {
1966 ids,
1967 cutypes,
1968 background_flags,
1969 logging_enabled,
1970 type_names,
1971 task_types,
1972 instantiation_types,
1973 sim_task_types,
1974 run_in_sim_flags,
1975 output_types,
1976 node_id_to_task_index,
1977 }
1978 }
1979}
1980
1981fn extract_msg_types(runtime_plan: &CuExecutionLoop) -> Vec<Type> {
1982 runtime_plan
1983 .steps
1984 .iter()
1985 .filter_map(|unit| match unit {
1986 CuExecutionUnit::Step(step) => {
1987 if let Some((_, output_msg_type)) = &step.output_msg_index_type {
1988 Some(
1989 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
1990 panic!(
1991 "Could not transform {output_msg_type} into a message Rust type."
1992 )
1993 }),
1994 )
1995 } else {
1996 None
1997 }
1998 }
1999 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
2000 })
2001 .collect()
2002}
2003
2004fn build_culist_tuple(all_msgs_types_in_culist_order: &[Type]) -> TypeTuple {
2006 if all_msgs_types_in_culist_order.is_empty() {
2007 parse_quote! { () }
2008 } else {
2009 parse_quote! {
2010 ( #( CuMsg<#all_msgs_types_in_culist_order> ),* )
2011 }
2012 }
2013}
2014
2015fn build_culist_tuple_encode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2017 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2018
2019 let encode_fields: Vec<_> = indices
2021 .iter()
2022 .map(|i| {
2023 let idx = syn::Index::from(*i);
2024 quote! { self.0.#idx.encode(encoder)?; }
2025 })
2026 .collect();
2027
2028 parse_quote! {
2029 impl Encode for CuStampedDataSet {
2030 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
2031 #(#encode_fields)*
2032 Ok(())
2033 }
2034 }
2035 }
2036}
2037
2038fn build_culist_tuple_decode(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2040 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2041
2042 let decode_fields: Vec<_> = indices
2044 .iter()
2045 .map(|i| {
2046 let t = &all_msgs_types_in_culist_order[*i];
2047 quote! { CuMsg::<#t>::decode(decoder)? }
2048 })
2049 .collect();
2050
2051 parse_quote! {
2052 impl Decode<()> for CuStampedDataSet {
2053 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
2054 Ok(CuStampedDataSet ((
2055 #(#decode_fields),*
2056 )))
2057 }
2058 }
2059 }
2060}
2061
2062fn build_culist_erasedcumsgs(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2063 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2064 let casted_fields: Vec<_> = indices
2065 .iter()
2066 .map(|i| {
2067 let idx = syn::Index::from(*i);
2068 quote! { &self.0.#idx as &dyn ErasedCuStampedData }
2069 })
2070 .collect();
2071 parse_quote! {
2072 impl ErasedCuStampedDataSet for CuStampedDataSet {
2073 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
2074 vec![
2075 #(#casted_fields),*
2076 ]
2077 }
2078 }
2079 }
2080}
2081
2082fn build_culist_tuple_debug(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2083 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2084
2085 let debug_fields: Vec<_> = indices
2086 .iter()
2087 .map(|i| {
2088 let idx = syn::Index::from(*i);
2089 quote! { .field(&self.0.#idx) }
2090 })
2091 .collect();
2092
2093 parse_quote! {
2094 impl Debug for CuStampedDataSet {
2095 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
2096 f.debug_tuple("CuStampedDataSet")
2097 #(#debug_fields)*
2098 .finish()
2099 }
2100 }
2101 }
2102}
2103
2104fn build_culist_tuple_serialize(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2106 let indices: Vec<usize> = (0..all_msgs_types_in_culist_order.len()).collect();
2107 let tuple_len = all_msgs_types_in_culist_order.len();
2108
2109 let serialize_fields: Vec<_> = indices
2111 .iter()
2112 .map(|i| {
2113 let idx = syn::Index::from(*i);
2114 quote! { &self.0.#idx }
2115 })
2116 .collect();
2117
2118 parse_quote! {
2119 impl Serialize for CuStampedDataSet {
2120 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2121 where
2122 S: serde::Serializer,
2123 {
2124 use serde::ser::SerializeTuple;
2125 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
2126 #(tuple.serialize_element(#serialize_fields)?;)*
2127 tuple.end()
2128 }
2129 }
2130 }
2131}
2132
2133fn build_culist_tuple_default(all_msgs_types_in_culist_order: &[Type]) -> ItemImpl {
2135 let default_fields: Vec<_> = all_msgs_types_in_culist_order
2137 .iter()
2138 .map(|msg_type| quote! { CuStampedData::<#msg_type, CuMsgMetadata>::default() })
2139 .collect();
2140
2141 parse_quote! {
2142 impl Default for CuStampedDataSet {
2143 fn default() -> CuStampedDataSet
2144 {
2145 CuStampedDataSet((
2146 #(#default_fields),*
2147 ))
2148 }
2149 }
2150 }
2151}
2152
2153fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
2154 let mut usage = HashMap::new();
2155 for edge_idx in graph.0.edge_indices() {
2156 let cnx = graph
2157 .0
2158 .edge_weight(edge_idx)
2159 .expect("Edge should exist while collecting bridge usage")
2160 .clone();
2161 if let Some(channel) = &cnx.src_channel {
2162 let key = BridgeChannelKey {
2163 bridge_id: cnx.src.clone(),
2164 channel_id: channel.clone(),
2165 direction: BridgeChannelDirection::Rx,
2166 };
2167 usage
2168 .entry(key)
2169 .and_modify(|msg| {
2170 if msg != &cnx.msg {
2171 panic!(
2172 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2173 cnx.src, channel, msg, cnx.msg
2174 );
2175 }
2176 })
2177 .or_insert(cnx.msg.clone());
2178 }
2179 if let Some(channel) = &cnx.dst_channel {
2180 let key = BridgeChannelKey {
2181 bridge_id: cnx.dst.clone(),
2182 channel_id: channel.clone(),
2183 direction: BridgeChannelDirection::Tx,
2184 };
2185 usage
2186 .entry(key)
2187 .and_modify(|msg| {
2188 if msg != &cnx.msg {
2189 panic!(
2190 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
2191 cnx.dst, channel, msg, cnx.msg
2192 );
2193 }
2194 })
2195 .or_insert(cnx.msg.clone());
2196 }
2197 }
2198 usage
2199}
2200
2201fn build_bridge_specs(
2202 config: &CuConfig,
2203 graph: &CuGraph,
2204 channel_usage: &HashMap<BridgeChannelKey, String>,
2205) -> Vec<BridgeSpec> {
2206 let mut specs = Vec::new();
2207 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
2208 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
2209 continue;
2210 }
2211
2212 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
2213 panic!(
2214 "Could not parse bridge type '{}' for '{}': {err}",
2215 bridge_cfg.type_, bridge_cfg.id
2216 )
2217 });
2218
2219 let mut rx_channels = Vec::new();
2220 let mut tx_channels = Vec::new();
2221
2222 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
2223 match channel {
2224 BridgeChannelConfigRepresentation::Rx { id, .. } => {
2225 let key = BridgeChannelKey {
2226 bridge_id: bridge_cfg.id.clone(),
2227 channel_id: id.clone(),
2228 direction: BridgeChannelDirection::Rx,
2229 };
2230 if let Some(msg_type) = channel_usage.get(&key) {
2231 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2232 panic!(
2233 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2234 bridge_cfg.id, id
2235 )
2236 });
2237 let const_ident =
2238 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2239 rx_channels.push(BridgeChannelSpec {
2240 id: id.clone(),
2241 const_ident,
2242 msg_type,
2243 config_index: channel_index,
2244 plan_node_id: None,
2245 culist_index: None,
2246 monitor_index: None,
2247 });
2248 }
2249 }
2250 BridgeChannelConfigRepresentation::Tx { id, .. } => {
2251 let key = BridgeChannelKey {
2252 bridge_id: bridge_cfg.id.clone(),
2253 channel_id: id.clone(),
2254 direction: BridgeChannelDirection::Tx,
2255 };
2256 if let Some(msg_type) = channel_usage.get(&key) {
2257 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
2258 panic!(
2259 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
2260 bridge_cfg.id, id
2261 )
2262 });
2263 let const_ident =
2264 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
2265 tx_channels.push(BridgeChannelSpec {
2266 id: id.clone(),
2267 const_ident,
2268 msg_type,
2269 config_index: channel_index,
2270 plan_node_id: None,
2271 culist_index: None,
2272 monitor_index: None,
2273 });
2274 }
2275 }
2276 }
2277 }
2278
2279 if rx_channels.is_empty() && tx_channels.is_empty() {
2280 continue;
2281 }
2282
2283 specs.push(BridgeSpec {
2284 id: bridge_cfg.id.clone(),
2285 type_path,
2286 config_index: bridge_index,
2287 tuple_index: 0,
2288 monitor_index: None,
2289 rx_channels,
2290 tx_channels,
2291 });
2292 }
2293
2294 for (tuple_index, spec) in specs.iter_mut().enumerate() {
2295 spec.tuple_index = tuple_index;
2296 }
2297
2298 specs
2299}
2300
2301fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
2302 graph
2303 .get_all_nodes()
2304 .iter()
2305 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
2306 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
2307 .collect()
2308}
2309
2310fn build_execution_plan(
2311 graph: &CuGraph,
2312 task_specs: &CuTaskSpecSet,
2313 bridge_specs: &mut [BridgeSpec],
2314) -> CuResult<(
2315 CuExecutionLoop,
2316 Vec<ExecutionEntity>,
2317 HashMap<NodeId, NodeId>,
2318)> {
2319 let mut plan_graph = CuGraph::default();
2320 let mut exec_entities = Vec::new();
2321 let mut original_to_plan = HashMap::new();
2322 let mut plan_to_original = HashMap::new();
2323 let mut name_to_original = HashMap::new();
2324 let mut channel_nodes = HashMap::new();
2325
2326 for (node_id, node) in graph.get_all_nodes() {
2327 name_to_original.insert(node.get_id(), node_id);
2328 if node.get_flavor() != Flavor::Task {
2329 continue;
2330 }
2331 let plan_node_id = plan_graph.add_node(node.clone())?;
2332 let task_index = task_specs.node_id_to_task_index[node_id as usize]
2333 .expect("Task missing from specifications");
2334 plan_to_original.insert(plan_node_id, node_id);
2335 original_to_plan.insert(node_id, plan_node_id);
2336 if plan_node_id as usize != exec_entities.len() {
2337 panic!("Unexpected node ordering while mirroring tasks in plan graph");
2338 }
2339 exec_entities.push(ExecutionEntity {
2340 kind: ExecutionEntityKind::Task { task_index },
2341 });
2342 }
2343
2344 for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
2345 for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
2346 let mut node = Node::new(
2347 format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
2348 "__CuBridgeRxChannel",
2349 );
2350 node.set_flavor(Flavor::Bridge);
2351 let plan_node_id = plan_graph.add_node(node)?;
2352 if plan_node_id as usize != exec_entities.len() {
2353 panic!("Unexpected node ordering while inserting bridge rx channel");
2354 }
2355 channel_spec.plan_node_id = Some(plan_node_id);
2356 exec_entities.push(ExecutionEntity {
2357 kind: ExecutionEntityKind::BridgeRx {
2358 bridge_index,
2359 channel_index,
2360 },
2361 });
2362 channel_nodes.insert(
2363 BridgeChannelKey {
2364 bridge_id: spec.id.clone(),
2365 channel_id: channel_spec.id.clone(),
2366 direction: BridgeChannelDirection::Rx,
2367 },
2368 plan_node_id,
2369 );
2370 }
2371
2372 for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
2373 let mut node = Node::new(
2374 format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
2375 "__CuBridgeTxChannel",
2376 );
2377 node.set_flavor(Flavor::Bridge);
2378 let plan_node_id = plan_graph.add_node(node)?;
2379 if plan_node_id as usize != exec_entities.len() {
2380 panic!("Unexpected node ordering while inserting bridge tx channel");
2381 }
2382 channel_spec.plan_node_id = Some(plan_node_id);
2383 exec_entities.push(ExecutionEntity {
2384 kind: ExecutionEntityKind::BridgeTx {
2385 bridge_index,
2386 channel_index,
2387 },
2388 });
2389 channel_nodes.insert(
2390 BridgeChannelKey {
2391 bridge_id: spec.id.clone(),
2392 channel_id: channel_spec.id.clone(),
2393 direction: BridgeChannelDirection::Tx,
2394 },
2395 plan_node_id,
2396 );
2397 }
2398 }
2399
2400 for edge_idx in graph.0.edge_indices() {
2401 let cnx = graph
2402 .0
2403 .edge_weight(edge_idx)
2404 .expect("Edge should exist while building plan")
2405 .clone();
2406
2407 let src_plan = if let Some(channel) = &cnx.src_channel {
2408 let key = BridgeChannelKey {
2409 bridge_id: cnx.src.clone(),
2410 channel_id: channel.clone(),
2411 direction: BridgeChannelDirection::Rx,
2412 };
2413 *channel_nodes
2414 .get(&key)
2415 .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
2416 } else {
2417 let node_id = name_to_original
2418 .get(&cnx.src)
2419 .copied()
2420 .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
2421 *original_to_plan
2422 .get(&node_id)
2423 .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
2424 };
2425
2426 let dst_plan = if let Some(channel) = &cnx.dst_channel {
2427 let key = BridgeChannelKey {
2428 bridge_id: cnx.dst.clone(),
2429 channel_id: channel.clone(),
2430 direction: BridgeChannelDirection::Tx,
2431 };
2432 *channel_nodes
2433 .get(&key)
2434 .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
2435 } else {
2436 let node_id = name_to_original
2437 .get(&cnx.dst)
2438 .copied()
2439 .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
2440 *original_to_plan
2441 .get(&node_id)
2442 .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
2443 };
2444
2445 plan_graph
2446 .connect_ext(
2447 src_plan,
2448 dst_plan,
2449 &cnx.msg,
2450 cnx.missions.clone(),
2451 None,
2452 None,
2453 )
2454 .map_err(|e| CuError::from(e.to_string()))?;
2455 }
2456
2457 let runtime_plan = compute_runtime_plan(&plan_graph)?;
2458 Ok((runtime_plan, exec_entities, plan_to_original))
2459}
2460
2461fn collect_culist_metadata(
2462 runtime_plan: &CuExecutionLoop,
2463 exec_entities: &[ExecutionEntity],
2464 bridge_specs: &mut [BridgeSpec],
2465 plan_to_original: &HashMap<NodeId, NodeId>,
2466) -> (Vec<usize>, HashMap<NodeId, usize>) {
2467 let mut culist_order = Vec::new();
2468 let mut node_output_positions = HashMap::new();
2469
2470 for unit in &runtime_plan.steps {
2471 if let CuExecutionUnit::Step(step) = unit {
2472 if let Some((output_idx, _)) = &step.output_msg_index_type {
2473 culist_order.push(*output_idx as usize);
2474 match &exec_entities[step.node_id as usize].kind {
2475 ExecutionEntityKind::Task { .. } => {
2476 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
2477 node_output_positions.insert(*original_node_id, *output_idx as usize);
2478 }
2479 }
2480 ExecutionEntityKind::BridgeRx {
2481 bridge_index,
2482 channel_index,
2483 } => {
2484 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
2485 Some(*output_idx as usize);
2486 }
2487 ExecutionEntityKind::BridgeTx { .. } => {}
2488 }
2489 }
2490 }
2491 }
2492
2493 (culist_order, node_output_positions)
2494}
2495
2496#[allow(dead_code)]
2497fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
2498 let mut names = task_ids.to_vec();
2499 for spec in bridge_specs.iter_mut() {
2500 spec.monitor_index = Some(names.len());
2501 names.push(format!("bridge::{}", spec.id));
2502 for channel in spec.rx_channels.iter_mut() {
2503 channel.monitor_index = Some(names.len());
2504 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
2505 }
2506 for channel in spec.tx_channels.iter_mut() {
2507 channel.monitor_index = Some(names.len());
2508 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
2509 }
2510 }
2511 names
2512}
2513
2514fn generate_task_execution_tokens(
2515 step: &CuExecutionStep,
2516 task_index: usize,
2517 task_specs: &CuTaskSpecSet,
2518 sim_mode: bool,
2519 mission_mod: &Ident,
2520) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2521 let node_index = int2sliceindex(task_index as u32);
2522 let task_instance = quote! { tasks.#node_index };
2523 let comment_str = format!(
2524 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
2525 step.node.get_id(),
2526 step.task_type,
2527 step.node_id,
2528 step.input_msg_indices_types,
2529 step.output_msg_index_type
2530 );
2531 let comment_tokens = quote! {{
2532 let _ = stringify!(#comment_str);
2533 }};
2534 let tid = task_index;
2535 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
2536 let enum_name = Ident::new(&task_enum_name, Span::call_site());
2537
2538 match step.task_type {
2539 CuTaskType::Source => {
2540 if let Some((output_index, _)) = &step.output_msg_index_type {
2541 let output_culist_index = int2sliceindex(*output_index);
2542
2543 let monitoring_action = quote! {
2544 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2545 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2546 match decision {
2547 Decision::Abort => {
2548 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2549 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2550 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2551 cl_manager.end_of_processing(clid)?;
2552 return Ok(());
2553 }
2554 Decision::Ignore => {
2555 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2556 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2557 let cumsg_output = &mut msgs.#output_culist_index;
2558 cumsg_output.clear_payload();
2559 }
2560 Decision::Shutdown => {
2561 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2562 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2563 return Err(CuError::new_with_cause("Task errored out during process.", error));
2564 }
2565 }
2566 };
2567
2568 let call_sim_callback = if sim_mode {
2569 quote! {
2570 let doit = {
2571 let cumsg_output = &mut msgs.#output_culist_index;
2572 let state = CuTaskCallbackState::Process((), cumsg_output);
2573 let ovr = sim_callback(SimStep::#enum_name(state));
2574
2575 if let SimOverride::Errored(reason) = ovr {
2576 let error: CuError = reason.into();
2577 #monitoring_action
2578 false
2579 } else {
2580 ovr == SimOverride::ExecuteByRuntime
2581 }
2582 };
2583 }
2584 } else {
2585 quote! { let doit = true; }
2586 };
2587
2588 let logging_tokens = if !task_specs.logging_enabled[tid] {
2589 let output_culist_index = int2sliceindex(*output_index);
2590 quote! {
2591 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2592 cumsg_output.clear_payload();
2593 }
2594 } else {
2595 quote!()
2596 };
2597
2598 (
2599 quote! {
2600 {
2601 #comment_tokens
2602 kf_manager.freeze_task(clid, &#task_instance)?;
2603 #call_sim_callback
2604 let cumsg_output = &mut msgs.#output_culist_index;
2605 cumsg_output.metadata.process_time.start = clock.now().into();
2606 let maybe_error = if doit { #task_instance.process(clock, cumsg_output) } else { Ok(()) };
2607 cumsg_output.metadata.process_time.end = clock.now().into();
2608 if let Err(error) = maybe_error {
2609 #monitoring_action
2610 }
2611 }
2612 },
2613 logging_tokens,
2614 )
2615 } else {
2616 panic!("Source task should have an output message index.");
2617 }
2618 }
2619 CuTaskType::Sink => {
2620 if let Some((output_index, _)) = &step.output_msg_index_type {
2621 let output_culist_index = int2sliceindex(*output_index);
2622 let indices = step
2623 .input_msg_indices_types
2624 .iter()
2625 .map(|(index, _)| int2sliceindex(*index));
2626
2627 let monitoring_action = quote! {
2628 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2629 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2630 match decision {
2631 Decision::Abort => {
2632 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2633 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2634 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2635 cl_manager.end_of_processing(clid)?;
2636 return Ok(());
2637 }
2638 Decision::Ignore => {
2639 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2640 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2641 let cumsg_output = &mut msgs.#output_culist_index;
2642 cumsg_output.clear_payload();
2643 }
2644 Decision::Shutdown => {
2645 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2646 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2647 return Err(CuError::new_with_cause("Task errored out during process.", error));
2648 }
2649 }
2650 };
2651
2652 let inputs_type = if indices.len() == 1 {
2653 quote! { #(msgs.#indices)* }
2654 } else {
2655 quote! { (#(&msgs.#indices),*) }
2656 };
2657
2658 let call_sim_callback = if sim_mode {
2659 quote! {
2660 let doit = {
2661 let cumsg_input = &#inputs_type;
2662 let cumsg_output = &mut msgs.#output_culist_index;
2663 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2664 let ovr = sim_callback(SimStep::#enum_name(state));
2665
2666 if let SimOverride::Errored(reason) = ovr {
2667 let error: CuError = reason.into();
2668 #monitoring_action
2669 false
2670 } else {
2671 ovr == SimOverride::ExecuteByRuntime
2672 }
2673 };
2674 }
2675 } else {
2676 quote! { let doit = true; }
2677 };
2678
2679 (
2680 quote! {
2681 {
2682 #comment_tokens
2683 kf_manager.freeze_task(clid, &#task_instance)?;
2684 #call_sim_callback
2685 let cumsg_input = &#inputs_type;
2686 let cumsg_output = &mut msgs.#output_culist_index;
2687 cumsg_output.metadata.process_time.start = clock.now().into();
2688 let maybe_error = if doit { #task_instance.process(clock, cumsg_input) } else { Ok(()) };
2689 cumsg_output.metadata.process_time.end = clock.now().into();
2690 if let Err(error) = maybe_error {
2691 #monitoring_action
2692 }
2693 }
2694 },
2695 quote! {},
2696 )
2697 } else {
2698 panic!("Sink tasks should have a virtual output message index.");
2699 }
2700 }
2701 CuTaskType::Regular => {
2702 if let Some((output_index, _)) = &step.output_msg_index_type {
2703 let output_culist_index = int2sliceindex(*output_index);
2704 let indices = step
2705 .input_msg_indices_types
2706 .iter()
2707 .map(|(index, _)| int2sliceindex(*index));
2708
2709 let monitoring_action = quote! {
2710 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
2711 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
2712 match decision {
2713 Decision::Abort => {
2714 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
2715 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
2716 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2717 cl_manager.end_of_processing(clid)?;
2718 return Ok(());
2719 }
2720 Decision::Ignore => {
2721 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
2722 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
2723 let cumsg_output = &mut msgs.#output_culist_index;
2724 cumsg_output.clear_payload();
2725 }
2726 Decision::Shutdown => {
2727 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
2728 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
2729 return Err(CuError::new_with_cause("Task errored out during process.", error));
2730 }
2731 }
2732 };
2733
2734 let inputs_type = if indices.len() == 1 {
2735 quote! { #(msgs.#indices)* }
2736 } else {
2737 quote! { (#(&msgs.#indices),*) }
2738 };
2739
2740 let call_sim_callback = if sim_mode {
2741 quote! {
2742 let doit = {
2743 let cumsg_input = &#inputs_type;
2744 let cumsg_output = &mut msgs.#output_culist_index;
2745 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
2746 let ovr = sim_callback(SimStep::#enum_name(state));
2747
2748 if let SimOverride::Errored(reason) = ovr {
2749 let error: CuError = reason.into();
2750 #monitoring_action
2751 false
2752 }
2753 else {
2754 ovr == SimOverride::ExecuteByRuntime
2755 }
2756 };
2757 }
2758 } else {
2759 quote! { let doit = true; }
2760 };
2761
2762 let logging_tokens = if !task_specs.logging_enabled[tid] {
2763 let output_culist_index = int2sliceindex(*output_index);
2764 quote! {
2765 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
2766 cumsg_output.clear_payload();
2767 }
2768 } else {
2769 quote!()
2770 };
2771
2772 (
2773 quote! {
2774 {
2775 #comment_tokens
2776 kf_manager.freeze_task(clid, &#task_instance)?;
2777 #call_sim_callback
2778 let cumsg_input = &#inputs_type;
2779 let cumsg_output = &mut msgs.#output_culist_index;
2780 cumsg_output.metadata.process_time.start = clock.now().into();
2781 let maybe_error = if doit { #task_instance.process(clock, cumsg_input, cumsg_output) } else { Ok(()) };
2782 cumsg_output.metadata.process_time.end = clock.now().into();
2783 if let Err(error) = maybe_error {
2784 #monitoring_action
2785 }
2786 }
2787 },
2788 logging_tokens,
2789 )
2790 } else {
2791 panic!("Regular task should have an output message index.");
2792 }
2793 }
2794 }
2795}
2796
2797fn generate_bridge_rx_execution_tokens(
2798 bridge_spec: &BridgeSpec,
2799 channel_index: usize,
2800 mission_mod: &Ident,
2801) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2802 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2803 let channel = &bridge_spec.rx_channels[channel_index];
2804 let culist_index = channel
2805 .culist_index
2806 .unwrap_or_else(|| panic!("Bridge Rx channel missing output index"));
2807 let culist_index_ts = int2sliceindex(culist_index as u32);
2808 let monitor_index = syn::Index::from(
2809 channel
2810 .monitor_index
2811 .expect("Bridge Rx channel missing monitor index"),
2812 );
2813 let bridge_type = &bridge_spec.type_path;
2814 let const_ident = &channel.const_ident;
2815 (
2816 quote! {
2817 {
2818 let bridge = &mut bridges.#bridge_tuple_index;
2819 let cumsg_output = &mut msgs.#culist_index_ts;
2820 cumsg_output.metadata.process_time.start = clock.now().into();
2821 let maybe_error = bridge.receive(
2822 clock,
2823 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
2824 cumsg_output,
2825 );
2826 cumsg_output.metadata.process_time.end = clock.now().into();
2827 if let Err(error) = maybe_error {
2828 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2829 match decision {
2830 Decision::Abort => {
2831 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2832 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2833 cl_manager.end_of_processing(clid)?;
2834 return Ok(());
2835 }
2836 Decision::Ignore => {
2837 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2838 let cumsg_output = &mut msgs.#culist_index_ts;
2839 cumsg_output.clear_payload();
2840 }
2841 Decision::Shutdown => {
2842 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2843 return Err(CuError::new_with_cause("Task errored out during process.", error));
2844 }
2845 }
2846 }
2847 }
2848 },
2849 quote! {},
2850 )
2851}
2852
2853fn generate_bridge_tx_execution_tokens(
2854 step: &CuExecutionStep,
2855 bridge_spec: &BridgeSpec,
2856 channel_index: usize,
2857 mission_mod: &Ident,
2858) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
2859 let channel = &bridge_spec.tx_channels[channel_index];
2860 let monitor_index = syn::Index::from(
2861 channel
2862 .monitor_index
2863 .expect("Bridge Tx channel missing monitor index"),
2864 );
2865 let input_index = step
2866 .input_msg_indices_types
2867 .first()
2868 .map(|(idx, _)| int2sliceindex(*idx))
2869 .expect("Bridge Tx channel should have exactly one input");
2870 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
2871 let bridge_type = &bridge_spec.type_path;
2872 let const_ident = &channel.const_ident;
2873 (
2874 quote! {
2875 {
2876 let bridge = &mut bridges.#bridge_tuple_index;
2877 let cumsg_input = &mut msgs.#input_index;
2878 cumsg_input.metadata.process_time.start = clock.now().into();
2880 if let Err(error) = bridge.send(
2881 clock,
2882 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
2883 &*cumsg_input,
2884 ) {
2885 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
2886 match decision {
2887 Decision::Abort => {
2888 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
2889 monitor.process_copperlist(&#mission_mod::collect_metadata(&culist))?;
2890 cl_manager.end_of_processing(clid)?;
2891 return Ok(());
2892 }
2893 Decision::Ignore => {
2894 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
2895 }
2896 Decision::Shutdown => {
2897 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
2898 return Err(CuError::new_with_cause("Task errored out during process.", error));
2899 }
2900 }
2901 }
2902 cumsg_input.metadata.process_time.end = clock.now().into();
2903 }
2904 },
2905 quote! {},
2906 )
2907}
2908
#[cfg(test)]
mod tests {
    /// Runs the `trybuild` compile-fail cases matching `tests/compile_fail/*/*.rs`.
    #[test]
    fn test_compile_fail() {
        let compile_fail_cases = trybuild::TestCases::new();
        compile_fail_cases.compile_fail("tests/compile_fail/*/*.rs");
    }
}
/// Direction of a bridge channel.
///
/// `collect_bridge_channel_usage` records a channel as `Rx` when a connection
/// names it as its source channel and as `Tx` when a connection names it as
/// its destination channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// The bridge channel is the source of a connection.
    Rx,
    /// The bridge channel is the destination of a connection.
    Tx,
}
2923
/// Hash-map key identifying one channel of one bridge in one direction.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BridgeChannelKey {
    /// Id of the bridge owning the channel.
    bridge_id: String,
    /// Id of the channel within that bridge.
    channel_id: String,
    /// Whether the key designates the Rx or the Tx side.
    direction: BridgeChannelDirection,
}
2930
/// Code-generation description of one used bridge channel.
#[derive(Clone)]
struct BridgeChannelSpec {
    /// Channel id as written in the bridge configuration.
    id: String,
    /// Identifier built from `config_id_to_bridge_const(id)`; the generated
    /// code uses it as an associated item of the bridge's `Rx` / `Tx` type.
    const_ident: Ident,
    /// Message type carried by the channel, parsed from the connection.
    // NOTE(review): stored but apparently never read, hence the allow below. Confirm.
    #[allow(dead_code)]
    msg_type: Type,
    /// Position of the channel in the bridge's configured channel list.
    config_index: usize,
    /// Node id of the channel in the plan graph; set by `build_execution_plan`.
    plan_node_id: Option<NodeId>,
    /// Output index in the copper list; set by `collect_culist_metadata` for
    /// Rx channels.
    culist_index: Option<usize>,
    /// Position of the channel in the monitored-names list; set by
    /// `build_monitored_ids`.
    monitor_index: Option<usize>,
}
2942
/// Code-generation description of one bridge with at least one used channel.
#[derive(Clone)]
struct BridgeSpec {
    /// Bridge id as written in the configuration.
    id: String,
    /// Rust type of the bridge, parsed from the configured type string.
    type_path: Type,
    /// Position of the bridge in the configuration's bridge list.
    config_index: usize,
    /// Position of the bridge among the retained specs; the generated code
    /// uses it to index the `bridges` tuple.
    tuple_index: usize,
    /// Position of the bridge in the monitored-names list; set by
    /// `build_monitored_ids`.
    monitor_index: Option<usize>,
    /// Used channels whose direction is Rx.
    rx_channels: Vec<BridgeChannelSpec>,
    /// Used channels whose direction is Tx.
    tx_channels: Vec<BridgeChannelSpec>,
}
2953
/// What one node of the plan graph stands for.
///
/// `build_execution_plan` pushes one entity per plan node, so the entity list
/// is indexed by plan node id.
#[derive(Clone)]
struct ExecutionEntity {
    /// Whether the node is a task or one side of a bridge channel.
    kind: ExecutionEntityKind,
}
2958
/// The kinds of entities a plan-graph node can represent.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A task of the mission graph.
    Task {
        /// Index of the task in the task specification set.
        task_index: usize,
    },
    /// The receiving side of a bridge channel.
    BridgeRx {
        /// Index of the bridge in the bridge spec slice.
        bridge_index: usize,
        /// Index of the channel in that bridge's `rx_channels`.
        channel_index: usize,
    },
    /// The sending side of a bridge channel.
    BridgeTx {
        /// Index of the bridge in the bridge spec slice.
        bridge_index: usize,
        /// Index of the channel in that bridge's `tx_channels`.
        channel_index: usize,
    },
}