1use proc_macro::TokenStream;
2use quote::{format_ident, quote};
3use std::collections::{BTreeMap, HashMap};
4use std::fs::read_to_string;
5use std::path::Path;
6use std::process::Command;
7use syn::Fields::{Named, Unnamed};
8use syn::meta::parser;
9use syn::{
10 Field, Fields, ItemImpl, ItemStruct, LitStr, Type, TypeTuple, parse_macro_input, parse_quote,
11 parse_str,
12};
13
14use crate::utils::{config_id_to_bridge_const, config_id_to_enum, config_id_to_struct_member};
15use cu29_runtime::config::CuConfig;
16use cu29_runtime::config::{
17 BridgeChannelConfigRepresentation, ConfigGraphs, CuGraph, Flavor, Node, NodeId,
18 ResourceBundleConfig, read_configuration,
19};
20use cu29_runtime::curuntime::{
21 CuExecutionLoop, CuExecutionStep, CuExecutionUnit, CuTaskType, compute_runtime_plan,
22 find_task_type_for_id,
23};
24use cu29_traits::{CuError, CuResult};
25use proc_macro2::{Ident, Span};
26
27mod bundle_resources;
28mod resources;
29mod utils;
30
31const DEFAULT_CLNB: usize = 2; #[inline]
34fn int2sliceindex(i: u32) -> syn::Index {
35 syn::Index::from(i as usize)
36}
37
38#[inline(always)]
39fn return_error(msg: String) -> TokenStream {
40 syn::Error::new(Span::call_site(), msg)
41 .to_compile_error()
42 .into()
43}
44
45fn rtsan_guard_tokens() -> proc_macro2::TokenStream {
46 if cfg!(feature = "rtsan") {
47 quote! {
48 let _rt_guard = ::cu29::rtsan::ScopedSanitizeRealtime::default();
49 }
50 } else {
51 quote! {}
52 }
53}
54
/// Runs `git -C <repo_root> <args...>` and returns its standard output with
/// surrounding whitespace removed.
///
/// Returns `None` when the process cannot be spawned, exits with a non-zero
/// status, or prints bytes that are not valid UTF-8.
fn git_output_trimmed(repo_root: &Path, args: &[&str]) -> Option<String> {
    let mut git = Command::new("git");
    git.arg("-C").arg(repo_root).args(args);

    // A spawn failure and an unsuccessful exit are both reported as `None`.
    let finished = git.output().ok().filter(|out| out.status.success())?;

    String::from_utf8(finished.stdout)
        .ok()
        .map(|text| text.trim().to_string())
}
68
69fn detect_git_info(repo_root: &Path) -> (Option<String>, Option<bool>) {
70 let in_repo = git_output_trimmed(repo_root, &["rev-parse", "--is-inside-work-tree"])
71 .is_some_and(|value| value == "true");
72 if !in_repo {
73 return (None, None);
74 }
75
76 let commit = git_output_trimmed(repo_root, &["rev-parse", "HEAD"]).filter(|s| !s.is_empty());
77 let dirty = git_output_trimmed(repo_root, &["status", "--porcelain"]).map(|s| !s.is_empty());
79 (commit, dirty)
80}
81
/// Function-like proc-macro entry point: hands the input tokens to
/// `resources::resources` and returns whatever that function expands to.
#[proc_macro]
pub fn resources(input: TokenStream) -> TokenStream {
    resources::resources(input)
}
86
/// Function-like proc-macro entry point: hands the input tokens to
/// `bundle_resources::bundle_resources` and returns whatever that function
/// expands to.
#[proc_macro]
pub fn bundle_resources(input: TokenStream) -> TokenStream {
    bundle_resources::bundle_resources(input)
}
91
92#[proc_macro]
96pub fn gen_cumsgs(config_path_lit: TokenStream) -> TokenStream {
97 #[cfg(feature = "std")]
98 let std = true;
99
100 #[cfg(not(feature = "std"))]
101 let std = false;
102
103 let config = parse_macro_input!(config_path_lit as LitStr).value();
104 if !std::path::Path::new(&config_full_path(&config)).exists() {
105 return return_error(format!(
106 "The configuration file `{config}` does not exist. Please provide a valid path."
107 ));
108 }
109 #[cfg(feature = "macro_debug")]
110 eprintln!("[gen culist support with {config:?}]");
111 let cuconfig = match read_config(&config) {
112 Ok(cuconfig) => cuconfig,
113 Err(e) => return return_error(e.to_string()),
114 };
115
116 let extra_imports = if !std {
117 quote! {
118 use core::fmt::Debug;
119 use core::fmt::Formatter;
120 use core::fmt::Result as FmtResult;
121 use alloc::vec;
122 use alloc::vec::Vec;
123 }
124 } else {
125 quote! {
126 use std::fmt::Debug;
127 use std::fmt::Formatter;
128 use std::fmt::Result as FmtResult;
129 }
130 };
131
132 let common_imports = quote! {
133 use cu29::bincode::Encode;
134 use cu29::bincode::enc::Encoder;
135 use cu29::bincode::error::EncodeError;
136 use cu29::bincode::Decode;
137 use cu29::bincode::de::Decoder;
138 use cu29::bincode::error::DecodeError;
139 use cu29::copperlist::CopperList;
140 use cu29::prelude::ErasedCuStampedData;
141 use cu29::prelude::ErasedCuStampedDataSet;
142 use cu29::prelude::MatchingTasks;
143 use cu29::prelude::Serialize;
144 use cu29::prelude::CuMsg;
145 use cu29::prelude::CuMsgMetadata;
146 use cu29::prelude::CuListZeroedInit;
147 use cu29::prelude::CuCompactString;
148 #extra_imports
149 };
150
151 let with_uses = match &cuconfig.graphs {
152 ConfigGraphs::Simple(graph) => {
153 let support = match build_gen_cumsgs_support(&cuconfig, graph, None) {
154 Ok(support) => support,
155 Err(e) => return return_error(e.to_string()),
156 };
157
158 quote! {
159 mod cumsgs {
160 #common_imports
161 #support
162 }
163 use cumsgs::CuStampedDataSet;
164 type CuMsgs=CuStampedDataSet;
165 }
166 }
167 ConfigGraphs::Missions(graphs) => {
168 let mut missions: Vec<_> = graphs.iter().collect();
169 missions.sort_by(|a, b| a.0.cmp(b.0));
170
171 let mut mission_modules = Vec::<proc_macro2::TokenStream>::new();
172 for (mission, graph) in missions {
173 let mission_mod = match parse_str::<Ident>(mission.as_str()) {
174 Ok(id) => id,
175 Err(_) => {
176 return return_error(format!(
177 "Mission '{mission}' is not a valid Rust identifier for gen_cumsgs output."
178 ));
179 }
180 };
181
182 let support = match build_gen_cumsgs_support(&cuconfig, graph, Some(mission)) {
183 Ok(support) => support,
184 Err(e) => return return_error(e.to_string()),
185 };
186
187 mission_modules.push(quote! {
188 pub mod #mission_mod {
189 #common_imports
190 #support
191 }
192 });
193 }
194
195 let default_exports = if graphs.contains_key("default") {
196 quote! {
197 use cumsgs::default::CuStampedDataSet;
198 type CuMsgs=CuStampedDataSet;
199 }
200 } else {
201 quote! {}
202 };
203
204 quote! {
205 mod cumsgs {
206 #(#mission_modules)*
207 }
208 #default_exports
209 }
210 }
211 };
212 with_uses.into()
213}
214
215fn build_gen_cumsgs_support(
216 cuconfig: &CuConfig,
217 graph: &CuGraph,
218 mission_label: Option<&str>,
219) -> CuResult<proc_macro2::TokenStream> {
220 let task_specs = CuTaskSpecSet::from_graph(graph);
221 let channel_usage = collect_bridge_channel_usage(graph);
222 let mut bridge_specs = build_bridge_specs(cuconfig, graph, &channel_usage);
223 let (culist_plan, exec_entities, plan_to_original) =
224 build_execution_plan(graph, &task_specs, &mut bridge_specs).map_err(|e| {
225 if let Some(mission) = mission_label {
226 CuError::from(format!(
227 "Could not compute copperlist plan for mission '{mission}': {e}"
228 ))
229 } else {
230 CuError::from(format!("Could not compute copperlist plan: {e}"))
231 }
232 })?;
233 let task_member_names = collect_task_member_names(graph);
234 let (culist_order, node_output_positions) = collect_culist_metadata(
235 &culist_plan,
236 &exec_entities,
237 &mut bridge_specs,
238 &plan_to_original,
239 );
240
241 #[cfg(feature = "macro_debug")]
242 if let Some(mission) = mission_label {
243 eprintln!(
244 "[The CuStampedDataSet matching tasks ids for mission '{mission}' are {:?}]",
245 culist_order
246 );
247 } else {
248 eprintln!(
249 "[The CuStampedDataSet matching tasks ids are {:?}]",
250 culist_order
251 );
252 }
253
254 Ok(gen_culist_support(
255 &culist_plan,
256 &culist_order,
257 &node_output_positions,
258 &task_member_names,
259 &bridge_specs,
260 ))
261}
262
/// Generates the copper-list support code for one execution plan.
///
/// The emitted tokens contain, in this order: the free functions
/// `collect_metadata` and `compute_payload_bytes`, the `CuStampedDataSet` tuple
/// struct, the `CuList` alias, an inherent `impl` with one accessor per task
/// output / bridge rx channel plus `get_tuple`/`get_tuple_mut`, the
/// `CuPayloadRawBytes` impl, the `LogvizDataSet` impl (only when this crate is
/// built with the `logviz` feature), the `MatchingTasks` impl, the tokens
/// returned by the encode/decode/debug/serialize/default/erased helpers, and
/// the `CuListZeroedInit` impl.
///
/// # Parameters
///
/// * `runtime_plan` - plan from which the output packs are extracted.
/// * `culist_indices_in_plan_order` - slot indices into the extracted output
///   packs; drives the metadata accessors, the zero-init statements and the
///   `compute_payload_bytes` accumulation.
/// * `node_output_positions` - slot index of each task's output, keyed by node id.
/// * `task_member_names` - `(node id, name)` pairs; the name is used in the
///   accessor method names and as the task id literal of `get_all_task_ids`.
/// * `bridge_specs` - bridges whose rx channels carrying a `culist_index` get an
///   accessor.
///
/// # Panics
///
/// Panics when an index has no matching output pack or when a task from
/// `task_member_names` is missing from `node_output_positions`.
fn gen_culist_support(
    runtime_plan: &CuExecutionLoop,
    culist_indices_in_plan_order: &[usize],
    node_output_positions: &HashMap<NodeId, usize>,
    task_member_names: &[(NodeId, String)],
    bridge_specs: &[BridgeSpec],
) -> proc_macro2::TokenStream {
    #[cfg(feature = "macro_debug")]
    eprintln!("[Extract msgs types]");
    // Everything below indexes `output_packs` by copper-list slot position.
    let output_packs = extract_output_packs(runtime_plan);
    let slot_types: Vec<Type> = output_packs.iter().map(|pack| pack.slot_type()).collect();

    let culist_size = output_packs.len();

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist struct]");
    let msgs_types_tuple: TypeTuple = build_culist_tuple(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple bincode support]");
    let msgs_types_tuple_encode = build_culist_tuple_encode(&slot_types);
    let msgs_types_tuple_decode = build_culist_tuple_decode(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple debug support]");
    let msgs_types_tuple_debug = build_culist_tuple_debug(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the copperlist tuple serialize support]");
    let msgs_types_tuple_serialize = build_culist_tuple_serialize(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build the default tuple support]");
    let msgs_types_tuple_default = build_culist_tuple_default(&slot_types);

    #[cfg(feature = "macro_debug")]
    eprintln!("[build erasedcumsgs]");

    let erasedmsg_trait_impl = build_culist_erasedcumsgs(&output_packs);

    // One metadata reference per slot, in plan order. For a multi-output slot
    // only the metadata of the first port (`.0`) is exposed.
    let metadata_accessors: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
        .iter()
        .map(|idx| {
            let slot_index = syn::Index::from(*idx);
            let pack = output_packs
                .get(*idx)
                .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
            if pack.is_multi() {
                quote! { &culist.msgs.0.#slot_index.0.metadata }
            } else {
                quote! { &culist.msgs.0.#slot_index.metadata }
            }
        })
        .collect();
    // Statements for `init_zeroed`: reset `status_txt` and both ends of
    // `process_time` on every message (every port of a multi-output slot).
    let mut zeroed_init_tokens: Vec<proc_macro2::TokenStream> = Vec::new();
    for idx in culist_indices_in_plan_order {
        let slot_index = syn::Index::from(*idx);
        let pack = output_packs
            .get(*idx)
            .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
        if pack.is_multi() {
            for port_idx in 0..pack.msg_types.len() {
                let port_index = syn::Index::from(port_idx);
                zeroed_init_tokens.push(quote! {
                    self.0.#slot_index.#port_index.metadata.status_txt = CuCompactString::default();
                    self.0.#slot_index.#port_index.metadata.process_time.start =
                        cu29::clock::OptionCuTime::none();
                    self.0.#slot_index.#port_index.metadata.process_time.end =
                        cu29::clock::OptionCuTime::none();
                });
            }
        } else {
            zeroed_init_tokens.push(quote! {
                self.0.#slot_index.metadata.status_txt = CuCompactString::default();
                self.0.#slot_index.metadata.process_time.start = cu29::clock::OptionCuTime::none();
                self.0.#slot_index.metadata.process_time.end = cu29::clock::OptionCuTime::none();
            });
        }
    }
    // NOTE(review): the array length is `output_packs.len()` while the elements
    // come from `culist_indices_in_plan_order`; the generated function only
    // compiles when both have the same length — confirm callers guarantee it.
    let collect_metadata_function = quote! {
        pub fn collect_metadata<'a>(culist: &'a CuList) -> [&'a CuMsgMetadata; #culist_size] {
            [#( #metadata_accessors, )*]
        }
    };

    // Total number of messages over all slots and ports; used as the capacity
    // of the vector built by `payload_raw_bytes`.
    let cumsg_count: usize = output_packs.iter().map(|pack| pack.msg_types.len()).sum();

    // Statements for `compute_payload_bytes`: add the raw and handle byte counts
    // of every message that currently holds a payload.
    let payload_bytes_accumulators: Vec<proc_macro2::TokenStream> = culist_indices_in_plan_order
        .iter()
        .map(|idx| {
            let slot_index = syn::Index::from(*idx);
            let pack = output_packs
                .get(*idx)
                .unwrap_or_else(|| panic!("Missing output pack for index {idx}"));
            if pack.is_multi() {
                let iter = (0..pack.msg_types.len()).map(|port_idx| {
                    let port_index = syn::Index::from(port_idx);
                    quote! {
                        if let Some(payload) = culist.msgs.0.#slot_index.#port_index.payload() {
                            raw += cu29::monitoring::CuPayloadSize::raw_bytes(payload);
                            handles += cu29::monitoring::CuPayloadSize::handle_bytes(payload);
                        }
                    }
                });
                quote! { #(#iter)* }
            } else {
                quote! {
                    if let Some(payload) = culist.msgs.0.#slot_index.payload() {
                        raw += cu29::monitoring::CuPayloadSize::raw_bytes(payload);
                        handles += cu29::monitoring::CuPayloadSize::handle_bytes(payload);
                    }
                }
            }
        })
        .collect();

    // Statements for `payload_raw_bytes`: unlike the accumulators above these
    // walk the slots in tuple order and push one entry per message, `None` when
    // the message has no payload.
    let payload_raw_bytes_accumulators: Vec<proc_macro2::TokenStream> = output_packs
        .iter()
        .enumerate()
        .map(|(slot_idx, pack)| {
            let slot_index = syn::Index::from(slot_idx);
            if pack.is_multi() {
                let iter = (0..pack.msg_types.len()).map(|port_idx| {
                    let port_index = syn::Index::from(port_idx);
                    quote! {
                        if let Some(payload) = self.0.#slot_index.#port_index.payload() {
                            bytes.push(Some(
                                cu29::monitoring::CuPayloadSize::raw_bytes(payload) as u64
                            ));
                        } else {
                            bytes.push(None);
                        }
                    }
                });
                quote! { #(#iter)* }
            } else {
                quote! {
                    if let Some(payload) = self.0.#slot_index.payload() {
                        bytes.push(Some(
                            cu29::monitoring::CuPayloadSize::raw_bytes(payload) as u64
                        ));
                    } else {
                        bytes.push(None);
                    }
                }
            }
        })
        .collect();

    let compute_payload_bytes_fn = quote! {
        pub fn compute_payload_bytes(culist: &CuList) -> (u64, u64) {
            let mut raw: usize = 0;
            let mut handles: usize = 0;
            #(#payload_bytes_accumulators)*
            (raw as u64, handles as u64)
        }
    };

    let payload_raw_bytes_impl = quote! {
        impl ::cu29::CuPayloadRawBytes for CuStampedDataSet {
            fn payload_raw_bytes(&self) -> Vec<Option<u64>> {
                let mut bytes: Vec<Option<u64>> = Vec::with_capacity(#cumsg_count);
                #(#payload_raw_bytes_accumulators)*
                bytes
            }
        }
    };

    // Interpolated as string literals into `get_all_task_ids`.
    let task_name_literals: Vec<String> = task_member_names
        .iter()
        .map(|(_, name)| name.clone())
        .collect();

    // Task name owning each slot, when there is one; used for the logviz paths.
    let mut slot_task_names: Vec<Option<String>> = vec![None; output_packs.len()];

    // Accessor methods of `CuStampedDataSet`: task outputs first, bridge rx
    // channels are appended further down.
    let mut methods = Vec::new();
    for (node_id, name) in task_member_names {
        let output_position = node_output_positions
            .get(node_id)
            .unwrap_or_else(|| panic!("Task {name} (id: {node_id}) not found in execution order"));
        let pack = output_packs
            .get(*output_position)
            .unwrap_or_else(|| panic!("Missing output pack for task {name}"));
        let slot_index = syn::Index::from(*output_position);
        // In bounds: `slot_task_names` has `output_packs.len()` entries and the
        // `get` above already succeeded for this position.
        slot_task_names[*output_position] = Some(name.clone());

        if pack.msg_types.len() == 1 {
            // Single output: `get_<name>_output`.
            let fn_name = format_ident!("get_{}_output", name);
            let payload_type = pack.msg_types.first().unwrap();
            methods.push(quote! {
                #[allow(dead_code)]
                pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
                    &self.0.#slot_index
                }
            });
        } else {
            // Otherwise: one `get_<name>_output_<port>` per port plus
            // `get_<name>_outputs` returning the whole slot.
            let outputs_fn = format_ident!("get_{}_outputs", name);
            let slot_type = pack.slot_type();
            for (port_idx, payload_type) in pack.msg_types.iter().enumerate() {
                let fn_name = format_ident!("get_{}_output_{}", name, port_idx);
                let port_index = syn::Index::from(port_idx);
                methods.push(quote! {
                    #[allow(dead_code)]
                    pub fn #fn_name(&self) -> &CuMsg<#payload_type> {
                        &self.0.#slot_index.#port_index
                    }
                });
            }
            methods.push(quote! {
                #[allow(dead_code)]
                pub fn #outputs_fn(&self) -> &#slot_type {
                    &self.0.#slot_index
                }
            });
        }
    }

    // One logging block per message. The entity path is the task name (with a
    // `/<port>` suffix for multi-output slots) or `slot_<index>` when no task
    // name was recorded for the slot. These blocks are built regardless of the
    // `logviz` feature but only emitted when it is enabled (see below).
    let mut logviz_blocks = Vec::new();
    for (slot_idx, pack) in output_packs.iter().enumerate() {
        if pack.msg_types.is_empty() {
            continue;
        }
        let slot_index = syn::Index::from(slot_idx);
        let slot_name = slot_task_names.get(slot_idx).and_then(|name| name.as_ref());

        if pack.is_multi() {
            for (port_idx, _) in pack.msg_types.iter().enumerate() {
                let port_index = syn::Index::from(port_idx);
                let path_expr = if let Some(name) = slot_name {
                    let lit = LitStr::new(name, Span::call_site());
                    quote! { format!("{}/{}", #lit, #port_idx) }
                } else {
                    quote! { format!("slot_{}/{}", #slot_idx, #port_idx) }
                };
                logviz_blocks.push(quote! {
                    {
                        let msg = &self.0.#slot_index.#port_index;
                        if let Some(payload) = msg.payload() {
                            ::cu29_logviz::apply_tov(rec, &msg.tov);
                            let path = #path_expr;
                            ::cu29_logviz::log_payload_auto(rec, &path, payload)?;
                        }
                    }
                });
            }
        } else {
            let path_expr = if let Some(name) = slot_name {
                let lit = LitStr::new(name, Span::call_site());
                quote! { #lit.to_string() }
            } else {
                quote! { format!("slot_{}", #slot_idx) }
            };
            logviz_blocks.push(quote! {
                {
                    let msg = &self.0.#slot_index;
                    if let Some(payload) = msg.payload() {
                        ::cu29_logviz::apply_tov(rec, &msg.tov);
                        let path = #path_expr;
                        ::cu29_logviz::log_payload_auto(rec, &path, payload)?;
                    }
                }
            });
        }
    }

    let logviz_impl = if cfg!(feature = "logviz") {
        quote! {
            impl ::cu29_logviz::LogvizDataSet for CuStampedDataSet {
                fn logviz_emit(
                    &self,
                    rec: &::cu29_logviz::RecordingStream,
                ) -> ::cu29::prelude::CuResult<()> {
                    #(#logviz_blocks)*
                    Ok(())
                }
            }
        }
    } else {
        quote! {}
    };
    // Bridge rx channels that own a copper-list slot get a
    // `get_<bridge>_rx_<channel>` accessor.
    for spec in bridge_specs {
        for channel in &spec.rx_channels {
            if let Some(culist_index) = channel.culist_index {
                let slot_index = syn::Index::from(culist_index);
                let bridge_name = config_id_to_struct_member(spec.id.as_str());
                let channel_name = config_id_to_struct_member(channel.id.as_str());
                let fn_name = format_ident!("get_{}_rx_{}", bridge_name, channel_name);
                let msg_type = &channel.msg_type;

                methods.push(quote! {
                    #[allow(dead_code)]
                    pub fn #fn_name(&self) -> &CuMsg<#msg_type> {
                        &self.0.#slot_index
                    }
                });
            }
        }
    }

    quote! {
        #collect_metadata_function
        #compute_payload_bytes_fn

        pub struct CuStampedDataSet(pub #msgs_types_tuple);

        pub type CuList = CopperList<CuStampedDataSet>;

        impl CuStampedDataSet {
            #(#methods)*

            #[allow(dead_code)]
            fn get_tuple(&self) -> &#msgs_types_tuple {
                &self.0
            }

            #[allow(dead_code)]
            fn get_tuple_mut(&mut self) -> &mut #msgs_types_tuple {
                &mut self.0
            }
        }

        #payload_raw_bytes_impl
        #logviz_impl

        impl MatchingTasks for CuStampedDataSet {
            #[allow(dead_code)]
            fn get_all_task_ids() -> &'static [&'static str] {
                &[#(#task_name_literals),*]
            }
        }

        #msgs_types_tuple_encode
        #msgs_types_tuple_decode

        #msgs_types_tuple_debug

        #msgs_types_tuple_serialize

        #msgs_types_tuple_default

        #erasedmsg_trait_impl

        impl CuListZeroedInit for CuStampedDataSet {
            fn init_zeroed(&mut self) {
                #(#zeroed_init_tokens)*
            }
        }
    }
}
624
625fn gen_sim_support(
626 runtime_plan: &CuExecutionLoop,
627 exec_entities: &[ExecutionEntity],
628 bridge_specs: &[BridgeSpec],
629) -> proc_macro2::TokenStream {
630 #[cfg(feature = "macro_debug")]
631 eprintln!("[Sim: Build SimEnum]");
632 let plan_enum: Vec<proc_macro2::TokenStream> = runtime_plan
633 .steps
634 .iter()
635 .map(|unit| match unit {
636 CuExecutionUnit::Step(step) => match &exec_entities[step.node_id as usize].kind {
637 ExecutionEntityKind::Task { .. } => {
638 let enum_entry_name = config_id_to_enum(step.node.get_id().as_str());
639 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
640 let inputs: Vec<Type> = step
641 .input_msg_indices_types
642 .iter()
643 .map(|input| {
644 parse_str::<Type>(format!("CuMsg<{}>", input.msg_type).as_str()).unwrap()
645 })
646 .collect();
647 let output: Option<Type> = step.output_msg_pack.as_ref().map(|pack| {
648 let msg_types: Vec<Type> = pack
649 .msg_types
650 .iter()
651 .map(|msg_type| {
652 parse_str::<Type>(msg_type.as_str()).unwrap_or_else(|_| {
653 panic!("Could not transform {msg_type} into a message Rust type.")
654 })
655 })
656 .collect();
657 build_output_slot_type(&msg_types)
658 });
659 let no_output = parse_str::<Type>("CuMsg<()>").unwrap();
660 let output = output.as_ref().unwrap_or(&no_output);
661
662 let inputs_type = if inputs.is_empty() {
663 quote! { () }
664 } else if inputs.len() == 1 {
665 let input = inputs.first().unwrap();
666 quote! { &'a #input }
667 } else {
668 quote! { &'a (#(&'a #inputs),*) }
669 };
670
671 quote! {
672 #enum_ident(CuTaskCallbackState<#inputs_type, &'a mut #output>)
673 }
674 }
675 ExecutionEntityKind::BridgeRx { bridge_index, channel_index } => {
676 let bridge_spec = &bridge_specs[*bridge_index];
677 let channel = &bridge_spec.rx_channels[*channel_index];
678 let enum_entry_name = config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id));
679 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
680 let channel_type: Type = parse_str::<Type>(channel.msg_type_name.as_str()).unwrap();
681 let bridge_type = &bridge_spec.type_path;
682 let _const_ident = &channel.const_ident;
683 quote! {
684 #enum_ident {
685 channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
686 msg: &'a mut CuMsg<#channel_type>,
687 }
688 }
689 }
690 ExecutionEntityKind::BridgeTx { bridge_index, channel_index } => {
691 let bridge_spec = &bridge_specs[*bridge_index];
692 let channel = &bridge_spec.tx_channels[*channel_index];
693 let enum_entry_name = config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id));
694 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
695 let channel_type: Type = parse_str::<Type>(channel.msg_type_name.as_str()).unwrap();
696 let bridge_type = &bridge_spec.type_path;
697 let _const_ident = &channel.const_ident;
698 quote! {
699 #enum_ident {
700 channel: &'static cu29::cubridge::BridgeChannel<< <#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet >::Id, #channel_type>,
701 msg: &'a CuMsg<#channel_type>,
702 }
703 }
704 }
705 },
706 CuExecutionUnit::Loop(_) => {
707 todo!("Needs to be implemented")
708 }
709 })
710 .collect();
711
712 let mut variants = plan_enum;
714
715 for bridge_spec in bridge_specs {
717 let enum_entry_name = config_id_to_enum(&format!("{}_bridge", bridge_spec.id));
718 let enum_ident = Ident::new(&enum_entry_name, Span::call_site());
719 variants.push(quote! {
720 #enum_ident(cu29::simulation::CuBridgeLifecycleState)
721 });
722 }
723
724 variants.push(quote! { __Phantom(core::marker::PhantomData<&'a ()>) });
725 quote! {
726 #[allow(dead_code, unused_lifetimes)]
728 pub enum SimStep<'a> {
729 #(#variants),*
730 }
731 }
732}
733
734#[proc_macro_attribute]
738pub fn copper_runtime(args: TokenStream, input: TokenStream) -> TokenStream {
739 #[cfg(feature = "macro_debug")]
740 eprintln!("[entry]");
741 let mut application_struct = parse_macro_input!(input as ItemStruct);
742
743 let application_name = &application_struct.ident;
744 let builder_name = format_ident!("{}Builder", application_name);
745
746 let mut config_file: Option<LitStr> = None;
747 let mut sim_mode = false;
748
749 #[cfg(feature = "std")]
750 let std = true;
751
752 #[cfg(not(feature = "std"))]
753 let std = false;
754
755 let rt_guard = rtsan_guard_tokens();
756
757 let attribute_config_parser = parser(|meta| {
759 if meta.path.is_ident("config") {
760 config_file = Some(meta.value()?.parse()?);
761 Ok(())
762 } else if meta.path.is_ident("sim_mode") {
763 if meta.input.peek(syn::Token![=]) {
765 meta.input.parse::<syn::Token![=]>()?;
766 let value: syn::LitBool = meta.input.parse()?;
767 sim_mode = value.value();
768 Ok(())
769 } else {
770 sim_mode = true;
772 Ok(())
773 }
774 } else {
775 Err(meta.error("unsupported property"))
776 }
777 });
778
779 #[cfg(feature = "macro_debug")]
780 eprintln!("[parse]");
781 parse_macro_input!(args with attribute_config_parser);
783
784 let config_file = match config_file {
795 Some(file) => file.value(),
796 None => {
797 return return_error(
798 "Expected config file attribute like #[CopperRuntime(config = \"path\")]"
799 .to_string(),
800 );
801 }
802 };
803
804 if !std::path::Path::new(&config_full_path(&config_file)).exists() {
805 return return_error(format!(
806 "The configuration file `{config_file}` does not exist. Please provide a valid path."
807 ));
808 }
809
810 let copper_config = match read_config(&config_file) {
811 Ok(cuconfig) => cuconfig,
812 Err(e) => return return_error(e.to_string()),
813 };
814 let copper_config_content = match read_to_string(config_full_path(config_file.as_str())) {
815 Ok(ok) => ok,
816 Err(e) => {
817 return return_error(format!(
818 "Could not read the config file (should not happen because we just succeeded just before). {e}"
819 ));
820 }
821 };
822 let caller_root = utils::caller_crate_root();
823 let (git_commit, git_dirty) = detect_git_info(&caller_root);
824 let git_commit_tokens = if let Some(commit) = git_commit {
825 quote! { Some(#commit.to_string()) }
826 } else {
827 quote! { None }
828 };
829 let git_dirty_tokens = if let Some(dirty) = git_dirty {
830 quote! { Some(#dirty) }
831 } else {
832 quote! { None }
833 };
834
835 #[cfg(feature = "macro_debug")]
836 eprintln!("[build monitor type]");
837 let monitor_configs = copper_config.get_monitor_configs();
838 let (monitor_type, monitor_instanciator_body) = if monitor_configs.is_empty() {
839 (
840 quote! { NoMonitor },
841 quote! {
842 let mut monitor = NoMonitor::new(config, TASKS_IDS)
843 .expect("Failed to create NoMonitor.");
844 let copperlist_info = ::cu29::monitoring::CopperListInfo::new(
845 core::mem::size_of::<CuList>(),
846 #DEFAULT_CLNB,
847 );
848 monitor.set_copperlist_info(copperlist_info);
849 monitor
850 },
851 )
852 } else if monitor_configs.len() == 1 {
853 let only_monitor_type = parse_str::<Type>(monitor_configs[0].get_type())
854 .expect("Could not transform the monitor type name into a Rust type.");
855 (
856 quote! { #only_monitor_type },
857 quote! {
858 let mut monitor = #only_monitor_type::new(config, TASKS_IDS)
859 .expect("Failed to create the given monitor.");
860 let copperlist_info = ::cu29::monitoring::CopperListInfo::new(
861 core::mem::size_of::<CuList>(),
862 #DEFAULT_CLNB,
863 );
864 monitor.set_copperlist_info(copperlist_info);
865 monitor
866 },
867 )
868 } else {
869 let monitor_types: Vec<Type> = monitor_configs
870 .iter()
871 .map(|monitor_config| {
872 parse_str::<Type>(monitor_config.get_type())
873 .expect("Could not transform the monitor type name into a Rust type.")
874 })
875 .collect();
876 let monitor_bindings: Vec<Ident> = (0..monitor_types.len())
877 .map(|idx| format_ident!("__cu_monitor_{idx}"))
878 .collect();
879 let monitor_config_bindings: Vec<Ident> = (0..monitor_types.len())
880 .map(|idx| format_ident!("__cu_monitor_cfg_{idx}"))
881 .collect();
882 let monitor_indices: Vec<syn::Index> =
883 (0..monitor_types.len()).map(syn::Index::from).collect();
884
885 let monitor_builders: Vec<proc_macro2::TokenStream> = monitor_types
886 .iter()
887 .zip(monitor_bindings.iter())
888 .zip(monitor_config_bindings.iter())
889 .zip(monitor_indices.iter())
890 .map(|(((monitor_ty, monitor_binding), config_binding), monitor_idx)| {
891 quote! {
892 let mut #config_binding = config.clone();
893 let __cu_monitor_cfg_entry = config
894 .get_monitor_configs()
895 .get(#monitor_idx)
896 .cloned()
897 .unwrap_or_else(|| panic!("Missing monitor config at index {}", #monitor_idx));
898 #config_binding.monitors = vec![__cu_monitor_cfg_entry];
899 let #monitor_binding = #monitor_ty::new(
900 &#config_binding,
901 TASKS_IDS,
902 )
903 .expect("Failed to create one of the configured monitors.");
904 }
905 })
906 .collect();
907 let tuple_type: TypeTuple = parse_quote! { (#(#monitor_types),*,) };
908 (
909 quote! { #tuple_type },
910 quote! {
911 #(#monitor_builders)*
912 let mut monitor: #tuple_type = (#(#monitor_bindings),*,);
913 let copperlist_info = ::cu29::monitoring::CopperListInfo::new(
914 core::mem::size_of::<CuList>(),
915 #DEFAULT_CLNB,
916 );
917 monitor.set_copperlist_info(copperlist_info);
918 monitor
919 },
920 )
921 };
922
923 #[cfg(feature = "macro_debug")]
925 eprintln!("[build runtime field]");
926 let runtime_field: Field = if sim_mode {
928 parse_quote! {
929 copper_runtime: cu29::curuntime::CuRuntime<CuSimTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
930 }
931 } else {
932 parse_quote! {
933 copper_runtime: cu29::curuntime::CuRuntime<CuTasks, CuBridges, CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>
934 }
935 };
936 let lifecycle_stream_field: Field = parse_quote! {
937 runtime_lifecycle_stream: Option<Box<dyn WriteStream<RuntimeLifecycleRecord>>>
938 };
939
940 #[cfg(feature = "macro_debug")]
941 eprintln!("[match struct anonymity]");
942 match &mut application_struct.fields {
943 Named(fields_named) => {
944 fields_named.named.push(runtime_field);
945 fields_named.named.push(lifecycle_stream_field);
946 }
947 Unnamed(fields_unnamed) => {
948 fields_unnamed.unnamed.push(runtime_field);
949 fields_unnamed.unnamed.push(lifecycle_stream_field);
950 }
951 Fields::Unit => {
952 panic!(
953 "This struct is a unit struct, it should have named or unnamed fields. use struct Something {{}} and not struct Something;"
954 )
955 }
956 };
957
958 let all_missions = copper_config.graphs.get_all_missions_graphs();
959 let mut all_missions_tokens = Vec::<proc_macro2::TokenStream>::new();
960 for (mission, graph) in &all_missions {
961 let git_commit_tokens = git_commit_tokens.clone();
962 let git_dirty_tokens = git_dirty_tokens.clone();
963 let mission_mod = parse_str::<Ident>(mission.as_str())
964 .expect("Could not make an identifier of the mission name");
965
966 #[cfg(feature = "macro_debug")]
967 eprintln!("[extract tasks ids & types]");
968 let task_specs = CuTaskSpecSet::from_graph(graph);
969
970 let culist_channel_usage = collect_bridge_channel_usage(graph);
971 let mut culist_bridge_specs =
972 build_bridge_specs(&copper_config, graph, &culist_channel_usage);
973 let (culist_plan, culist_exec_entities, culist_plan_to_original) =
974 match build_execution_plan(graph, &task_specs, &mut culist_bridge_specs) {
975 Ok(plan) => plan,
976 Err(e) => return return_error(format!("Could not compute copperlist plan: {e}")),
977 };
978 let task_member_names = collect_task_member_names(graph);
979 let (culist_call_order, node_output_positions) = collect_culist_metadata(
980 &culist_plan,
981 &culist_exec_entities,
982 &mut culist_bridge_specs,
983 &culist_plan_to_original,
984 );
985
986 #[cfg(feature = "macro_debug")]
987 {
988 eprintln!("[runtime plan for mission {mission}]");
989 eprintln!("{culist_plan:?}");
990 }
991
992 let culist_support: proc_macro2::TokenStream = gen_culist_support(
993 &culist_plan,
994 &culist_call_order,
995 &node_output_positions,
996 &task_member_names,
997 &culist_bridge_specs,
998 );
999
1000 let bundle_specs = match build_bundle_specs(&copper_config, mission.as_str()) {
1001 Ok(specs) => specs,
1002 Err(e) => return return_error(e.to_string()),
1003 };
1004 let threadpool_bundle_index = if task_specs.background_flags.iter().any(|&flag| flag) {
1005 match bundle_specs
1006 .iter()
1007 .position(|bundle| bundle.id == "threadpool")
1008 {
1009 Some(index) => Some(index),
1010 None => {
1011 return return_error(
1012 "Background tasks require the threadpool bundle to be configured"
1013 .to_string(),
1014 );
1015 }
1016 }
1017 } else {
1018 None
1019 };
1020
1021 let resource_specs =
1022 match collect_resource_specs(graph, &task_specs, &culist_bridge_specs, &bundle_specs) {
1023 Ok(specs) => specs,
1024 Err(e) => return return_error(e.to_string()),
1025 };
1026
1027 let (resources_module, resources_instanciator_fn) =
1028 match build_resources_module(&bundle_specs) {
1029 Ok(tokens) => tokens,
1030 Err(e) => return return_error(e.to_string()),
1031 };
1032 let task_resource_mappings =
1033 match build_task_resource_mappings(&resource_specs, &task_specs) {
1034 Ok(tokens) => tokens,
1035 Err(e) => return return_error(e.to_string()),
1036 };
1037 let bridge_resource_mappings =
1038 build_bridge_resource_mappings(&resource_specs, &culist_bridge_specs);
1039
1040 let ids = build_monitored_ids(&task_specs.ids, &mut culist_bridge_specs);
1041
1042 let task_reflect_read_arms: Vec<proc_macro2::TokenStream> = task_specs
1043 .ids
1044 .iter()
1045 .enumerate()
1046 .map(|(index, task_id)| {
1047 let task_index = syn::Index::from(index);
1048 let task_id_lit = LitStr::new(task_id, Span::call_site());
1049 quote! {
1050 #task_id_lit => Some(&self.copper_runtime.tasks.#task_index as &dyn cu29::reflect::Reflect),
1051 }
1052 })
1053 .collect();
1054
1055 let task_reflect_write_arms: Vec<proc_macro2::TokenStream> = task_specs
1056 .ids
1057 .iter()
1058 .enumerate()
1059 .map(|(index, task_id)| {
1060 let task_index = syn::Index::from(index);
1061 let task_id_lit = LitStr::new(task_id, Span::call_site());
1062 quote! {
1063 #task_id_lit => Some(&mut self.copper_runtime.tasks.#task_index as &mut dyn cu29::reflect::Reflect),
1064 }
1065 })
1066 .collect();
1067
1068 let mut reflect_registry_types: BTreeMap<String, Type> = BTreeMap::new();
1069 let mut add_reflect_type = |ty: Type| {
1070 let key = quote! { #ty }.to_string();
1071 reflect_registry_types.entry(key).or_insert(ty);
1072 };
1073
1074 for task_type in &task_specs.task_types {
1075 add_reflect_type(task_type.clone());
1076 }
1077
1078 for bridge_spec in &culist_bridge_specs {
1079 add_reflect_type(bridge_spec.type_path.clone());
1080 for channel in bridge_spec
1081 .rx_channels
1082 .iter()
1083 .chain(bridge_spec.tx_channels.iter())
1084 {
1085 add_reflect_type(channel.msg_type.clone());
1086 }
1087 }
1088
1089 for output_pack in extract_output_packs(&culist_plan) {
1090 for msg_type in output_pack.msg_types {
1091 add_reflect_type(msg_type);
1092 }
1093 }
1094
1095 let reflect_type_registration_calls: Vec<proc_macro2::TokenStream> = reflect_registry_types
1096 .values()
1097 .map(|ty| {
1098 quote! {
1099 registry.register::<#ty>();
1100 }
1101 })
1102 .collect();
1103
1104 let bridge_types: Vec<Type> = culist_bridge_specs
1105 .iter()
1106 .map(|spec| spec.type_path.clone())
1107 .collect();
1108 let bridges_type_tokens: proc_macro2::TokenStream = if bridge_types.is_empty() {
1109 quote! { () }
1110 } else {
1111 let tuple: TypeTuple = parse_quote! { (#(#bridge_types),*,) };
1112 quote! { #tuple }
1113 };
1114
1115 let bridge_binding_idents: Vec<Ident> = culist_bridge_specs
1116 .iter()
1117 .enumerate()
1118 .map(|(idx, _)| format_ident!("bridge_{idx}"))
1119 .collect();
1120
    // One `let bridge_N = { ... };` statement per bridge. Each generated block:
    //   1. looks up the bridge entry in `config.bridges` (panics if it is absent),
    //   2. binds the bridge's resources through its resource mapping,
    //   3. builds the Tx and Rx channel configuration slices,
    //   4. calls `CuBridge::new` and propagates its error with `?`.
    let bridge_init_statements: Vec<proc_macro2::TokenStream> = culist_bridge_specs
        .iter()
        .enumerate()
        .map(|(idx, spec)| {
            // `bridge_binding_idents` and `bridge_resource_mappings.refs` are indexed by the
            // bridge's position in `culist_bridge_specs`.
            let binding_ident = &bridge_binding_idents[idx];
            let bridge_mapping_ref = bridge_resource_mappings.refs[idx].clone();
            let bridge_type = &spec.type_path;
            let bridge_name = spec.id.clone();
            let config_index = syn::Index::from(spec.config_index);
            let binding_error = LitStr::new(
                &format!("Failed to bind resources for bridge '{}'", bridge_name),
                Span::call_site(),
            );
            // Expression per Tx channel: reads route/config from the bridge configuration at
            // the channel's config index and pairs them with the bridge's static channel
            // descriptor. The generated code panics if the entry is not a Tx channel.
            let tx_configs: Vec<proc_macro2::TokenStream> = spec
                .tx_channels
                .iter()
                .map(|channel| {
                    let const_ident = &channel.const_ident;
                    let channel_name = channel.id.clone();
                    let channel_config_index = syn::Index::from(channel.config_index);
                    quote! {
                        {
                            let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
                                cu29::config::BridgeChannelConfigRepresentation::Tx { route, config, .. } => {
                                    (route.clone(), config.clone())
                                }
                                _ => panic!(
                                    "Bridge '{}' channel '{}' expected to be Tx",
                                    #bridge_name,
                                    #channel_name
                                ),
                            };
                            cu29::cubridge::BridgeChannelConfig::from_static(
                                &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
                                channel_route,
                                channel_config,
                            )
                        }
                    }
                })
                .collect();
            // Same as above for the Rx channels.
            let rx_configs: Vec<proc_macro2::TokenStream> = spec
                .rx_channels
                .iter()
                .map(|channel| {
                    let const_ident = &channel.const_ident;
                    let channel_name = channel.id.clone();
                    let channel_config_index = syn::Index::from(channel.config_index);
                    quote! {
                        {
                            let (channel_route, channel_config) = match &bridge_cfg.channels[#channel_config_index] {
                                cu29::config::BridgeChannelConfigRepresentation::Rx { route, config, .. } => {
                                    (route.clone(), config.clone())
                                }
                                _ => panic!(
                                    "Bridge '{}' channel '{}' expected to be Rx",
                                    #bridge_name,
                                    #channel_name
                                ),
                            };
                            cu29::cubridge::BridgeChannelConfig::from_static(
                                &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
                                channel_route,
                                channel_config,
                            )
                        }
                    }
                })
                .collect();
            // The generated statement expects `config` and `resources` to be in scope; they
            // are the parameters of the generated `bridges_instanciator` function.
            quote! {
                let #binding_ident = {
                    let bridge_cfg = config
                        .bridges
                        .get(#config_index)
                        .unwrap_or_else(|| panic!("Bridge '{}' missing from configuration", #bridge_name));
                    let bridge_mapping = #bridge_mapping_ref;
                    let bridge_resources = <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::from_bindings(
                        resources,
                        bridge_mapping,
                    )
                    .map_err(|e| cu29::CuError::new_with_cause(#binding_error, e))?;
                    let tx_channels: &[cu29::cubridge::BridgeChannelConfig<
                        <<#bridge_type as cu29::cubridge::CuBridge>::Tx as cu29::cubridge::BridgeChannelSet>::Id,
                    >] = &[#(#tx_configs),*];
                    let rx_channels: &[cu29::cubridge::BridgeChannelConfig<
                        <<#bridge_type as cu29::cubridge::CuBridge>::Rx as cu29::cubridge::BridgeChannelSet>::Id,
                    >] = &[#(#rx_configs),*];
                    <#bridge_type as cu29::cubridge::CuBridge>::new(
                        bridge_cfg.config.as_ref(),
                        tx_channels,
                        rx_channels,
                        bridge_resources,
                    )?
                };
            }
        })
        .collect();
1218
1219 let bridges_instanciator = if culist_bridge_specs.is_empty() {
1220 quote! {
1221 pub fn bridges_instanciator(_config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
1222 let _ = resources;
1223 Ok(())
1224 }
1225 }
1226 } else {
1227 let bridge_bindings = bridge_binding_idents.clone();
1228 quote! {
1229 pub fn bridges_instanciator(config: &CuConfig, resources: &mut ResourceManager) -> CuResult<CuBridges> {
1230 #(#bridge_init_statements)*
1231 Ok((#(#bridge_bindings),*,))
1232 }
1233 }
1234 };
1235
1236 let all_sim_tasks_types: Vec<Type> = task_specs
1237 .ids
1238 .iter()
1239 .zip(&task_specs.cutypes)
1240 .zip(&task_specs.sim_task_types)
1241 .zip(&task_specs.background_flags)
1242 .zip(&task_specs.run_in_sim_flags)
1243 .zip(task_specs.output_types.iter())
1244 .map(|(((((task_id, task_type), sim_type), background), run_in_sim), output_type)| {
1245 match task_type {
1246 CuTaskType::Source => {
1247 if *background {
1248 panic!("CuSrcTask {task_id} cannot be a background task, it should be a regular task.");
1249 }
1250 if *run_in_sim {
1251 sim_type.clone()
1252 } else {
1253 let msg_type = graph
1254 .get_node_output_msg_type(task_id.as_str())
1255 .unwrap_or_else(|| panic!("CuSrcTask {task_id} should have an outgoing connection with a valid output msg type"));
1256 let sim_task_name = format!("CuSimSrcTask<{msg_type}>");
1257 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
1258 }
1259 }
1260 CuTaskType::Regular => {
1261 if *background {
1262 if let Some(out_ty) = output_type {
1263 parse_quote!(CuAsyncTask<#sim_type, #out_ty>)
1264 } else {
1265 panic!("{task_id}: If a task is background, it has to have an output");
1266 }
1267 } else {
1268 sim_type.clone()
1270 }
1271 },
1272 CuTaskType::Sink => {
1273 if *background {
1274 panic!("CuSinkTask {task_id} cannot be a background task, it should be a regular task.");
1275 }
1276
1277 if *run_in_sim {
1278 sim_type.clone()
1280 }
1281 else {
1282 let msg_types = graph
1284 .get_node_input_msg_types(task_id.as_str())
1285 .unwrap_or_else(|| panic!("CuSinkTask {task_id} should have an incoming connection with a valid input msg type"));
1286 let msg_type = if msg_types.len() == 1 {
1287 format!("({},)", msg_types[0])
1288 } else {
1289 format!("({})", msg_types.join(", "))
1290 };
1291 let sim_task_name = format!("CuSimSinkTask<{msg_type}>");
1292 parse_str(sim_task_name.as_str()).unwrap_or_else(|_| panic!("Could not build the placeholder for simulation: {sim_task_name}"))
1293 }
1294 }
1295 }
1296 })
1297 .collect();
1298
1299 #[cfg(feature = "macro_debug")]
1300 eprintln!("[build task tuples]");
1301
1302 let task_types = &task_specs.task_types;
1303 let task_types_tuple: TypeTuple = if task_types.is_empty() {
1306 parse_quote! { () }
1307 } else {
1308 parse_quote! { (#(#task_types),*,) }
1309 };
1310
1311 let task_types_tuple_sim: TypeTuple = if all_sim_tasks_types.is_empty() {
1312 parse_quote! { () }
1313 } else {
1314 parse_quote! { (#(#all_sim_tasks_types),*,) }
1315 };
1316
1317 #[cfg(feature = "macro_debug")]
1318 eprintln!("[gen instances]");
1319 let task_sim_instances_init_code = all_sim_tasks_types
1320 .iter()
1321 .enumerate()
1322 .map(|(index, ty)| {
1323 let additional_error_info = format!(
1324 "Failed to get create instance for {}, instance index {}.",
1325 task_specs.type_names[index], index
1326 );
1327 let mapping_ref = task_resource_mappings.refs[index].clone();
1328 let background = task_specs.background_flags[index];
1329 let inner_task_type = &task_specs.sim_task_types[index];
1330 match task_specs.cutypes[index] {
1331 CuTaskType::Source => quote! {
1332 {
1333 let resources = <<#ty as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
1334 resources,
1335 #mapping_ref,
1336 ).map_err(|e| e.add_cause(#additional_error_info))?;
1337 <#ty as CuSrcTask>::new(all_instances_configs[#index], resources)
1338 .map_err(|e| e.add_cause(#additional_error_info))?
1339 }
1340 },
1341 CuTaskType::Regular => {
1342 if background {
1343 let threadpool_bundle_index = threadpool_bundle_index
1344 .expect("threadpool bundle missing for background tasks");
1345 quote! {
1346 {
1347 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1348 resources,
1349 #mapping_ref,
1350 ).map_err(|e| e.add_cause(#additional_error_info))?;
1351 let threadpool_key = cu29::resource::ResourceKey::new(
1352 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
1353 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
1354 );
1355 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
1356 let resources = cu29::cuasynctask::CuAsyncTaskResources {
1357 inner: inner_resources,
1358 threadpool,
1359 };
1360 <#ty as CuTask>::new(all_instances_configs[#index], resources)
1361 .map_err(|e| e.add_cause(#additional_error_info))?
1362 }
1363 }
1364 } else {
1365 quote! {
1366 {
1367 let resources = <<#ty as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1368 resources,
1369 #mapping_ref,
1370 ).map_err(|e| e.add_cause(#additional_error_info))?;
1371 <#ty as CuTask>::new(all_instances_configs[#index], resources)
1372 .map_err(|e| e.add_cause(#additional_error_info))?
1373 }
1374 }
1375 }
1376 }
1377 CuTaskType::Sink => quote! {
1378 {
1379 let resources = <<#ty as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
1380 resources,
1381 #mapping_ref,
1382 ).map_err(|e| e.add_cause(#additional_error_info))?;
1383 <#ty as CuSinkTask>::new(all_instances_configs[#index], resources)
1384 .map_err(|e| e.add_cause(#additional_error_info))?
1385 }
1386 },
1387 }
1388 })
1389 .collect::<Vec<_>>();
1390
1391 let task_instances_init_code = task_specs
1392 .instantiation_types
1393 .iter()
1394 .zip(&task_specs.background_flags)
1395 .enumerate()
1396 .map(|(index, (task_type, background))| {
1397 let additional_error_info = format!(
1398 "Failed to get create instance for {}, instance index {}.",
1399 task_specs.type_names[index], index
1400 );
1401 let mapping_ref = task_resource_mappings.refs[index].clone();
1402 let inner_task_type = &task_specs.sim_task_types[index];
1403 match task_specs.cutypes[index] {
1404 CuTaskType::Source => quote! {
1405 {
1406 let resources = <<#task_type as CuSrcTask>::Resources<'_> as ResourceBindings>::from_bindings(
1407 resources,
1408 #mapping_ref,
1409 ).map_err(|e| e.add_cause(#additional_error_info))?;
1410 <#task_type as CuSrcTask>::new(all_instances_configs[#index], resources)
1411 .map_err(|e| e.add_cause(#additional_error_info))?
1412 }
1413 },
1414 CuTaskType::Regular => {
1415 if *background {
1416 let threadpool_bundle_index = threadpool_bundle_index
1417 .expect("threadpool bundle missing for background tasks");
1418 quote! {
1419 {
1420 let inner_resources = <<#inner_task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1421 resources,
1422 #mapping_ref,
1423 ).map_err(|e| e.add_cause(#additional_error_info))?;
1424 let threadpool_key = cu29::resource::ResourceKey::new(
1425 cu29::resource::BundleIndex::new(#threadpool_bundle_index),
1426 <cu29::resource::ThreadPoolBundle as cu29::resource::ResourceBundleDecl>::Id::BgThreads as usize,
1427 );
1428 let threadpool = resources.borrow_shared_arc(threadpool_key)?;
1429 let resources = cu29::cuasynctask::CuAsyncTaskResources {
1430 inner: inner_resources,
1431 threadpool,
1432 };
1433 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
1434 .map_err(|e| e.add_cause(#additional_error_info))?
1435 }
1436 }
1437 } else {
1438 quote! {
1439 {
1440 let resources = <<#task_type as CuTask>::Resources<'_> as ResourceBindings>::from_bindings(
1441 resources,
1442 #mapping_ref,
1443 ).map_err(|e| e.add_cause(#additional_error_info))?;
1444 <#task_type as CuTask>::new(all_instances_configs[#index], resources)
1445 .map_err(|e| e.add_cause(#additional_error_info))?
1446 }
1447 }
1448 }
1449 }
1450 CuTaskType::Sink => quote! {
1451 {
1452 let resources = <<#task_type as CuSinkTask>::Resources<'_> as ResourceBindings>::from_bindings(
1453 resources,
1454 #mapping_ref,
1455 ).map_err(|e| e.add_cause(#additional_error_info))?;
1456 <#task_type as CuSinkTask>::new(all_instances_configs[#index], resources)
1457 .map_err(|e| e.add_cause(#additional_error_info))?
1458 }
1459 },
1460 }
1461 })
1462 .collect::<Vec<_>>();
1463
1464 let (
1467 task_restore_code,
1468 task_start_calls,
1469 task_stop_calls,
1470 task_preprocess_calls,
1471 task_postprocess_calls,
1472 ): (Vec<_>, Vec<_>, Vec<_>, Vec<_>, Vec<_>) = itertools::multiunzip(
1473 (0..task_specs.task_types.len())
1474 .map(|index| {
1475 let task_index = int2sliceindex(index as u32);
1476 let task_tuple_index = syn::Index::from(index);
1477 let task_enum_name = config_id_to_enum(&task_specs.ids[index]);
1478 let enum_name = Ident::new(&task_enum_name, Span::call_site());
1479 (
1480 quote! {
1482 tasks.#task_tuple_index.thaw(&mut decoder).map_err(|e| CuError::from("Failed to thaw").add_cause(&e.to_string()))?
1483 },
1484 { let monitoring_action = quote! {
1486 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Start, &error);
1487 match decision {
1488 Decision::Abort => {
1489 debug!("Start: ABORT decision from monitoring. Task '{}' errored out \
1490 during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1491 return Ok(());
1492
1493 }
1494 Decision::Ignore => {
1495 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out \
1496 during start. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1497 }
1498 Decision::Shutdown => {
1499 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out \
1500 during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1501 return Err(CuError::new_with_cause("Task errored out during start.", error));
1502 }
1503 }
1504 };
1505
1506 let call_sim_callback = if sim_mode {
1507 quote! {
1508 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Start));
1510
1511 let doit = if let SimOverride::Errored(reason) = ovr {
1512 let error: CuError = reason.into();
1513 #monitoring_action
1514 false
1515 }
1516 else {
1517 ovr == SimOverride::ExecuteByRuntime
1518 };
1519 }
1520 } else {
1521 quote! {
1522 let doit = true; }
1524 };
1525
1526
1527 quote! {
1528 #call_sim_callback
1529 if doit {
1530 self.copper_runtime.record_execution_marker(
1531 cu29::monitoring::ExecutionMarker {
1532 component_id: #index,
1533 step: CuTaskState::Start,
1534 culistid: None,
1535 }
1536 );
1537 let task = &mut self.copper_runtime.tasks.#task_index;
1538 ctx.set_current_task(#index);
1539 if let Err(error) = task.start(&ctx) {
1540 #monitoring_action
1541 }
1542 }
1543 }
1544 },
1545 { let monitoring_action = quote! {
1547 let decision = self.copper_runtime.monitor.process_error(#index, CuTaskState::Stop, &error);
1548 match decision {
1549 Decision::Abort => {
1550 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out \
1551 during stop. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1552 return Ok(());
1553
1554 }
1555 Decision::Ignore => {
1556 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out \
1557 during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1558 }
1559 Decision::Shutdown => {
1560 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out \
1561 during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1562 return Err(CuError::new_with_cause("Task errored out during stop.", error));
1563 }
1564 }
1565 };
1566 let call_sim_callback = if sim_mode {
1567 quote! {
1568 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Stop));
1570
1571 let doit = if let SimOverride::Errored(reason) = ovr {
1572 let error: CuError = reason.into();
1573 #monitoring_action
1574 false
1575 }
1576 else {
1577 ovr == SimOverride::ExecuteByRuntime
1578 };
1579 }
1580 } else {
1581 quote! {
1582 let doit = true; }
1584 };
1585 quote! {
1586 #call_sim_callback
1587 if doit {
1588 self.copper_runtime.record_execution_marker(
1589 cu29::monitoring::ExecutionMarker {
1590 component_id: #index,
1591 step: CuTaskState::Stop,
1592 culistid: None,
1593 }
1594 );
1595 let task = &mut self.copper_runtime.tasks.#task_index;
1596 ctx.set_current_task(#index);
1597 if let Err(error) = task.stop(&ctx) {
1598 #monitoring_action
1599 }
1600 }
1601 }
1602 },
1603 { let monitoring_action = quote! {
1605 let decision = monitor.process_error(#index, CuTaskState::Preprocess, &error);
1606 match decision {
1607 Decision::Abort => {
1608 debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out \
1609 during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1610 return Ok(());
1611
1612 }
1613 Decision::Ignore => {
1614 debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out \
1615 during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1616 }
1617 Decision::Shutdown => {
1618 debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
1619 during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1620 return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
1621 }
1622 }
1623 };
1624 let call_sim_callback = if sim_mode {
1625 quote! {
1626 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Preprocess));
1628
1629 let doit = if let SimOverride::Errored(reason) = ovr {
1630 let error: CuError = reason.into();
1631 #monitoring_action
1632 false
1633 } else {
1634 ovr == SimOverride::ExecuteByRuntime
1635 };
1636 }
1637 } else {
1638 quote! {
1639 let doit = true; }
1641 };
1642 quote! {
1643 #call_sim_callback
1644 if doit {
1645 execution_probe.record(cu29::monitoring::ExecutionMarker {
1646 component_id: #index,
1647 step: CuTaskState::Preprocess,
1648 culistid: None,
1649 });
1650 ctx.set_current_task(#index);
1651 let maybe_error = {
1652 #rt_guard
1653 tasks.#task_index.preprocess(&ctx)
1654 };
1655 if let Err(error) = maybe_error {
1656 #monitoring_action
1657 }
1658 }
1659 }
1660 },
1661 { let monitoring_action = quote! {
1663 let decision = monitor.process_error(#index, CuTaskState::Postprocess, &error);
1664 match decision {
1665 Decision::Abort => {
1666 debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out \
1667 during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#index]);
1668 return Ok(());
1669
1670 }
1671 Decision::Ignore => {
1672 debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out \
1673 during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#index]);
1674 }
1675 Decision::Shutdown => {
1676 debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out \
1677 during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#index]);
1678 return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
1679 }
1680 }
1681 };
1682 let call_sim_callback = if sim_mode {
1683 quote! {
1684 let ovr = sim_callback(SimStep::#enum_name(CuTaskCallbackState::Postprocess));
1686
1687 let doit = if let SimOverride::Errored(reason) = ovr {
1688 let error: CuError = reason.into();
1689 #monitoring_action
1690 false
1691 } else {
1692 ovr == SimOverride::ExecuteByRuntime
1693 };
1694 }
1695 } else {
1696 quote! {
1697 let doit = true; }
1699 };
1700 quote! {
1701 #call_sim_callback
1702 if doit {
1703 execution_probe.record(cu29::monitoring::ExecutionMarker {
1704 component_id: #index,
1705 step: CuTaskState::Postprocess,
1706 culistid: None,
1707 });
1708 ctx.set_current_task(#index);
1709 let maybe_error = {
1710 #rt_guard
1711 tasks.#task_index.postprocess(&ctx)
1712 };
1713 if let Err(error) = maybe_error {
1714 #monitoring_action
1715 }
1716 }
1717 }
1718 }
1719 )
1720 })
1721 );
1722
1723 let bridge_start_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1724 .iter()
1725 .map(|spec| {
1726 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1727 let monitor_index = syn::Index::from(
1728 spec.monitor_index
1729 .expect("Bridge missing monitor index for start"),
1730 );
1731 let enum_ident = Ident::new(
1732 &config_id_to_enum(&format!("{}_bridge", spec.id)),
1733 Span::call_site(),
1734 );
1735 let call_sim = if sim_mode {
1736 quote! {
1737 let doit = {
1738 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Start);
1739 let ovr = sim_callback(state);
1740 if let SimOverride::Errored(reason) = ovr {
1741 let error: CuError = reason.into();
1742 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
1743 match decision {
1744 Decision::Abort => { debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]); return Ok(()); }
1745 Decision::Ignore => { debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]); false }
1746 Decision::Shutdown => { debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]); return Err(CuError::new_with_cause("Task errored out during start.", error)); }
1747 }
1748 } else {
1749 ovr == SimOverride::ExecuteByRuntime
1750 }
1751 };
1752 }
1753 } else {
1754 quote! { let doit = true; }
1755 };
1756 quote! {
1757 {
1758 #call_sim
1759 if !doit { return Ok(()); }
1760 self.copper_runtime.record_execution_marker(
1761 cu29::monitoring::ExecutionMarker {
1762 component_id: #monitor_index,
1763 step: CuTaskState::Start,
1764 culistid: None,
1765 }
1766 );
1767 ctx.clear_current_task();
1768 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
1769 if let Err(error) = bridge.start(&ctx) {
1770 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Start, &error);
1771 match decision {
1772 Decision::Abort => {
1773 debug!("Start: ABORT decision from monitoring. Task '{}' errored out during start. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
1774 return Ok(());
1775 }
1776 Decision::Ignore => {
1777 debug!("Start: IGNORE decision from monitoring. Task '{}' errored out during start. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1778 }
1779 Decision::Shutdown => {
1780 debug!("Start: SHUTDOWN decision from monitoring. Task '{}' errored out during start. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1781 return Err(CuError::new_with_cause("Task errored out during start.", error));
1782 }
1783 }
1784 }
1785 }
1786 }
1787 })
1788 .collect();
1789
1790 let bridge_stop_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
1791 .iter()
1792 .map(|spec| {
1793 let bridge_index = int2sliceindex(spec.tuple_index as u32);
1794 let monitor_index = syn::Index::from(
1795 spec.monitor_index
1796 .expect("Bridge missing monitor index for stop"),
1797 );
1798 let enum_ident = Ident::new(
1799 &config_id_to_enum(&format!("{}_bridge", spec.id)),
1800 Span::call_site(),
1801 );
1802 let call_sim = if sim_mode {
1803 quote! {
1804 let doit = {
1805 let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Stop);
1806 let ovr = sim_callback(state);
1807 if let SimOverride::Errored(reason) = ovr {
1808 let error: CuError = reason.into();
1809 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
1810 match decision {
1811 Decision::Abort => { debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]); return Ok(()); }
1812 Decision::Ignore => { debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]); false }
1813 Decision::Shutdown => { debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]); return Err(CuError::new_with_cause("Task errored out during stop.", error)); }
1814 }
1815 } else {
1816 ovr == SimOverride::ExecuteByRuntime
1817 }
1818 };
1819 }
1820 } else {
1821 quote! { let doit = true; }
1822 };
1823 quote! {
1824 {
1825 #call_sim
1826 if !doit { return Ok(()); }
1827 self.copper_runtime.record_execution_marker(
1828 cu29::monitoring::ExecutionMarker {
1829 component_id: #monitor_index,
1830 step: CuTaskState::Stop,
1831 culistid: None,
1832 }
1833 );
1834 ctx.clear_current_task();
1835 let bridge = &mut self.copper_runtime.bridges.#bridge_index;
1836 if let Err(error) = bridge.stop(&ctx) {
1837 let decision = self.copper_runtime.monitor.process_error(#monitor_index, CuTaskState::Stop, &error);
1838 match decision {
1839 Decision::Abort => {
1840 debug!("Stop: ABORT decision from monitoring. Task '{}' errored out during stop. Aborting all the other stops.", #mission_mod::TASKS_IDS[#monitor_index]);
1841 return Ok(());
1842 }
1843 Decision::Ignore => {
1844 debug!("Stop: IGNORE decision from monitoring. Task '{}' errored out during stop. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1845 }
1846 Decision::Shutdown => {
1847 debug!("Stop: SHUTDOWN decision from monitoring. Task '{}' errored out during stop. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
1848 return Err(CuError::new_with_cause("Task errored out during stop.", error));
1849 }
1850 }
1851 }
1852 }
1853 }
1854 })
1855 .collect();
1856
    // Preprocess snippet for every bridge. The generated code expects `monitor`,
    // `execution_probe`, `ctx` and `__cu_bridges` to be in scope where it is spliced.
    // In simulation mode the step is first offered to `sim_callback`: an `Errored` override
    // goes through the monitor, and the real `preprocess` only runs for `ExecuteByRuntime`.
    let bridge_preprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
        .iter()
        .map(|spec| {
            let bridge_index = int2sliceindex(spec.tuple_index as u32);
            // Index under which the bridge is known to the monitor and in `TASKS_IDS`.
            let monitor_index = syn::Index::from(
                spec.monitor_index
                    .expect("Bridge missing monitor index for preprocess"),
            );
            let enum_ident = Ident::new(
                &config_id_to_enum(&format!("{}_bridge", spec.id)),
                Span::call_site(),
            );
            let call_sim = if sim_mode {
                quote! {
                    let doit = {
                        let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Preprocess);
                        let ovr = sim_callback(state);
                        if let SimOverride::Errored(reason) = ovr {
                            let error: CuError = reason.into();
                            let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
                            match decision {
                                Decision::Abort => { debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]); return Ok(()); }
                                Decision::Ignore => { debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]); false }
                                Decision::Shutdown => { debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]); return Err(CuError::new_with_cause("Task errored out during preprocess.", error)); }
                            }
                        } else {
                            ovr == SimOverride::ExecuteByRuntime
                        }
                    };
                }
            } else {
                quote! { let doit = true; }
            };
            // When `doit` is false only this bridge is skipped; the surrounding generated
            // code keeps going.
            quote! {
                {
                    #call_sim
                    if doit {
                        ctx.clear_current_task();
                        let bridge = &mut __cu_bridges.#bridge_index;
                        execution_probe.record(cu29::monitoring::ExecutionMarker {
                            component_id: #monitor_index,
                            step: CuTaskState::Preprocess,
                            culistid: None,
                        });
                        let maybe_error = {
                            #rt_guard
                            bridge.preprocess(&ctx)
                        };
                        if let Err(error) = maybe_error {
                            let decision = monitor.process_error(#monitor_index, CuTaskState::Preprocess, &error);
                            match decision {
                                Decision::Abort => {
                                    debug!("Preprocess: ABORT decision from monitoring. Task '{}' errored out during preprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
                                    return Ok(());
                                }
                                Decision::Ignore => {
                                    debug!("Preprocess: IGNORE decision from monitoring. Task '{}' errored out during preprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
                                }
                                Decision::Shutdown => {
                                    debug!("Preprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during preprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
                                    return Err(CuError::new_with_cause("Task errored out during preprocess.", error));
                                }
                            }
                        }
                    }
                }
            }
        })
        .collect();
1926
    // Postprocess snippet for every bridge. The generated code expects `monitor`,
    // `execution_probe`, `ctx`, `kf_manager`, `clid` and `__cu_bridges` to be in scope where
    // it is spliced. In simulation mode the step is first offered to `sim_callback`: an
    // `Errored` override goes through the monitor, and the real `postprocess` only runs for
    // `ExecuteByRuntime`.
    let bridge_postprocess_calls: Vec<proc_macro2::TokenStream> = culist_bridge_specs
        .iter()
        .map(|spec| {
            let bridge_index = int2sliceindex(spec.tuple_index as u32);
            // Index under which the bridge is known to the monitor and in `TASKS_IDS`.
            let monitor_index = syn::Index::from(
                spec.monitor_index
                    .expect("Bridge missing monitor index for postprocess"),
            );
            let enum_ident = Ident::new(
                &config_id_to_enum(&format!("{}_bridge", spec.id)),
                Span::call_site(),
            );
            let call_sim = if sim_mode {
                quote! {
                    let doit = {
                        let state = SimStep::#enum_ident(cu29::simulation::CuBridgeLifecycleState::Postprocess);
                        let ovr = sim_callback(state);
                        if let SimOverride::Errored(reason) = ovr {
                            let error: CuError = reason.into();
                            let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
                            match decision {
                                Decision::Abort => { debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]); return Ok(()); }
                                Decision::Ignore => { debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]); false }
                                Decision::Shutdown => { debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]); return Err(CuError::new_with_cause("Task errored out during postprocess.", error)); }
                            }
                        } else {
                            ovr == SimOverride::ExecuteByRuntime
                        }
                    };
                }
            } else {
                quote! { let doit = true; }
            };
            // When `doit` is false only this bridge is skipped. Otherwise the bridge is first
            // handed to `kf_manager.freeze_any` for the current copper list id (a failure
            // there is propagated with `?`), and only then postprocessed.
            quote! {
                {
                    #call_sim
                    if doit {
                        ctx.clear_current_task();
                        let bridge = &mut __cu_bridges.#bridge_index;
                        kf_manager.freeze_any(clid, bridge)?;
                        execution_probe.record(cu29::monitoring::ExecutionMarker {
                            component_id: #monitor_index,
                            step: CuTaskState::Postprocess,
                            culistid: Some(clid),
                        });
                        let maybe_error = {
                            #rt_guard
                            bridge.postprocess(&ctx)
                        };
                        if let Err(error) = maybe_error {
                            let decision = monitor.process_error(#monitor_index, CuTaskState::Postprocess, &error);
                            match decision {
                                Decision::Abort => {
                                    debug!("Postprocess: ABORT decision from monitoring. Task '{}' errored out during postprocess. Aborting all the other starts.", #mission_mod::TASKS_IDS[#monitor_index]);
                                    return Ok(());
                                }
                                Decision::Ignore => {
                                    debug!("Postprocess: IGNORE decision from monitoring. Task '{}' errored out during postprocess. The runtime will continue.", #mission_mod::TASKS_IDS[#monitor_index]);
                                }
                                Decision::Shutdown => {
                                    debug!("Postprocess: SHUTDOWN decision from monitoring. Task '{}' errored out during postprocess. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
                                    return Err(CuError::new_with_cause("Task errored out during postprocess.", error));
                                }
                            }
                        }
                    }
                }
            }
        })
        .collect();
1997
1998 let mut start_calls = bridge_start_calls;
1999 start_calls.extend(task_start_calls);
2000 let mut stop_calls = task_stop_calls;
2001 stop_calls.extend(bridge_stop_calls);
2002 let mut preprocess_calls = bridge_preprocess_calls;
2003 preprocess_calls.extend(task_preprocess_calls);
2004 let mut postprocess_calls = task_postprocess_calls;
2005 postprocess_calls.extend(bridge_postprocess_calls);
2006
2007 let bridge_restore_code: Vec<proc_macro2::TokenStream> = culist_bridge_specs
2009 .iter()
2010 .enumerate()
2011 .map(|(index, _)| {
2012 let bridge_tuple_index = syn::Index::from(index);
2013 quote! {
2014 __cu_bridges.#bridge_tuple_index
2015 .thaw(&mut decoder)
2016 .map_err(|e| CuError::from("Failed to thaw bridge").add_cause(&e.to_string()))?
2017 }
2018 })
2019 .collect();
2020
2021 let output_pack_sizes = collect_output_pack_sizes(&culist_plan);
2022 let runtime_plan_code_and_logging: Vec<(
2023 proc_macro2::TokenStream,
2024 proc_macro2::TokenStream,
2025 )> = culist_plan
2026 .steps
2027 .iter()
2028 .map(|unit| match unit {
2029 CuExecutionUnit::Step(step) => {
2030 #[cfg(feature = "macro_debug")]
2031 eprintln!(
2032 "{} -> {} as {:?}. task_id: {} Input={:?}, Output={:?}",
2033 step.node.get_id(),
2034 step.node.get_type(),
2035 step.task_type,
2036 step.node_id,
2037 step.input_msg_indices_types,
2038 step.output_msg_pack
2039 );
2040
2041 match &culist_exec_entities[step.node_id as usize].kind {
2042 ExecutionEntityKind::Task { task_index } => generate_task_execution_tokens(
2043 step,
2044 *task_index,
2045 &task_specs,
2046 &output_pack_sizes,
2047 sim_mode,
2048 &mission_mod,
2049 ),
2050 ExecutionEntityKind::BridgeRx {
2051 bridge_index,
2052 channel_index,
2053 } => {
2054 let spec = &culist_bridge_specs[*bridge_index];
2055 generate_bridge_rx_execution_tokens(
2056 step,
2057 spec,
2058 *channel_index,
2059 &mission_mod,
2060 sim_mode,
2061 )
2062 }
2063 ExecutionEntityKind::BridgeTx {
2064 bridge_index,
2065 channel_index,
2066 } => {
2067 let spec = &culist_bridge_specs[*bridge_index];
2068 generate_bridge_tx_execution_tokens(
2069 step,
2070 spec,
2071 *channel_index,
2072 &output_pack_sizes,
2073 &mission_mod,
2074 sim_mode,
2075 )
2076 }
2077 }
2078 }
2079 CuExecutionUnit::Loop(_) => {
2080 panic!("Execution loops are not supported in runtime generation");
2081 }
2082 })
2083 .collect();
2084
2085 let sim_support = if sim_mode {
2086 Some(gen_sim_support(
2087 &culist_plan,
2088 &culist_exec_entities,
2089 &culist_bridge_specs,
2090 ))
2091 } else {
2092 None
2093 };
2094
2095 let (new, run_one_iteration, start_all_tasks, stop_all_tasks, run) = if sim_mode {
2096 (
2097 quote! {
2098 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<Self>
2099 },
2100 quote! {
2101 fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2102 },
2103 quote! {
2104 fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2105 },
2106 quote! {
2107 fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2108 },
2109 quote! {
2110 fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()>
2111 },
2112 )
2113 } else {
2114 (
2115 if std {
2116 quote! {
2117 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>, config_override: Option<CuConfig>) -> CuResult<Self>
2118 }
2119 } else {
2120 quote! {
2121 fn new(clock:RobotClock, unified_logger: Arc<Mutex<L>>) -> CuResult<Self>
2123 }
2124 },
2125 quote! {
2126 fn run_one_iteration(&mut self) -> CuResult<()>
2127 },
2128 quote! {
2129 fn start_all_tasks(&mut self) -> CuResult<()>
2130 },
2131 quote! {
2132 fn stop_all_tasks(&mut self) -> CuResult<()>
2133 },
2134 quote! {
2135 fn run(&mut self) -> CuResult<()>
2136 },
2137 )
2138 };
2139
2140 let sim_callback_arg = if sim_mode {
2141 Some(quote!(sim_callback))
2142 } else {
2143 None
2144 };
2145
2146 let app_trait = if sim_mode {
2147 quote!(CuSimApplication)
2148 } else {
2149 quote!(CuApplication)
2150 };
2151
2152 let sim_callback_on_new_calls = task_specs.ids.iter().enumerate().map(|(i, id)| {
2153 let enum_name = config_id_to_enum(id);
2154 let enum_ident = Ident::new(&enum_name, Span::call_site());
2155 quote! {
2156 sim_callback(SimStep::#enum_ident(CuTaskCallbackState::New(all_instances_configs[#i].cloned())));
2158 }
2159 });
2160
2161 let sim_callback_on_new_bridges = culist_bridge_specs.iter().map(|spec| {
2162 let enum_ident = Ident::new(
2163 &config_id_to_enum(&format!("{}_bridge", spec.id)),
2164 Span::call_site(),
2165 );
2166 let cfg_index = syn::Index::from(spec.config_index);
2167 quote! {
2168 sim_callback(SimStep::#enum_ident(
2169 cu29::simulation::CuBridgeLifecycleState::New(config.bridges[#cfg_index].config.clone())
2170 ));
2171 }
2172 });
2173
2174 let sim_callback_on_new = if sim_mode {
2175 Some(quote! {
2176 let graph = config.get_graph(Some(#mission)).expect("Could not find the mission #mission");
2177 let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
2178 .get_all_nodes()
2179 .iter()
2180 .map(|(_, node)| node.get_instance_config())
2181 .collect();
2182 #(#sim_callback_on_new_calls)*
2183 #(#sim_callback_on_new_bridges)*
2184 })
2185 } else {
2186 None
2187 };
2188
2189 let (runtime_plan_code, preprocess_logging_calls): (Vec<_>, Vec<_>) =
2190 itertools::multiunzip(runtime_plan_code_and_logging);
2191
2192 let config_load_stmt = if std {
2193 quote! {
2194 let (config, config_source) = if let Some(overridden_config) = config_override {
2195 debug!("CuConfig: Overridden programmatically.");
2196 (overridden_config, RuntimeLifecycleConfigSource::ProgrammaticOverride)
2197 } else if ::std::path::Path::new(config_filename).exists() {
2198 debug!("CuConfig: Reading configuration from file: {}", config_filename);
2199 (
2200 cu29::config::read_configuration(config_filename)?,
2201 RuntimeLifecycleConfigSource::ExternalFile,
2202 )
2203 } else {
2204 let original_config = Self::original_config();
2205 debug!("CuConfig: Using the bundled configuration compiled into the binary.");
2206 (
2207 cu29::config::read_configuration_str(original_config, None)?,
2208 RuntimeLifecycleConfigSource::BundledDefault,
2209 )
2210 };
2211 }
2212 } else {
2213 quote! {
2214 let original_config = Self::original_config();
2216 debug!("CuConfig: Using the bundled configuration compiled into the binary.");
2217 let config = cu29::config::read_configuration_str(original_config, None)?;
2218 let config_source = RuntimeLifecycleConfigSource::BundledDefault;
2219 }
2220 };
2221
2222 let init_resources_sig = if std {
2223 quote! {
2224 pub fn init_resources(config_override: Option<CuConfig>) -> CuResult<AppResources>
2225 }
2226 } else {
2227 quote! {
2228 pub fn init_resources() -> CuResult<AppResources>
2229 }
2230 };
2231
2232 let init_resources_call = if std {
2233 quote! { Self::init_resources(config_override)? }
2234 } else {
2235 quote! { Self::init_resources()? }
2236 };
2237
2238 let new_with_resources_sig = if sim_mode {
2239 quote! {
2240 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
2241 clock: RobotClock,
2242 unified_logger: Arc<Mutex<L>>,
2243 app_resources: AppResources,
2244 sim_callback: &mut impl FnMut(SimStep) -> SimOverride,
2245 ) -> CuResult<Self>
2246 }
2247 } else {
2248 quote! {
2249 pub fn new_with_resources<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>(
2250 clock: RobotClock,
2251 unified_logger: Arc<Mutex<L>>,
2252 app_resources: AppResources,
2253 ) -> CuResult<Self>
2254 }
2255 };
2256
2257 let new_with_resources_call = if sim_mode {
2258 quote! { Self::new_with_resources(clock, unified_logger, app_resources, sim_callback) }
2259 } else {
2260 quote! { Self::new_with_resources(clock, unified_logger, app_resources) }
2261 };
2262
2263 let kill_handler = if std {
2264 Some(quote! {
2265 ctrlc::set_handler(move || {
2266 STOP_FLAG.store(true, Ordering::SeqCst);
2267 }).expect("Error setting Ctrl-C handler");
2268 })
2269 } else {
2270 None
2271 };
2272
2273 let run_loop = if std {
2274 quote! {
2275 loop {
2276 let iter_start = self.copper_runtime.clock.now();
2277 let result = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(
2278 || <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg)
2279 )) {
2280 Ok(result) => result,
2281 Err(payload) => {
2282 let panic_message = cu29::monitoring::panic_payload_to_string(payload.as_ref());
2283 self.copper_runtime.monitor.process_panic(&panic_message);
2284 let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::Panic {
2285 message: panic_message.clone(),
2286 file: None,
2287 line: None,
2288 column: None,
2289 });
2290 Err(CuError::from(format!(
2291 "Panic while running one iteration: {}",
2292 panic_message
2293 )))
2294 }
2295 };
2296
2297 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
2298 let period: CuDuration = (1_000_000_000u64 / rate).into();
2299 let elapsed = self.copper_runtime.clock.now() - iter_start;
2300 if elapsed < period {
2301 std::thread::sleep(std::time::Duration::from_nanos(period.as_nanos() - elapsed.as_nanos()));
2302 }
2303 }
2304
2305 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
2306 break result;
2307 }
2308 }
2309 }
2310 } else {
2311 quote! {
2312 loop {
2313 let iter_start = self.copper_runtime.clock.now();
2314 let result = <Self as #app_trait<S, L>>::run_one_iteration(self, #sim_callback_arg);
2315 if let Some(rate) = self.copper_runtime.runtime_config.rate_target_hz {
2316 let period: CuDuration = (1_000_000_000u64 / rate).into();
2317 let elapsed = self.copper_runtime.clock.now() - iter_start;
2318 if elapsed < period {
2319 busy_wait_for(period - elapsed);
2320 }
2321 }
2322
2323 if STOP_FLAG.load(Ordering::SeqCst) || result.is_err() {
2324 break result;
2325 }
2326 }
2327 }
2328 };
2329
2330 #[cfg(feature = "macro_debug")]
2331 eprintln!("[build the run methods]");
2332 let run_methods: proc_macro2::TokenStream = quote! {
2333
2334 #run_one_iteration {
2335
2336 let runtime = &mut self.copper_runtime;
2338 let clock = &runtime.clock;
2339 let execution_probe = &runtime.execution_probe;
2340 let monitor = &mut runtime.monitor;
2341 let tasks = &mut runtime.tasks;
2342 let __cu_bridges = &mut runtime.bridges;
2343 let cl_manager = &mut runtime.copperlists_manager;
2344 let kf_manager = &mut runtime.keyframes_manager;
2345 let iteration_clid = cl_manager.inner.next_cl_id();
2346 let mut ctx = cu29::context::CuContext::builder(clock.clone())
2347 .cl_id(iteration_clid)
2348 .task_ids(#mission_mod::TASKS_IDS)
2349 .build();
2350
2351 #(#preprocess_calls)*
2353
2354 let culist = cl_manager.inner.create().expect("Ran out of space for copper lists"); let clid = culist.id;
2356 debug_assert_eq!(clid, iteration_clid);
2357 kf_manager.reset(clid, clock); culist.change_state(cu29::copperlist::CopperListState::Processing);
2359 culist.msgs.init_zeroed();
2360 let mut ctx = cu29::context::CuContext::builder(clock.clone())
2361 .cl_id(iteration_clid)
2362 .task_ids(#mission_mod::TASKS_IDS)
2363 .build();
2364 {
2365 let msgs = &mut culist.msgs.0;
2366 #(#runtime_plan_code)*
2367 } let (raw_payload_bytes, handle_bytes) = #mission_mod::compute_payload_bytes(&culist);
2369 ctx.clear_current_task();
2370 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
2371
2372 #(#preprocess_logging_calls)*
2374
2375 cl_manager.end_of_processing(clid)?;
2376 kf_manager.end_of_processing(clid)?;
2377 monitor_result?;
2378 let stats = cu29::monitoring::CopperListIoStats {
2379 raw_culist_bytes: core::mem::size_of::<CuList>() as u64 + raw_payload_bytes,
2380 handle_bytes,
2381 encoded_culist_bytes: cl_manager.last_encoded_bytes,
2382 keyframe_bytes: kf_manager.last_encoded_bytes,
2383 structured_log_bytes_total: ::cu29::prelude::structured_log_bytes_total(),
2384 culistid: clid,
2385 };
2386 monitor.observe_copperlist_io(stats);
2387
2388 #(#postprocess_calls)*
2390 Ok(())
2391 }
2392
2393 fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
2394 let runtime = &mut self.copper_runtime;
2395 let clock = &runtime.clock;
2396 let tasks = &mut runtime.tasks;
2397 let __cu_bridges = &mut runtime.bridges;
2398 let config = cu29::bincode::config::standard();
2399 let reader = cu29::bincode::de::read::SliceReader::new(&keyframe.serialized_tasks);
2400 let mut decoder = DecoderImpl::new(reader, config, ());
2401 #(#task_restore_code);*;
2402 #(#bridge_restore_code);*;
2403 Ok(())
2404 }
2405
2406 #start_all_tasks {
2407 let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStarted {
2408 mission: #mission.to_string(),
2409 });
2410 let lifecycle_clid = self.copper_runtime.copperlists_manager.inner.last_cl_id();
2411 let mut ctx = cu29::context::CuContext::builder(self.copper_runtime.clock.clone())
2412 .cl_id(lifecycle_clid)
2413 .task_ids(#mission_mod::TASKS_IDS)
2414 .build();
2415 #(#start_calls)*
2416 ctx.clear_current_task();
2417 self.copper_runtime.monitor.start(&ctx)?;
2418 Ok(())
2419 }
2420
2421 #stop_all_tasks {
2422 let lifecycle_clid = self.copper_runtime.copperlists_manager.inner.last_cl_id();
2423 let mut ctx = cu29::context::CuContext::builder(self.copper_runtime.clock.clone())
2424 .cl_id(lifecycle_clid)
2425 .task_ids(#mission_mod::TASKS_IDS)
2426 .build();
2427 #(#stop_calls)*
2428 ctx.clear_current_task();
2429 self.copper_runtime.monitor.stop(&ctx)?;
2430 let _ = self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::MissionStopped {
2433 mission: #mission.to_string(),
2434 reason: "stop_all_tasks".to_string(),
2435 });
2436 Ok(())
2437 }
2438
2439 #run {
2440 static STOP_FLAG: AtomicBool = AtomicBool::new(false);
2441
2442 #kill_handler
2443
2444 <Self as #app_trait<S, L>>::start_all_tasks(self, #sim_callback_arg)?;
2445 let result = #run_loop;
2446
2447 if result.is_err() {
2448 error!("A task errored out: {}", &result);
2449 }
2450 <Self as #app_trait<S, L>>::stop_all_tasks(self, #sim_callback_arg)?;
2451 let _ = self.log_shutdown_completed();
2452 result
2453 }
2454 };
2455
2456 let tasks_type = if sim_mode {
2457 quote!(CuSimTasks)
2458 } else {
2459 quote!(CuTasks)
2460 };
2461
2462 let tasks_instanciator_fn = if sim_mode {
2463 quote!(tasks_instanciator_sim)
2464 } else {
2465 quote!(tasks_instanciator)
2466 };
2467
2468 let app_impl_decl = if sim_mode {
2469 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuSimApplication<S, L> for #application_name)
2470 } else {
2471 quote!(impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static> CuApplication<S, L> for #application_name)
2472 };
2473
2474 let simstep_type_decl = if sim_mode {
2475 quote!(
2476 type Step<'z> = SimStep<'z>;
2477 )
2478 } else {
2479 quote!()
2480 };
2481
2482 let app_resources_struct = quote! {
2483 pub struct AppResources {
2484 pub config: CuConfig,
2485 pub config_source: RuntimeLifecycleConfigSource,
2486 pub resources: ResourceManager,
2487 }
2488 };
2489
2490 let init_resources_fn = quote! {
2491 #init_resources_sig {
2492 let config_filename = #config_file;
2493
2494 #[cfg(target_os = "none")]
2495 ::cu29::prelude::info!("CuApp init: config file {}", config_filename);
2496 #[cfg(target_os = "none")]
2497 ::cu29::prelude::info!("CuApp init: loading config");
2498 #config_load_stmt
2499 #[cfg(target_os = "none")]
2500 ::cu29::prelude::info!("CuApp init: config loaded");
2501 if let Some(runtime) = &config.runtime {
2502 #[cfg(target_os = "none")]
2503 ::cu29::prelude::info!(
2504 "CuApp init: rate_target_hz={}",
2505 runtime.rate_target_hz.unwrap_or(0)
2506 );
2507 } else {
2508 #[cfg(target_os = "none")]
2509 ::cu29::prelude::info!("CuApp init: rate_target_hz=none");
2510 }
2511
2512 #[cfg(target_os = "none")]
2513 ::cu29::prelude::info!("CuApp init: building resources");
2514 let resources = #mission_mod::resources_instanciator(&config)?;
2515 #[cfg(target_os = "none")]
2516 ::cu29::prelude::info!("CuApp init: resources ready");
2517
2518 Ok(AppResources {
2519 config,
2520 config_source,
2521 resources,
2522 })
2523 }
2524 };
2525
2526 let new_with_resources_fn = quote! {
2527 #new_with_resources_sig {
2528 let AppResources {
2529 config,
2530 config_source,
2531 resources,
2532 } = app_resources;
2533
2534 #[cfg(target_os = "none")]
2535 {
2536 let structured_stream = ::cu29::prelude::stream_write::<
2537 ::cu29::prelude::CuLogEntry,
2538 S,
2539 >(
2540 unified_logger.clone(),
2541 ::cu29::prelude::UnifiedLogType::StructuredLogLine,
2542 4096 * 10,
2543 )?;
2544 let _logger_runtime = ::cu29::prelude::LoggerRuntime::init(
2545 clock.clone(),
2546 structured_stream,
2547 None::<::cu29::prelude::NullLog>,
2548 );
2549 }
2550
2551 let mut default_section_size = size_of::<super::#mission_mod::CuList>() * 64;
2554 if let Some(section_size_mib) = config.logging.as_ref().and_then(|l| l.section_size_mib) {
2556 default_section_size = section_size_mib as usize * 1024usize * 1024usize;
2558 }
2559 #[cfg(target_os = "none")]
2560 ::cu29::prelude::info!(
2561 "CuApp new: copperlist section size={}",
2562 default_section_size
2563 );
2564 #[cfg(target_os = "none")]
2565 ::cu29::prelude::info!("CuApp new: creating copperlist stream");
2566 let copperlist_stream = stream_write::<#mission_mod::CuList, S>(
2567 unified_logger.clone(),
2568 UnifiedLogType::CopperList,
2569 default_section_size,
2570 )?;
2574 #[cfg(target_os = "none")]
2575 ::cu29::prelude::info!("CuApp new: copperlist stream ready");
2576
2577 #[cfg(target_os = "none")]
2578 ::cu29::prelude::info!("CuApp new: creating keyframes stream");
2579 let keyframes_stream = stream_write::<KeyFrame, S>(
2580 unified_logger.clone(),
2581 UnifiedLogType::FrozenTasks,
2582 1024 * 1024 * 10, )?;
2584 #[cfg(target_os = "none")]
2585 ::cu29::prelude::info!("CuApp new: keyframes stream ready");
2586
2587 #[cfg(target_os = "none")]
2588 ::cu29::prelude::info!("CuApp new: creating runtime lifecycle stream");
2589 let mut runtime_lifecycle_stream = stream_write::<RuntimeLifecycleRecord, S>(
2590 unified_logger.clone(),
2591 UnifiedLogType::RuntimeLifecycle,
2592 1024 * 64, )?;
2594 let effective_config_ron = config
2595 .serialize_ron()
2596 .unwrap_or_else(|_| "<failed to serialize config>".to_string());
2597 let stack_info = RuntimeLifecycleStackInfo {
2598 app_name: env!("CARGO_PKG_NAME").to_string(),
2599 app_version: env!("CARGO_PKG_VERSION").to_string(),
2600 git_commit: #git_commit_tokens,
2601 git_dirty: #git_dirty_tokens,
2602 };
2603 runtime_lifecycle_stream.log(&RuntimeLifecycleRecord {
2604 timestamp: clock.now(),
2605 event: RuntimeLifecycleEvent::Instantiated {
2606 config_source,
2607 effective_config_ron,
2608 stack: stack_info,
2609 },
2610 })?;
2611 #[cfg(target_os = "none")]
2612 ::cu29::prelude::info!("CuApp new: runtime lifecycle stream ready");
2613
2614 #[cfg(target_os = "none")]
2615 ::cu29::prelude::info!("CuApp new: building runtime");
2616 let copper_runtime = CuRuntime::<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB>::new_with_resources(
2617 clock,
2618 &config,
2619 Some(#mission),
2620 resources,
2621 #mission_mod::#tasks_instanciator_fn,
2622 #mission_mod::monitor_instanciator,
2623 #mission_mod::bridges_instanciator,
2624 copperlist_stream,
2625 keyframes_stream)?;
2626 #[cfg(target_os = "none")]
2627 ::cu29::prelude::info!("CuApp new: runtime built");
2628
2629 let application = Ok(#application_name {
2630 copper_runtime,
2631 runtime_lifecycle_stream: Some(Box::new(runtime_lifecycle_stream)),
2632 });
2633
2634 #sim_callback_on_new
2635
2636 application
2637 }
2638 };
2639
2640 let app_inherent_impl = quote! {
2641 impl #application_name {
2642 pub fn original_config() -> String {
2643 #copper_config_content.to_string()
2644 }
2645
2646 pub fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
2647 #(#reflect_type_registration_calls)*
2648 }
2649
2650 pub fn log_runtime_lifecycle_event(
2652 &mut self,
2653 event: RuntimeLifecycleEvent,
2654 ) -> CuResult<()> {
2655 let timestamp = self.copper_runtime.clock.now();
2656 let Some(stream) = self.runtime_lifecycle_stream.as_mut() else {
2657 return Err(CuError::from("Runtime lifecycle stream is not initialized"));
2658 };
2659 stream.log(&RuntimeLifecycleRecord { timestamp, event })
2660 }
2661
2662 pub fn log_shutdown_completed(&mut self) -> CuResult<()> {
2666 self.log_runtime_lifecycle_event(RuntimeLifecycleEvent::ShutdownCompleted)
2667 }
2668
2669 #init_resources_fn
2670
2671 #new_with_resources_fn
2672
2673 #[inline]
2675 pub fn copper_runtime_mut(&mut self) -> &mut CuRuntime<#mission_mod::#tasks_type, #mission_mod::CuBridges, #mission_mod::CuStampedDataSet, #monitor_type, #DEFAULT_CLNB> {
2676 &mut self.copper_runtime
2677 }
2678 }
2679 };
2680
2681 let app_reflect_impl = quote! {
2682 impl cu29::reflect::ReflectTaskIntrospection for #application_name {
2683 fn reflect_task(&self, task_id: &str) -> Option<&dyn cu29::reflect::Reflect> {
2684 match task_id {
2685 #(#task_reflect_read_arms)*
2686 _ => None,
2687 }
2688 }
2689
2690 fn reflect_task_mut(
2691 &mut self,
2692 task_id: &str,
2693 ) -> Option<&mut dyn cu29::reflect::Reflect> {
2694 match task_id {
2695 #(#task_reflect_write_arms)*
2696 _ => None,
2697 }
2698 }
2699
2700 fn register_reflect_types(registry: &mut cu29::reflect::TypeRegistry) {
2701 #application_name::register_reflect_types(registry);
2702 }
2703 }
2704 };
2705
2706 #[cfg(feature = "std")]
2707 #[cfg(feature = "macro_debug")]
2708 eprintln!("[build result]");
2709 let application_impl = quote! {
2710 #app_impl_decl {
2711 #simstep_type_decl
2712
2713 #new {
2714 let app_resources = #init_resources_call;
2715 #new_with_resources_call
2716 }
2717
2718 fn get_original_config() -> String {
2719 Self::original_config()
2720 }
2721
2722 #run_methods
2723 }
2724 };
2725
2726 let (
2727 builder_struct,
2728 builder_new,
2729 builder_impl,
2730 builder_sim_callback_method,
2731 builder_build_sim_callback_arg,
2732 ) = if sim_mode {
2733 (
2734 quote! {
2735 #[allow(dead_code)]
2736 pub struct #builder_name <'a, F> {
2737 clock: Option<RobotClock>,
2738 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
2739 config_override: Option<CuConfig>,
2740 sim_callback: Option<&'a mut F>
2741 }
2742 },
2743 quote! {
2744 #[allow(dead_code)]
2745 pub fn new() -> Self {
2746 Self {
2747 clock: None,
2748 unified_logger: None,
2749 config_override: None,
2750 sim_callback: None,
2751 }
2752 }
2753 },
2754 quote! {
2755 impl<'a, F> #builder_name <'a, F>
2756 where
2757 F: FnMut(SimStep) -> SimOverride,
2758 },
2759 Some(quote! {
2760 pub fn with_sim_callback(mut self, sim_callback: &'a mut F) -> Self
2761 {
2762 self.sim_callback = Some(sim_callback);
2763 self
2764 }
2765 }),
2766 Some(quote! {
2767 self.sim_callback
2768 .ok_or(CuError::from("Sim callback missing from builder"))?,
2769 }),
2770 )
2771 } else {
2772 (
2773 quote! {
2774 #[allow(dead_code)]
2775 pub struct #builder_name {
2776 clock: Option<RobotClock>,
2777 unified_logger: Option<Arc<Mutex<UnifiedLoggerWrite>>>,
2778 config_override: Option<CuConfig>,
2779 }
2780 },
2781 quote! {
2782 #[allow(dead_code)]
2783 pub fn new() -> Self {
2784 Self {
2785 clock: None,
2786 unified_logger: None,
2787 config_override: None,
2788 }
2789 }
2790 },
2791 quote! {
2792 impl #builder_name
2793 },
2794 None,
2795 None,
2796 )
2797 };
2798
2799 let std_application_impl = if sim_mode {
2801 Some(quote! {
2803 impl #application_name {
2804 pub fn start_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
2805 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self, sim_callback)
2806 }
2807 pub fn run_one_iteration(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
2808 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self, sim_callback)
2809 }
2810 pub fn run(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
2811 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self, sim_callback)
2812 }
2813 pub fn stop_all_tasks(&mut self, sim_callback: &mut impl FnMut(SimStep) -> SimOverride) -> CuResult<()> {
2814 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self, sim_callback)
2815 }
2816 }
2817 })
2818 } else if std {
2819 Some(quote! {
2821 impl #application_name {
2822 pub fn start_all_tasks(&mut self) -> CuResult<()> {
2823 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::start_all_tasks(self)
2824 }
2825 pub fn run_one_iteration(&mut self) -> CuResult<()> {
2826 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run_one_iteration(self)
2827 }
2828 pub fn run(&mut self) -> CuResult<()> {
2829 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::run(self)
2830 }
2831 pub fn stop_all_tasks(&mut self) -> CuResult<()> {
2832 <Self as #app_trait<MmapSectionStorage, UnifiedLoggerWrite>>::stop_all_tasks(self)
2833 }
2834 }
2835 })
2836 } else {
2837 None };
2839
2840 let application_builder = if std {
2841 Some(quote! {
2842 #builder_struct
2843
2844 #builder_impl
2845 {
2846 #builder_new
2847
2848 #[allow(dead_code)]
2849 pub fn with_clock(mut self, clock: RobotClock) -> Self {
2850 self.clock = Some(clock);
2851 self
2852 }
2853
2854 #[allow(dead_code)]
2855 pub fn with_unified_logger(mut self, unified_logger: Arc<Mutex<UnifiedLoggerWrite>>) -> Self {
2856 self.unified_logger = Some(unified_logger);
2857 self
2858 }
2859
2860 #[allow(dead_code)]
2861 pub fn with_context(mut self, copper_ctx: &CopperContext) -> Self {
2862 self.clock = Some(copper_ctx.clock.clone());
2863 self.unified_logger = Some(copper_ctx.unified_logger.clone());
2864 self
2865 }
2866
2867 #[allow(dead_code)]
2868 pub fn with_config(mut self, config_override: CuConfig) -> Self {
2869 self.config_override = Some(config_override);
2870 self
2871 }
2872
2873 #builder_sim_callback_method
2874
2875 #[allow(dead_code)]
2876 pub fn build(self) -> CuResult<#application_name> {
2877 #application_name::new(
2878 self.clock
2879 .ok_or(CuError::from("Clock missing from builder"))?,
2880 self.unified_logger
2881 .ok_or(CuError::from("Unified logger missing from builder"))?,
2882 self.config_override,
2883 #builder_build_sim_callback_arg
2884 )
2885 }
2886 }
2887 })
2888 } else {
2889 None
2891 };
2892
2893 let sim_imports = if sim_mode {
2894 Some(quote! {
2895 use cu29::simulation::SimOverride;
2896 use cu29::simulation::CuTaskCallbackState;
2897 use cu29::simulation::CuSimSrcTask;
2898 use cu29::simulation::CuSimSinkTask;
2899 use cu29::prelude::app::CuSimApplication;
2900 use cu29::cubridge::BridgeChannelSet;
2901 })
2902 } else {
2903 None
2904 };
2905
2906 let sim_tasks = if sim_mode {
2907 Some(quote! {
2908 pub type CuSimTasks = #task_types_tuple_sim;
2911 })
2912 } else {
2913 None
2914 };
2915
2916 let sim_inst_body = if task_sim_instances_init_code.is_empty() {
2917 quote! {
2918 let _ = resources;
2919 Ok(())
2920 }
2921 } else {
2922 quote! { Ok(( #(#task_sim_instances_init_code),*, )) }
2923 };
2924
2925 let sim_tasks_instanciator = if sim_mode {
2926 Some(quote! {
2927 pub fn tasks_instanciator_sim(
2928 all_instances_configs: Vec<Option<&ComponentConfig>>,
2929 resources: &mut ResourceManager,
2930 ) -> CuResult<CuSimTasks> {
2931 #sim_inst_body
2932 }})
2933 } else {
2934 None
2935 };
2936
2937 let tasks_inst_body_std = if task_instances_init_code.is_empty() {
2938 quote! {
2939 let _ = resources;
2940 Ok(())
2941 }
2942 } else {
2943 quote! { Ok(( #(#task_instances_init_code),*, )) }
2944 };
2945
2946 let tasks_inst_body_nostd = if task_instances_init_code.is_empty() {
2947 quote! {
2948 let _ = resources;
2949 Ok(())
2950 }
2951 } else {
2952 quote! { Ok(( #(#task_instances_init_code),*, )) }
2953 };
2954
2955 let tasks_instanciator = if std {
2956 quote! {
2957 pub fn tasks_instanciator<'c>(
2958 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
2959 resources: &mut ResourceManager,
2960 ) -> CuResult<CuTasks> {
2961 #tasks_inst_body_std
2962 }
2963 }
2964 } else {
2965 quote! {
2967 pub fn tasks_instanciator<'c>(
2968 all_instances_configs: Vec<Option<&'c ComponentConfig>>,
2969 resources: &mut ResourceManager,
2970 ) -> CuResult<CuTasks> {
2971 #tasks_inst_body_nostd
2972 }
2973 }
2974 };
2975
2976 let imports = if std {
2977 quote! {
2978 use cu29::rayon::ThreadPool;
2979 use cu29::cuasynctask::CuAsyncTask;
2980 use cu29::curuntime::CopperContext;
2981 use cu29::resource::{ResourceBindings, ResourceManager};
2982 use cu29::prelude::SectionStorage;
2983 use cu29::prelude::UnifiedLoggerWrite;
2984 use cu29::prelude::memmap::MmapSectionStorage;
2985 use std::fmt::{Debug, Formatter};
2986 use std::fmt::Result as FmtResult;
2987 use std::mem::size_of;
2988 use std::boxed::Box;
2989 use std::sync::Arc;
2990 use std::sync::atomic::{AtomicBool, Ordering};
2991 use std::sync::Mutex;
2992 }
2993 } else {
2994 quote! {
2995 use alloc::boxed::Box;
2996 use alloc::sync::Arc;
2997 use alloc::string::String;
2998 use alloc::string::ToString;
2999 use core::sync::atomic::{AtomicBool, Ordering};
3000 use core::fmt::{Debug, Formatter};
3001 use core::fmt::Result as FmtResult;
3002 use core::mem::size_of;
3003 use spin::Mutex;
3004 use cu29::prelude::SectionStorage;
3005 use cu29::resource::{ResourceBindings, ResourceManager};
3006 }
3007 };
3008
3009 let task_mapping_defs = task_resource_mappings.defs.clone();
3010 let bridge_mapping_defs = bridge_resource_mappings.defs.clone();
3011
3012 let mission_mod_tokens = quote! {
3014 mod #mission_mod {
3015 use super::*; use cu29::bincode::Encode;
3018 use cu29::bincode::enc::Encoder;
3019 use cu29::bincode::error::EncodeError;
3020 use cu29::bincode::Decode;
3021 use cu29::bincode::de::Decoder;
3022 use cu29::bincode::de::DecoderImpl;
3023 use cu29::bincode::error::DecodeError;
3024 use cu29::clock::RobotClock;
3025 use cu29::config::CuConfig;
3026 use cu29::config::ComponentConfig;
3027 use cu29::curuntime::CuRuntime;
3028 use cu29::curuntime::KeyFrame;
3029 use cu29::curuntime::RuntimeLifecycleConfigSource;
3030 use cu29::curuntime::RuntimeLifecycleEvent;
3031 use cu29::curuntime::RuntimeLifecycleRecord;
3032 use cu29::curuntime::RuntimeLifecycleStackInfo;
3033 use cu29::CuResult;
3034 use cu29::CuError;
3035 use cu29::cutask::CuSrcTask;
3036 use cu29::cutask::CuSinkTask;
3037 use cu29::cutask::CuTask;
3038 use cu29::cutask::CuMsg;
3039 use cu29::cutask::CuMsgMetadata;
3040 use cu29::copperlist::CopperList;
3041 use cu29::monitoring::CuMonitor; use cu29::monitoring::CuTaskState;
3043 use cu29::monitoring::Decision;
3044 use cu29::prelude::app::CuApplication;
3045 use cu29::prelude::debug;
3046 use cu29::prelude::stream_write;
3047 use cu29::prelude::UnifiedLogType;
3048 use cu29::prelude::UnifiedLogWrite;
3049 use cu29::prelude::WriteStream;
3050
3051 #imports
3052
3053 #sim_imports
3054
3055 #[allow(unused_imports)]
3057 use cu29::monitoring::NoMonitor;
3058
3059 pub type CuTasks = #task_types_tuple;
3063 pub type CuBridges = #bridges_type_tokens;
3064 #resources_module
3065 #resources_instanciator_fn
3066 #task_mapping_defs
3067 #bridge_mapping_defs
3068
3069 #sim_tasks
3070 #sim_support
3071 #sim_tasks_instanciator
3072
3073 pub const TASKS_IDS: &'static [&'static str] = &[#( #ids ),*];
3074
3075 #culist_support
3076 #tasks_instanciator
3077 #bridges_instanciator
3078
3079 pub fn monitor_instanciator(config: &CuConfig) -> #monitor_type {
3080 #monitor_instanciator_body
3081 }
3082
3083 #app_resources_struct
3085 pub #application_struct
3086
3087 #app_inherent_impl
3088 #app_reflect_impl
3089 #application_impl
3090
3091 #std_application_impl
3092
3093 #application_builder
3094 }
3095
3096 };
3097 all_missions_tokens.push(mission_mod_tokens);
3098 }
3099
3100 let default_application_tokens = if all_missions.contains_key("default") {
3101 let default_builder = if std {
3102 Some(quote! {
3103 #[allow(unused_imports)]
3105 use default::#builder_name;
3106 })
3107 } else {
3108 None
3109 };
3110 quote! {
3111 #default_builder
3112
3113 #[allow(unused_imports)]
3114 use default::AppResources;
3115
3116 #[allow(unused_imports)]
3117 use default::resources as app_resources;
3118
3119 #[allow(unused_imports)]
3120 use default::#application_name;
3121 }
3122 } else {
3123 quote!() };
3125
3126 let result: proc_macro2::TokenStream = quote! {
3127 #(#all_missions_tokens)*
3128 #default_application_tokens
3129 };
3130
3131 result.into()
3132}
3133
3134fn read_config(config_file: &str) -> CuResult<CuConfig> {
3135 let filename = config_full_path(config_file);
3136
3137 read_configuration(filename.as_str())
3138}
3139
3140fn config_full_path(config_file: &str) -> String {
3141 let mut config_full_path = utils::caller_crate_root();
3142 config_full_path.push(config_file);
3143 let filename = config_full_path
3144 .as_os_str()
3145 .to_str()
3146 .expect("Could not interpret the config file name");
3147 filename.to_string()
3148}
3149
3150fn extract_tasks_output_types(graph: &CuGraph) -> Vec<Option<Type>> {
3151 graph
3152 .get_all_nodes()
3153 .iter()
3154 .map(|(_, node)| {
3155 let id = node.get_id();
3156 let type_str = graph.get_node_output_msg_type(id.as_str());
3157 type_str.map(|type_str| {
3158 parse_str::<Type>(type_str.as_str()).expect("Could not parse output message type.")
3159 })
3160 })
3161 .collect()
3162}
3163
/// Per-task tables extracted from a configuration graph by `CuTaskSpecSet::from_graph`.
///
/// Unless noted otherwise, every vector holds one entry per task-flavored node of the graph,
/// in the order those nodes are returned by `get_all_nodes()`.
struct CuTaskSpecSet {
    /// Node ids (`Node::get_id`) of the task nodes.
    pub ids: Vec<String>,
    /// Task kind of each task, as returned by `find_task_type_for_id`.
    pub cutypes: Vec<CuTaskType>,
    /// `Node::is_background` for each task.
    pub background_flags: Vec<bool>,
    /// `Node::is_logging_enabled` for each task.
    pub logging_enabled: Vec<bool>,
    /// Type name of each task as written in the configuration (`Node::get_type`).
    pub type_names: Vec<String>,
    /// Parsed task types; background tasks are wrapped as `CuAsyncTask<Task, Output>`.
    pub task_types: Vec<Type>,
    /// Same as `task_types`, but background tasks use the turbofish form
    /// `CuAsyncTask::<Task, Output>`.
    pub instantiation_types: Vec<Type>,
    /// Parsed task types without any background wrapping.
    pub sim_task_types: Vec<Type>,
    /// `Node::is_run_in_sim` for each task.
    pub run_in_sim_flags: Vec<bool>,
    /// Result of `extract_tasks_output_types` for the graph: one entry per graph node
    /// (not only task nodes), `None` when the node has no output message type.
    #[allow(dead_code)]
    pub output_types: Vec<Option<Type>>,
    /// Indexed by node id: `Some(i)` when the node is the `i`-th task, `None` otherwise.
    pub node_id_to_task_index: Vec<Option<usize>>,
}
3178
3179impl CuTaskSpecSet {
3180 pub fn from_graph(graph: &CuGraph) -> Self {
3181 let all_id_nodes: Vec<(NodeId, &Node)> = graph
3182 .get_all_nodes()
3183 .into_iter()
3184 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
3185 .collect();
3186
3187 let ids = all_id_nodes
3188 .iter()
3189 .map(|(_, node)| node.get_id().to_string())
3190 .collect();
3191
3192 let cutypes = all_id_nodes
3193 .iter()
3194 .map(|(id, _)| find_task_type_for_id(graph, *id))
3195 .collect();
3196
3197 let background_flags: Vec<bool> = all_id_nodes
3198 .iter()
3199 .map(|(_, node)| node.is_background())
3200 .collect();
3201
3202 let logging_enabled: Vec<bool> = all_id_nodes
3203 .iter()
3204 .map(|(_, node)| node.is_logging_enabled())
3205 .collect();
3206
3207 let type_names: Vec<String> = all_id_nodes
3208 .iter()
3209 .map(|(_, node)| node.get_type().to_string())
3210 .collect();
3211
3212 let output_types = extract_tasks_output_types(graph);
3213
3214 let task_types = type_names
3215 .iter()
3216 .zip(background_flags.iter())
3217 .zip(output_types.iter())
3218 .map(|((name, &background), output_type)| {
3219 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
3220 panic!("Could not transform {name} into a Task Rust type: {error}");
3221 });
3222 if background {
3223 if let Some(output_type) = output_type {
3224 parse_quote!(CuAsyncTask<#name_type, #output_type>)
3225 } else {
3226 panic!("{name}: If a task is background, it has to have an output");
3227 }
3228 } else {
3229 name_type
3230 }
3231 })
3232 .collect();
3233
3234 let instantiation_types = type_names
3235 .iter()
3236 .zip(background_flags.iter())
3237 .zip(output_types.iter())
3238 .map(|((name, &background), output_type)| {
3239 let name_type = parse_str::<Type>(name).unwrap_or_else(|error| {
3240 panic!("Could not transform {name} into a Task Rust type: {error}");
3241 });
3242 if background {
3243 if let Some(output_type) = output_type {
3244 parse_quote!(CuAsyncTask::<#name_type, #output_type>)
3245 } else {
3246 panic!("{name}: If a task is background, it has to have an output");
3247 }
3248 } else {
3249 name_type
3250 }
3251 })
3252 .collect();
3253
3254 let sim_task_types = type_names
3255 .iter()
3256 .map(|name| {
3257 parse_str::<Type>(name).unwrap_or_else(|err| {
3258 eprintln!("Could not transform {name} into a Task Rust type.");
3259 panic!("{err}")
3260 })
3261 })
3262 .collect();
3263
3264 let run_in_sim_flags = all_id_nodes
3265 .iter()
3266 .map(|(_, node)| node.is_run_in_sim())
3267 .collect();
3268
3269 let mut node_id_to_task_index = vec![None; graph.node_count()];
3270 for (index, (node_id, _)) in all_id_nodes.iter().enumerate() {
3271 node_id_to_task_index[*node_id as usize] = Some(index);
3272 }
3273
3274 Self {
3275 ids,
3276 cutypes,
3277 background_flags,
3278 logging_enabled,
3279 type_names,
3280 task_types,
3281 instantiation_types,
3282 sim_task_types,
3283 run_in_sim_flags,
3284 output_types,
3285 node_id_to_task_index,
3286 }
3287 }
3288}
3289
/// Parsed message types of one output pack of an execution step.
#[derive(Clone)]
struct OutputPack {
    /// One parsed Rust type per message of the pack, in the order listed by the runtime plan.
    msg_types: Vec<Type>,
}
3294
3295impl OutputPack {
3296 fn slot_type(&self) -> Type {
3297 build_output_slot_type(&self.msg_types)
3298 }
3299
3300 fn is_multi(&self) -> bool {
3301 self.msg_types.len() > 1
3302 }
3303}
3304
3305fn build_output_slot_type(msg_types: &[Type]) -> Type {
3306 if msg_types.is_empty() {
3307 parse_quote! { () }
3308 } else if msg_types.len() == 1 {
3309 let msg_type = msg_types.first().unwrap();
3310 parse_quote! { CuMsg<#msg_type> }
3311 } else {
3312 parse_quote! { ( #( CuMsg<#msg_types> ),* ) }
3313 }
3314}
3315
3316fn extract_output_packs(runtime_plan: &CuExecutionLoop) -> Vec<OutputPack> {
3317 let mut packs: Vec<(u32, OutputPack)> = runtime_plan
3318 .steps
3319 .iter()
3320 .filter_map(|unit| match unit {
3321 CuExecutionUnit::Step(step) => {
3322 if let Some(output_pack) = &step.output_msg_pack {
3323 let msg_types: Vec<Type> = output_pack
3324 .msg_types
3325 .iter()
3326 .map(|output_msg_type| {
3327 parse_str::<Type>(output_msg_type.as_str()).unwrap_or_else(|_| {
3328 panic!(
3329 "Could not transform {output_msg_type} into a message Rust type."
3330 )
3331 })
3332 })
3333 .collect();
3334 Some((output_pack.culist_index, OutputPack { msg_types }))
3335 } else {
3336 None
3337 }
3338 }
3339 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
3340 })
3341 .collect();
3342
3343 packs.sort_by_key(|(index, _)| *index);
3344 packs.into_iter().map(|(_, pack)| pack).collect()
3345}
3346
3347fn collect_output_pack_sizes(runtime_plan: &CuExecutionLoop) -> Vec<usize> {
3348 let mut sizes: Vec<(u32, usize)> = runtime_plan
3349 .steps
3350 .iter()
3351 .filter_map(|unit| match unit {
3352 CuExecutionUnit::Step(step) => step
3353 .output_msg_pack
3354 .as_ref()
3355 .map(|output_pack| (output_pack.culist_index, output_pack.msg_types.len())),
3356 CuExecutionUnit::Loop(_) => todo!("Needs to be implemented"),
3357 })
3358 .collect();
3359
3360 sizes.sort_by_key(|(index, _)| *index);
3361 sizes.into_iter().map(|(_, size)| size).collect()
3362}
3363
3364fn build_culist_tuple(slot_types: &[Type]) -> TypeTuple {
3366 if slot_types.is_empty() {
3367 parse_quote! { () }
3368 } else {
3369 parse_quote! { ( #( #slot_types ),* ) }
3370 }
3371}
3372
3373fn build_culist_tuple_encode(slot_types: &[Type]) -> ItemImpl {
3375 let indices: Vec<usize> = (0..slot_types.len()).collect();
3376
3377 let encode_fields: Vec<_> = indices
3379 .iter()
3380 .map(|i| {
3381 let idx = syn::Index::from(*i);
3382 quote! { self.0.#idx.encode(encoder)?; }
3383 })
3384 .collect();
3385
3386 parse_quote! {
3387 impl Encode for CuStampedDataSet {
3388 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
3389 #(#encode_fields)*
3390 Ok(())
3391 }
3392 }
3393 }
3394}
3395
3396fn build_culist_tuple_decode(slot_types: &[Type]) -> ItemImpl {
3398 let indices: Vec<usize> = (0..slot_types.len()).collect();
3399
3400 let decode_fields: Vec<_> = indices
3401 .iter()
3402 .map(|i| {
3403 let slot_type = &slot_types[*i];
3404 quote! { <#slot_type as Decode<()>>::decode(decoder)? }
3405 })
3406 .collect();
3407
3408 parse_quote! {
3409 impl Decode<()> for CuStampedDataSet {
3410 fn decode<D: Decoder<Context=()>>(decoder: &mut D) -> Result<Self, DecodeError> {
3411 Ok(CuStampedDataSet ((
3412 #(#decode_fields),*
3413 )))
3414 }
3415 }
3416 }
3417}
3418
3419fn build_culist_erasedcumsgs(output_packs: &[OutputPack]) -> ItemImpl {
3420 let mut casted_fields: Vec<proc_macro2::TokenStream> = Vec::new();
3421 for (idx, pack) in output_packs.iter().enumerate() {
3422 let slot_index = syn::Index::from(idx);
3423 if pack.is_multi() {
3424 for port_idx in 0..pack.msg_types.len() {
3425 let port_index = syn::Index::from(port_idx);
3426 casted_fields.push(quote! {
3427 &self.0.#slot_index.#port_index as &dyn ErasedCuStampedData
3428 });
3429 }
3430 } else {
3431 casted_fields.push(quote! { &self.0.#slot_index as &dyn ErasedCuStampedData });
3432 }
3433 }
3434 parse_quote! {
3435 impl ErasedCuStampedDataSet for CuStampedDataSet {
3436 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
3437 vec![
3438 #(#casted_fields),*
3439 ]
3440 }
3441 }
3442 }
3443}
3444
3445fn build_culist_tuple_debug(slot_types: &[Type]) -> ItemImpl {
3446 let indices: Vec<usize> = (0..slot_types.len()).collect();
3447
3448 let debug_fields: Vec<_> = indices
3449 .iter()
3450 .map(|i| {
3451 let idx = syn::Index::from(*i);
3452 quote! { .field(&self.0.#idx) }
3453 })
3454 .collect();
3455
3456 parse_quote! {
3457 impl Debug for CuStampedDataSet {
3458 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
3459 f.debug_tuple("CuStampedDataSet")
3460 #(#debug_fields)*
3461 .finish()
3462 }
3463 }
3464 }
3465}
3466
3467fn build_culist_tuple_serialize(slot_types: &[Type]) -> ItemImpl {
3469 let indices: Vec<usize> = (0..slot_types.len()).collect();
3470 let tuple_len = slot_types.len();
3471
3472 let serialize_fields: Vec<_> = indices
3474 .iter()
3475 .map(|i| {
3476 let idx = syn::Index::from(*i);
3477 quote! { &self.0.#idx }
3478 })
3479 .collect();
3480
3481 parse_quote! {
3482 impl Serialize for CuStampedDataSet {
3483 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3484 where
3485 S: serde::Serializer,
3486 {
3487 use serde::ser::SerializeTuple;
3488 let mut tuple = serializer.serialize_tuple(#tuple_len)?;
3489 #(tuple.serialize_element(#serialize_fields)?;)*
3490 tuple.end()
3491 }
3492 }
3493 }
3494}
3495
3496fn build_culist_tuple_default(slot_types: &[Type]) -> ItemImpl {
3498 let default_fields: Vec<_> = slot_types
3499 .iter()
3500 .map(|slot_type| quote! { <#slot_type as Default>::default() })
3501 .collect();
3502
3503 parse_quote! {
3504 impl Default for CuStampedDataSet {
3505 fn default() -> CuStampedDataSet
3506 {
3507 CuStampedDataSet((
3508 #(#default_fields),*
3509 ))
3510 }
3511 }
3512 }
3513}
3514
3515fn collect_bridge_channel_usage(graph: &CuGraph) -> HashMap<BridgeChannelKey, String> {
3516 let mut usage = HashMap::new();
3517 for cnx in graph.edges() {
3518 if let Some(channel) = &cnx.src_channel {
3519 let key = BridgeChannelKey {
3520 bridge_id: cnx.src.clone(),
3521 channel_id: channel.clone(),
3522 direction: BridgeChannelDirection::Rx,
3523 };
3524 usage
3525 .entry(key)
3526 .and_modify(|msg| {
3527 if msg != &cnx.msg {
3528 panic!(
3529 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
3530 cnx.src, channel, msg, cnx.msg
3531 );
3532 }
3533 })
3534 .or_insert(cnx.msg.clone());
3535 }
3536 if let Some(channel) = &cnx.dst_channel {
3537 let key = BridgeChannelKey {
3538 bridge_id: cnx.dst.clone(),
3539 channel_id: channel.clone(),
3540 direction: BridgeChannelDirection::Tx,
3541 };
3542 usage
3543 .entry(key)
3544 .and_modify(|msg| {
3545 if msg != &cnx.msg {
3546 panic!(
3547 "Bridge '{}' channel '{}' is used with incompatible message types: {} vs {}",
3548 cnx.dst, channel, msg, cnx.msg
3549 );
3550 }
3551 })
3552 .or_insert(cnx.msg.clone());
3553 }
3554 }
3555 usage
3556}
3557
3558fn build_bridge_specs(
3559 config: &CuConfig,
3560 graph: &CuGraph,
3561 channel_usage: &HashMap<BridgeChannelKey, String>,
3562) -> Vec<BridgeSpec> {
3563 let mut specs = Vec::new();
3564 for (bridge_index, bridge_cfg) in config.bridges.iter().enumerate() {
3565 if graph.get_node_id_by_name(bridge_cfg.id.as_str()).is_none() {
3566 continue;
3567 }
3568
3569 let type_path = parse_str::<Type>(bridge_cfg.type_.as_str()).unwrap_or_else(|err| {
3570 panic!(
3571 "Could not parse bridge type '{}' for '{}': {err}",
3572 bridge_cfg.type_, bridge_cfg.id
3573 )
3574 });
3575
3576 let mut rx_channels = Vec::new();
3577 let mut tx_channels = Vec::new();
3578
3579 for (channel_index, channel) in bridge_cfg.channels.iter().enumerate() {
3580 match channel {
3581 BridgeChannelConfigRepresentation::Rx { id, .. } => {
3582 let key = BridgeChannelKey {
3583 bridge_id: bridge_cfg.id.clone(),
3584 channel_id: id.clone(),
3585 direction: BridgeChannelDirection::Rx,
3586 };
3587 if let Some(msg_type) = channel_usage.get(&key) {
3588 let msg_type_name = msg_type.clone();
3589 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
3590 panic!(
3591 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
3592 bridge_cfg.id, id
3593 )
3594 });
3595 let const_ident =
3596 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
3597 rx_channels.push(BridgeChannelSpec {
3598 id: id.clone(),
3599 const_ident,
3600 msg_type,
3601 msg_type_name,
3602 config_index: channel_index,
3603 plan_node_id: None,
3604 culist_index: None,
3605 monitor_index: None,
3606 });
3607 }
3608 }
3609 BridgeChannelConfigRepresentation::Tx { id, .. } => {
3610 let key = BridgeChannelKey {
3611 bridge_id: bridge_cfg.id.clone(),
3612 channel_id: id.clone(),
3613 direction: BridgeChannelDirection::Tx,
3614 };
3615 if let Some(msg_type) = channel_usage.get(&key) {
3616 let msg_type_name = msg_type.clone();
3617 let msg_type = parse_str::<Type>(msg_type).unwrap_or_else(|err| {
3618 panic!(
3619 "Could not parse message type '{msg_type}' for bridge '{}' channel '{}': {err}",
3620 bridge_cfg.id, id
3621 )
3622 });
3623 let const_ident =
3624 Ident::new(&config_id_to_bridge_const(id.as_str()), Span::call_site());
3625 tx_channels.push(BridgeChannelSpec {
3626 id: id.clone(),
3627 const_ident,
3628 msg_type,
3629 msg_type_name,
3630 config_index: channel_index,
3631 plan_node_id: None,
3632 culist_index: None,
3633 monitor_index: None,
3634 });
3635 }
3636 }
3637 }
3638 }
3639
3640 if rx_channels.is_empty() && tx_channels.is_empty() {
3641 continue;
3642 }
3643
3644 specs.push(BridgeSpec {
3645 id: bridge_cfg.id.clone(),
3646 type_path,
3647 config_index: bridge_index,
3648 tuple_index: 0,
3649 monitor_index: None,
3650 rx_channels,
3651 tx_channels,
3652 });
3653 }
3654
3655 for (tuple_index, spec) in specs.iter_mut().enumerate() {
3656 spec.tuple_index = tuple_index;
3657 }
3658
3659 specs
3660}
3661
3662fn collect_task_member_names(graph: &CuGraph) -> Vec<(NodeId, String)> {
3663 graph
3664 .get_all_nodes()
3665 .iter()
3666 .filter(|(_, node)| node.get_flavor() == Flavor::Task)
3667 .map(|(node_id, node)| (*node_id, config_id_to_struct_member(node.get_id().as_str())))
3668 .collect()
3669}
3670
/// Identifies which generated entity a resource binding belongs to.
#[derive(Clone, Copy)]
enum ResourceOwner {
    /// A task, identified by its index in the per-task tables of `CuTaskSpecSet`.
    Task(usize),
    /// A bridge, identified by its index in the bridge specification slice.
    Bridge(usize),
}
3676
/// One resource binding declared on a graph node, resolved against the known bundles.
#[derive(Clone)]
struct ResourceKeySpec {
    /// Position of the referenced bundle in the bundle specification slice.
    bundle_index: usize,
    /// Provider path of the referenced bundle.
    provider_path: syn::Path,
    /// Resource part of the `bundle.resource` path (the text after the first `.`).
    resource_name: String,
    /// Name under which the node declares the binding.
    binding_name: String,
    /// Task or bridge the binding is attached to.
    owner: ResourceOwner,
}
3685
3686fn parse_resource_path(path: &str) -> CuResult<(String, String)> {
3687 let (bundle_id, name) = path.split_once('.').ok_or_else(|| {
3688 CuError::from(format!(
3689 "Resource '{path}' is missing a bundle prefix (expected bundle.resource)"
3690 ))
3691 })?;
3692
3693 if bundle_id.is_empty() || name.is_empty() {
3694 return Err(CuError::from(format!(
3695 "Resource '{path}' must use the 'bundle.resource' format"
3696 )));
3697 }
3698
3699 Ok((bundle_id.to_string(), name.to_string()))
3700}
3701
3702fn collect_resource_specs(
3703 graph: &CuGraph,
3704 task_specs: &CuTaskSpecSet,
3705 bridge_specs: &[BridgeSpec],
3706 bundle_specs: &[BundleSpec],
3707) -> CuResult<Vec<ResourceKeySpec>> {
3708 let mut bridge_lookup: BTreeMap<String, usize> = BTreeMap::new();
3709 for (idx, spec) in bridge_specs.iter().enumerate() {
3710 bridge_lookup.insert(spec.id.clone(), idx);
3711 }
3712
3713 let mut bundle_lookup: HashMap<String, (usize, syn::Path)> = HashMap::new();
3714 for (index, bundle) in bundle_specs.iter().enumerate() {
3715 bundle_lookup.insert(bundle.id.clone(), (index, bundle.provider_path.clone()));
3716 }
3717
3718 let mut specs = Vec::new();
3719
3720 for (node_id, node) in graph.get_all_nodes() {
3721 let resources = node.get_resources();
3722 if let Some(resources) = resources {
3723 let task_index = task_specs.node_id_to_task_index[node_id as usize];
3724 let owner = if let Some(task_index) = task_index {
3725 ResourceOwner::Task(task_index)
3726 } else if node.get_flavor() == Flavor::Bridge {
3727 let bridge_index = bridge_lookup.get(&node.get_id()).ok_or_else(|| {
3728 CuError::from(format!(
3729 "Resource mapping attached to unknown bridge node '{}'",
3730 node.get_id()
3731 ))
3732 })?;
3733 ResourceOwner::Bridge(*bridge_index)
3734 } else {
3735 return Err(CuError::from(format!(
3736 "Resource mapping attached to non-task node '{}'",
3737 node.get_id()
3738 )));
3739 };
3740
3741 for (binding_name, path) in resources {
3742 let (bundle_id, resource_name) = parse_resource_path(path)?;
3743 let (bundle_index, provider_path) =
3744 bundle_lookup.get(&bundle_id).ok_or_else(|| {
3745 CuError::from(format!(
3746 "Resource '{}' references unknown bundle '{}'",
3747 path, bundle_id
3748 ))
3749 })?;
3750 specs.push(ResourceKeySpec {
3751 bundle_index: *bundle_index,
3752 provider_path: provider_path.clone(),
3753 resource_name,
3754 binding_name: binding_name.clone(),
3755 owner,
3756 });
3757 }
3758 }
3759 }
3760
3761 Ok(specs)
3762}
3763
3764fn build_bundle_list<'a>(config: &'a CuConfig, mission: &str) -> Vec<&'a ResourceBundleConfig> {
3765 config
3766 .resources
3767 .iter()
3768 .filter(|bundle| {
3769 bundle
3770 .missions
3771 .as_ref()
3772 .is_none_or(|missions| missions.iter().any(|m| m == mission))
3773 })
3774 .collect()
3775}
3776
/// A resource bundle selected for a mission, with its provider parsed into a path.
struct BundleSpec {
    /// Bundle id as written in the configuration.
    id: String,
    /// Provider of the bundle, parsed from the configuration string.
    provider_path: syn::Path,
}
3781
3782fn build_bundle_specs(config: &CuConfig, mission: &str) -> CuResult<Vec<BundleSpec>> {
3783 build_bundle_list(config, mission)
3784 .into_iter()
3785 .map(|bundle| {
3786 let provider_path: syn::Path =
3787 syn::parse_str(bundle.provider.as_str()).map_err(|err| {
3788 CuError::from(format!(
3789 "Failed to parse provider path '{}' for bundle '{}': {err}",
3790 bundle.provider, bundle.id
3791 ))
3792 })?;
3793 Ok(BundleSpec {
3794 id: bundle.id.clone(),
3795 provider_path,
3796 })
3797 })
3798 .collect()
3799}
3800
3801fn build_resources_module(
3802 bundle_specs: &[BundleSpec],
3803) -> CuResult<(proc_macro2::TokenStream, proc_macro2::TokenStream)> {
3804 let bundle_consts = bundle_specs.iter().enumerate().map(|(index, bundle)| {
3805 let const_ident = Ident::new(
3806 &config_id_to_bridge_const(bundle.id.as_str()),
3807 Span::call_site(),
3808 );
3809 quote! { pub const #const_ident: BundleIndex = BundleIndex::new(#index); }
3810 });
3811
3812 let resources_module = quote! {
3813 pub mod resources {
3814 #![allow(dead_code)]
3815 use cu29::resource::BundleIndex;
3816
3817 pub mod bundles {
3818 use super::BundleIndex;
3819 #(#bundle_consts)*
3820 }
3821 }
3822 };
3823
3824 let bundle_counts = bundle_specs.iter().map(|bundle| {
3825 let provider_path = &bundle.provider_path;
3826 quote! { <#provider_path as cu29::resource::ResourceBundleDecl>::Id::COUNT }
3827 });
3828
3829 let bundle_inits = bundle_specs
3830 .iter()
3831 .enumerate()
3832 .map(|(index, bundle)| {
3833 let bundle_id = LitStr::new(bundle.id.as_str(), Span::call_site());
3834 let provider_path = &bundle.provider_path;
3835 quote! {
3836 let bundle_cfg = config
3837 .resources
3838 .iter()
3839 .find(|b| b.id == #bundle_id)
3840 .unwrap_or_else(|| panic!("Resource bundle '{}' missing from configuration", #bundle_id));
3841 let bundle_ctx = cu29::resource::BundleContext::<#provider_path>::new(
3842 cu29::resource::BundleIndex::new(#index),
3843 #bundle_id,
3844 );
3845 <#provider_path as cu29::resource::ResourceBundle>::build(
3846 bundle_ctx,
3847 bundle_cfg.config.as_ref(),
3848 &mut manager,
3849 )?;
3850 }
3851 })
3852 .collect::<Vec<_>>();
3853
3854 let resources_instanciator = quote! {
3855 pub fn resources_instanciator(config: &CuConfig) -> CuResult<cu29::resource::ResourceManager> {
3856 let bundle_counts: &[usize] = &[ #(#bundle_counts),* ];
3857 let mut manager = cu29::resource::ResourceManager::new(bundle_counts);
3858 #(#bundle_inits)*
3859 Ok(manager)
3860 }
3861 };
3862
3863 Ok((resources_module, resources_instanciator))
3864}
3865
/// Generated tokens describing the resource binding maps of a set of tasks or bridges.
struct ResourceMappingTokens {
    /// Constant definitions (entry tables and binding maps) for every owner that has bindings.
    defs: proc_macro2::TokenStream,
    /// One expression per owner, in owner order: `None` when the owner has no bindings,
    /// otherwise `Some(&<MAP_CONST>)` referring to the map defined in `defs`.
    refs: Vec<proc_macro2::TokenStream>,
}
3870
3871fn build_task_resource_mappings(
3872 resource_specs: &[ResourceKeySpec],
3873 task_specs: &CuTaskSpecSet,
3874) -> CuResult<ResourceMappingTokens> {
3875 let mut per_task: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); task_specs.ids.len()];
3876
3877 for spec in resource_specs {
3878 let ResourceOwner::Task(task_index) = spec.owner else {
3879 continue;
3880 };
3881 per_task
3882 .get_mut(task_index)
3883 .ok_or_else(|| {
3884 CuError::from(format!(
3885 "Resource '{}' mapped to invalid task index {}",
3886 spec.binding_name, task_index
3887 ))
3888 })?
3889 .push(spec);
3890 }
3891
3892 let mut mapping_defs = Vec::new();
3893 let mut mapping_refs = Vec::new();
3894
3895 for (idx, entries) in per_task.iter().enumerate() {
3896 if entries.is_empty() {
3897 mapping_refs.push(quote! { None });
3898 continue;
3899 }
3900
3901 let binding_task_type = if task_specs.background_flags[idx] {
3902 &task_specs.sim_task_types[idx]
3903 } else {
3904 &task_specs.task_types[idx]
3905 };
3906
3907 let binding_trait = match task_specs.cutypes[idx] {
3908 CuTaskType::Source => quote! { CuSrcTask },
3909 CuTaskType::Regular => quote! { CuTask },
3910 CuTaskType::Sink => quote! { CuSinkTask },
3911 };
3912
3913 let entries_ident = format_ident!("TASK{}_RES_ENTRIES", idx);
3914 let map_ident = format_ident!("TASK{}_RES_MAPPING", idx);
3915 let binding_type = quote! {
3916 <<#binding_task_type as #binding_trait>::Resources<'_> as ResourceBindings>::Binding
3917 };
3918 let entry_tokens = entries.iter().map(|spec| {
3919 let binding_ident =
3920 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
3921 let resource_ident =
3922 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
3923 let bundle_index = spec.bundle_index;
3924 let provider_path = &spec.provider_path;
3925 quote! {
3926 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
3927 cu29::resource::BundleIndex::new(#bundle_index),
3928 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
3929 ))
3930 }
3931 });
3932
3933 mapping_defs.push(quote! {
3934 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
3935 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
3936 cu29::resource::ResourceBindingMap::new(#entries_ident);
3937 });
3938 mapping_refs.push(quote! { Some(&#map_ident) });
3939 }
3940
3941 Ok(ResourceMappingTokens {
3942 defs: quote! { #(#mapping_defs)* },
3943 refs: mapping_refs,
3944 })
3945}
3946
3947fn build_bridge_resource_mappings(
3948 resource_specs: &[ResourceKeySpec],
3949 bridge_specs: &[BridgeSpec],
3950) -> ResourceMappingTokens {
3951 let mut per_bridge: Vec<Vec<&ResourceKeySpec>> = vec![Vec::new(); bridge_specs.len()];
3952
3953 for spec in resource_specs {
3954 let ResourceOwner::Bridge(bridge_index) = spec.owner else {
3955 continue;
3956 };
3957 per_bridge[bridge_index].push(spec);
3958 }
3959
3960 let mut mapping_defs = Vec::new();
3961 let mut mapping_refs = Vec::new();
3962
3963 for (idx, entries) in per_bridge.iter().enumerate() {
3964 if entries.is_empty() {
3965 mapping_refs.push(quote! { None });
3966 continue;
3967 }
3968
3969 let bridge_type = &bridge_specs[idx].type_path;
3970 let binding_type = quote! {
3971 <<#bridge_type as cu29::cubridge::CuBridge>::Resources<'_> as ResourceBindings>::Binding
3972 };
3973 let entries_ident = format_ident!("BRIDGE{}_RES_ENTRIES", idx);
3974 let map_ident = format_ident!("BRIDGE{}_RES_MAPPING", idx);
3975 let entry_tokens = entries.iter().map(|spec| {
3976 let binding_ident =
3977 Ident::new(&config_id_to_enum(spec.binding_name.as_str()), Span::call_site());
3978 let resource_ident =
3979 Ident::new(&config_id_to_enum(spec.resource_name.as_str()), Span::call_site());
3980 let bundle_index = spec.bundle_index;
3981 let provider_path = &spec.provider_path;
3982 quote! {
3983 (#binding_type::#binding_ident, cu29::resource::ResourceKey::new(
3984 cu29::resource::BundleIndex::new(#bundle_index),
3985 <#provider_path as cu29::resource::ResourceBundleDecl>::Id::#resource_ident as usize,
3986 ))
3987 }
3988 });
3989
3990 mapping_defs.push(quote! {
3991 const #entries_ident: &[(#binding_type, cu29::resource::ResourceKey)] = &[ #(#entry_tokens),* ];
3992 const #map_ident: cu29::resource::ResourceBindingMap<#binding_type> =
3993 cu29::resource::ResourceBindingMap::new(#entries_ident);
3994 });
3995 mapping_refs.push(quote! { Some(&#map_ident) });
3996 }
3997
3998 ResourceMappingTokens {
3999 defs: quote! { #(#mapping_defs)* },
4000 refs: mapping_refs,
4001 }
4002}
4003
/// Builds the graph used for planning and computes the runtime plan from it.
///
/// The plan graph contains a copy of every task node of `graph`, followed by one synthetic node
/// per rx channel and per tx channel of every bridge in `bridge_specs`. The edges of `graph` are
/// then re-created between the corresponding plan nodes.
///
/// Returns:
/// - the runtime plan computed by `compute_runtime_plan` on the plan graph;
/// - the execution entities, where the entity at position `i` describes plan node `i`;
/// - the mapping from plan node id to original node id (task nodes only).
///
/// As a side effect, `plan_node_id` is set on every channel spec of `bridge_specs`.
///
/// # Errors
///
/// Propagates failures from adding nodes, connecting nodes, or computing the runtime plan.
///
/// # Panics
///
/// Panics when a task node has no task index, when plan node ids are not assigned sequentially,
/// or when an edge refers to a node or bridge channel that is missing from the plan graph.
fn build_execution_plan(
    graph: &CuGraph,
    task_specs: &CuTaskSpecSet,
    bridge_specs: &mut [BridgeSpec],
) -> CuResult<(
    CuExecutionLoop,
    Vec<ExecutionEntity>,
    HashMap<NodeId, NodeId>,
)> {
    let mut plan_graph = CuGraph::default();
    let mut exec_entities = Vec::new();
    let mut original_to_plan = HashMap::new();
    let mut plan_to_original = HashMap::new();
    let mut name_to_original = HashMap::new();
    let mut channel_nodes = HashMap::new();

    // Step 1: mirror every task node into the plan graph. All nodes (of any flavor) are
    // registered by name so that edges can be resolved later.
    for (node_id, node) in graph.get_all_nodes() {
        name_to_original.insert(node.get_id(), node_id);
        if node.get_flavor() != Flavor::Task {
            continue;
        }
        let plan_node_id = plan_graph.add_node(node.clone())?;
        let task_index = task_specs.node_id_to_task_index[node_id as usize]
            .expect("Task missing from specifications");
        plan_to_original.insert(plan_node_id, node_id);
        original_to_plan.insert(node_id, plan_node_id);
        // `exec_entities` is indexed by plan node id, so ids must be handed out sequentially.
        if plan_node_id as usize != exec_entities.len() {
            panic!("Unexpected node ordering while mirroring tasks in plan graph");
        }
        exec_entities.push(ExecutionEntity {
            kind: ExecutionEntityKind::Task { task_index },
        });
    }

    // Step 2: add one synthetic bridge-flavored node per bridge channel, rx channels first.
    for (bridge_index, spec) in bridge_specs.iter_mut().enumerate() {
        for (channel_index, channel_spec) in spec.rx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::rx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeRxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge rx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeRx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Rx,
                },
                plan_node_id,
            );
        }

        for (channel_index, channel_spec) in spec.tx_channels.iter_mut().enumerate() {
            let mut node = Node::new(
                format!("{}::tx::{}", spec.id, channel_spec.id).as_str(),
                "__CuBridgeTxChannel",
            );
            node.set_flavor(Flavor::Bridge);
            let plan_node_id = plan_graph.add_node(node)?;
            if plan_node_id as usize != exec_entities.len() {
                panic!("Unexpected node ordering while inserting bridge tx channel");
            }
            channel_spec.plan_node_id = Some(plan_node_id);
            exec_entities.push(ExecutionEntity {
                kind: ExecutionEntityKind::BridgeTx {
                    bridge_index,
                    channel_index,
                },
            });
            channel_nodes.insert(
                BridgeChannelKey {
                    bridge_id: spec.id.clone(),
                    channel_id: channel_spec.id.clone(),
                    direction: BridgeChannelDirection::Tx,
                },
                plan_node_id,
            );
        }
    }

    // Step 3: re-create every edge between plan nodes. An endpoint that names a channel is
    // resolved to the synthetic channel node (source side: rx, destination side: tx); any other
    // endpoint is resolved through the mirrored task node.
    for cnx in graph.edges() {
        let src_plan = if let Some(channel) = &cnx.src_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.src.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Rx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge source {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.src)
                .copied()
                .unwrap_or_else(|| panic!("Unknown source node '{}'", cnx.src));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Source node '{}' missing from plan", cnx.src))
        };

        let dst_plan = if let Some(channel) = &cnx.dst_channel {
            let key = BridgeChannelKey {
                bridge_id: cnx.dst.clone(),
                channel_id: channel.clone(),
                direction: BridgeChannelDirection::Tx,
            };
            *channel_nodes
                .get(&key)
                .unwrap_or_else(|| panic!("Bridge destination {:?} missing from plan graph", key))
        } else {
            let node_id = name_to_original
                .get(&cnx.dst)
                .copied()
                .unwrap_or_else(|| panic!("Unknown destination node '{}'", cnx.dst));
            *original_to_plan
                .get(&node_id)
                .unwrap_or_else(|| panic!("Destination node '{}' missing from plan", cnx.dst))
        };

        // Channel names are not forwarded: both endpoints are already dedicated plan nodes.
        plan_graph
            .connect_ext(
                src_plan,
                dst_plan,
                &cnx.msg,
                cnx.missions.clone(),
                None,
                None,
            )
            .map_err(|e| CuError::from(e.to_string()))?;
    }

    // Step 4: compute the plan on the rewritten graph.
    let runtime_plan = compute_runtime_plan(&plan_graph)?;
    Ok((runtime_plan, exec_entities, plan_to_original))
}
4148
4149fn collect_culist_metadata(
4150 runtime_plan: &CuExecutionLoop,
4151 exec_entities: &[ExecutionEntity],
4152 bridge_specs: &mut [BridgeSpec],
4153 plan_to_original: &HashMap<NodeId, NodeId>,
4154) -> (Vec<usize>, HashMap<NodeId, usize>) {
4155 let mut culist_order = Vec::new();
4156 let mut node_output_positions = HashMap::new();
4157
4158 for unit in &runtime_plan.steps {
4159 if let CuExecutionUnit::Step(step) = unit
4160 && let Some(output_pack) = &step.output_msg_pack
4161 {
4162 let output_idx = output_pack.culist_index;
4163 culist_order.push(output_idx as usize);
4164 match &exec_entities[step.node_id as usize].kind {
4165 ExecutionEntityKind::Task { .. } => {
4166 if let Some(original_node_id) = plan_to_original.get(&step.node_id) {
4167 node_output_positions.insert(*original_node_id, output_idx as usize);
4168 }
4169 }
4170 ExecutionEntityKind::BridgeRx {
4171 bridge_index,
4172 channel_index,
4173 } => {
4174 bridge_specs[*bridge_index].rx_channels[*channel_index].culist_index =
4175 Some(output_idx as usize);
4176 }
4177 ExecutionEntityKind::BridgeTx { .. } => {}
4178 }
4179 }
4180 }
4181
4182 (culist_order, node_output_positions)
4183}
4184
4185#[allow(dead_code)]
4186fn build_monitored_ids(task_ids: &[String], bridge_specs: &mut [BridgeSpec]) -> Vec<String> {
4187 let mut names = task_ids.to_vec();
4188 for spec in bridge_specs.iter_mut() {
4189 spec.monitor_index = Some(names.len());
4190 names.push(format!("bridge::{}", spec.id));
4191 for channel in spec.rx_channels.iter_mut() {
4192 channel.monitor_index = Some(names.len());
4193 names.push(format!("bridge::{}::rx::{}", spec.id, channel.id));
4194 }
4195 for channel in spec.tx_channels.iter_mut() {
4196 channel.monitor_index = Some(names.len());
4197 names.push(format!("bridge::{}::tx::{}", spec.id, channel.id));
4198 }
4199 }
4200 names
4201}
4202
4203fn generate_task_execution_tokens(
4204 step: &CuExecutionStep,
4205 task_index: usize,
4206 task_specs: &CuTaskSpecSet,
4207 output_pack_sizes: &[usize],
4208 sim_mode: bool,
4209 mission_mod: &Ident,
4210) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
4211 let node_index = int2sliceindex(task_index as u32);
4212 let task_instance = quote! { tasks.#node_index };
4213 let comment_str = format!(
4214 "DEBUG ->> {} ({:?}) Id:{} I:{:?} O:{:?}",
4215 step.node.get_id(),
4216 step.task_type,
4217 step.node_id,
4218 step.input_msg_indices_types,
4219 step.output_msg_pack
4220 );
4221 let comment_tokens = quote! {{
4222 let _ = stringify!(#comment_str);
4223 }};
4224 let tid = task_index;
4225 let task_enum_name = config_id_to_enum(&task_specs.ids[tid]);
4226 let enum_name = Ident::new(&task_enum_name, Span::call_site());
4227 let rt_guard = rtsan_guard_tokens();
4228 let run_in_sim_flag = task_specs.run_in_sim_flags[tid];
4229 let maybe_sim_tick = if sim_mode && !run_in_sim_flag {
4230 quote! {
4231 if !doit {
4232 #task_instance.sim_tick();
4233 }
4234 }
4235 } else {
4236 quote!()
4237 };
4238
4239 let output_pack = step
4240 .output_msg_pack
4241 .as_ref()
4242 .expect("Task should have an output message pack.");
4243 let output_culist_index = int2sliceindex(output_pack.culist_index);
4244 let output_ports: Vec<syn::Index> = (0..output_pack.msg_types.len())
4245 .map(syn::Index::from)
4246 .collect();
4247 let output_clear_payload = if output_ports.len() == 1 {
4248 quote! { cumsg_output.clear_payload(); }
4249 } else {
4250 quote! { #(cumsg_output.#output_ports.clear_payload();)* }
4251 };
4252 let output_start_time = if output_ports.len() == 1 {
4253 quote! {
4254 if cumsg_output.metadata.process_time.start.is_none() {
4255 cumsg_output.metadata.process_time.start = clock.now().into();
4256 }
4257 }
4258 } else {
4259 quote! {
4260 let start_time = clock.now().into();
4261 #( if cumsg_output.#output_ports.metadata.process_time.start.is_none() {
4262 cumsg_output.#output_ports.metadata.process_time.start = start_time;
4263 } )*
4264 }
4265 };
4266 let output_end_time = if output_ports.len() == 1 {
4267 quote! {
4268 if cumsg_output.metadata.process_time.end.is_none() {
4269 cumsg_output.metadata.process_time.end = clock.now().into();
4270 }
4271 }
4272 } else {
4273 quote! {
4274 let end_time = clock.now().into();
4275 #( if cumsg_output.#output_ports.metadata.process_time.end.is_none() {
4276 cumsg_output.#output_ports.metadata.process_time.end = end_time;
4277 } )*
4278 }
4279 };
4280
4281 match step.task_type {
4282 CuTaskType::Source => {
4283 let monitoring_action = quote! {
4284 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
4285 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
4286 match decision {
4287 Decision::Abort => {
4288 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
4289 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
4290 ctx.clear_current_task();
4291 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4292 cl_manager.end_of_processing(clid)?;
4293 monitor_result?;
4294 return Ok(());
4295 }
4296 Decision::Ignore => {
4297 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
4298 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
4299 let cumsg_output = &mut msgs.#output_culist_index;
4300 #output_clear_payload
4301 }
4302 Decision::Shutdown => {
4303 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
4304 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
4305 return Err(CuError::new_with_cause("Task errored out during process.", error));
4306 }
4307 }
4308 };
4309
4310 let call_sim_callback = if sim_mode {
4311 quote! {
4312 let doit = {
4313 let cumsg_output = &mut msgs.#output_culist_index;
4314 let state = CuTaskCallbackState::Process((), cumsg_output);
4315 let ovr = sim_callback(SimStep::#enum_name(state));
4316
4317 if let SimOverride::Errored(reason) = ovr {
4318 let error: CuError = reason.into();
4319 #monitoring_action
4320 false
4321 } else {
4322 ovr == SimOverride::ExecuteByRuntime
4323 }
4324 };
4325 }
4326 } else {
4327 quote! { let doit = true; }
4328 };
4329
4330 let logging_tokens = if !task_specs.logging_enabled[tid] {
4331 quote! {
4332 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
4333 #output_clear_payload
4334 }
4335 } else {
4336 quote!()
4337 };
4338
4339 (
4340 quote! {
4341 {
4342 #comment_tokens
4343 kf_manager.freeze_task(clid, &#task_instance)?;
4344 #call_sim_callback
4345 let cumsg_output = &mut msgs.#output_culist_index;
4346 #maybe_sim_tick
4347 let maybe_error = if doit {
4348 execution_probe.record(cu29::monitoring::ExecutionMarker {
4349 component_id: #tid,
4350 step: CuTaskState::Process,
4351 culistid: Some(clid),
4352 });
4353 #output_start_time
4354 let result = {
4355 #rt_guard
4356 ctx.set_current_task(#tid);
4357 #task_instance.process(&ctx, cumsg_output)
4358 };
4359 #output_end_time
4360 result
4361 } else {
4362 Ok(())
4363 };
4364 if let Err(error) = maybe_error {
4365 #monitoring_action
4366 }
4367 }
4368 },
4369 logging_tokens,
4370 )
4371 }
4372 CuTaskType::Sink => {
4373 let input_exprs: Vec<proc_macro2::TokenStream> = step
4374 .input_msg_indices_types
4375 .iter()
4376 .map(|input| {
4377 let input_index = int2sliceindex(input.culist_index);
4378 let output_size = output_pack_sizes
4379 .get(input.culist_index as usize)
4380 .copied()
4381 .unwrap_or_else(|| {
4382 panic!(
4383 "Missing output pack size for culist index {}",
4384 input.culist_index
4385 )
4386 });
4387 if output_size > 1 {
4388 let port_index = syn::Index::from(input.src_port);
4389 quote! { msgs.#input_index.#port_index }
4390 } else {
4391 quote! { msgs.#input_index }
4392 }
4393 })
4394 .collect();
4395 let inputs_type = if input_exprs.len() == 1 {
4396 let input = input_exprs.first().unwrap();
4397 quote! { #input }
4398 } else {
4399 quote! { (#(&#input_exprs),*) }
4400 };
4401
4402 let monitoring_action = quote! {
4403 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
4404 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
4405 match decision {
4406 Decision::Abort => {
4407 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
4408 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
4409 ctx.clear_current_task();
4410 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4411 cl_manager.end_of_processing(clid)?;
4412 monitor_result?;
4413 return Ok(());
4414 }
4415 Decision::Ignore => {
4416 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
4417 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
4418 let cumsg_output = &mut msgs.#output_culist_index;
4419 #output_clear_payload
4420 }
4421 Decision::Shutdown => {
4422 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
4423 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
4424 return Err(CuError::new_with_cause("Task errored out during process.", error));
4425 }
4426 }
4427 };
4428
4429 let call_sim_callback = if sim_mode {
4430 quote! {
4431 let doit = {
4432 let cumsg_input = &#inputs_type;
4433 let cumsg_output = &mut msgs.#output_culist_index;
4434 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
4435 let ovr = sim_callback(SimStep::#enum_name(state));
4436
4437 if let SimOverride::Errored(reason) = ovr {
4438 let error: CuError = reason.into();
4439 #monitoring_action
4440 false
4441 } else {
4442 ovr == SimOverride::ExecuteByRuntime
4443 }
4444 };
4445 }
4446 } else {
4447 quote! { let doit = true; }
4448 };
4449
4450 (
4451 quote! {
4452 {
4453 #comment_tokens
4454 kf_manager.freeze_task(clid, &#task_instance)?;
4455 #call_sim_callback
4456 let cumsg_input = &#inputs_type;
4457 let cumsg_output = &mut msgs.#output_culist_index;
4458 let maybe_error = if doit {
4459 execution_probe.record(cu29::monitoring::ExecutionMarker {
4460 component_id: #tid,
4461 step: CuTaskState::Process,
4462 culistid: Some(clid),
4463 });
4464 #output_start_time
4465 let result = {
4466 #rt_guard
4467 ctx.set_current_task(#tid);
4468 #task_instance.process(&ctx, cumsg_input)
4469 };
4470 #output_end_time
4471 result
4472 } else {
4473 Ok(())
4474 };
4475 if let Err(error) = maybe_error {
4476 #monitoring_action
4477 }
4478 }
4479 },
4480 quote! {},
4481 )
4482 }
4483 CuTaskType::Regular => {
4484 let input_exprs: Vec<proc_macro2::TokenStream> = step
4485 .input_msg_indices_types
4486 .iter()
4487 .map(|input| {
4488 let input_index = int2sliceindex(input.culist_index);
4489 let output_size = output_pack_sizes
4490 .get(input.culist_index as usize)
4491 .copied()
4492 .unwrap_or_else(|| {
4493 panic!(
4494 "Missing output pack size for culist index {}",
4495 input.culist_index
4496 )
4497 });
4498 if output_size > 1 {
4499 let port_index = syn::Index::from(input.src_port);
4500 quote! { msgs.#input_index.#port_index }
4501 } else {
4502 quote! { msgs.#input_index }
4503 }
4504 })
4505 .collect();
4506 let inputs_type = if input_exprs.len() == 1 {
4507 let input = input_exprs.first().unwrap();
4508 quote! { #input }
4509 } else {
4510 quote! { (#(&#input_exprs),*) }
4511 };
4512
4513 let monitoring_action = quote! {
4514 debug!("Task {}: Error during process: {}", #mission_mod::TASKS_IDS[#tid], &error);
4515 let decision = monitor.process_error(#tid, CuTaskState::Process, &error);
4516 match decision {
4517 Decision::Abort => {
4518 debug!("Process: ABORT decision from monitoring. Task '{}' errored out \
4519 during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#tid], clid);
4520 ctx.clear_current_task();
4521 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4522 cl_manager.end_of_processing(clid)?;
4523 monitor_result?;
4524 return Ok(());
4525 }
4526 Decision::Ignore => {
4527 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out \
4528 during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#tid]);
4529 let cumsg_output = &mut msgs.#output_culist_index;
4530 #output_clear_payload
4531 }
4532 Decision::Shutdown => {
4533 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out \
4534 during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#tid]);
4535 return Err(CuError::new_with_cause("Task errored out during process.", error));
4536 }
4537 }
4538 };
4539
4540 let call_sim_callback = if sim_mode {
4541 quote! {
4542 let doit = {
4543 let cumsg_input = &#inputs_type;
4544 let cumsg_output = &mut msgs.#output_culist_index;
4545 let state = CuTaskCallbackState::Process(cumsg_input, cumsg_output);
4546 let ovr = sim_callback(SimStep::#enum_name(state));
4547
4548 if let SimOverride::Errored(reason) = ovr {
4549 let error: CuError = reason.into();
4550 #monitoring_action
4551 false
4552 }
4553 else {
4554 ovr == SimOverride::ExecuteByRuntime
4555 }
4556 };
4557 }
4558 } else {
4559 quote! { let doit = true; }
4560 };
4561
4562 let logging_tokens = if !task_specs.logging_enabled[tid] {
4563 quote! {
4564 let mut cumsg_output = &mut culist.msgs.0.#output_culist_index;
4565 #output_clear_payload
4566 }
4567 } else {
4568 quote!()
4569 };
4570
4571 (
4572 quote! {
4573 {
4574 #comment_tokens
4575 kf_manager.freeze_task(clid, &#task_instance)?;
4576 #call_sim_callback
4577 let cumsg_input = &#inputs_type;
4578 let cumsg_output = &mut msgs.#output_culist_index;
4579 let maybe_error = if doit {
4580 execution_probe.record(cu29::monitoring::ExecutionMarker {
4581 component_id: #tid,
4582 step: CuTaskState::Process,
4583 culistid: Some(clid),
4584 });
4585 #output_start_time
4586 let result = {
4587 #rt_guard
4588 ctx.set_current_task(#tid);
4589 #task_instance.process(&ctx, cumsg_input, cumsg_output)
4590 };
4591 #output_end_time
4592 result
4593 } else {
4594 Ok(())
4595 };
4596 if let Err(error) = maybe_error {
4597 #monitoring_action
4598 }
4599 }
4600 },
4601 logging_tokens,
4602 )
4603 }
4604 }
4605}
4606
4607fn generate_bridge_rx_execution_tokens(
4608 step: &CuExecutionStep,
4609 bridge_spec: &BridgeSpec,
4610 channel_index: usize,
4611 mission_mod: &Ident,
4612 sim_mode: bool,
4613) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
4614 let rt_guard = rtsan_guard_tokens();
4615 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
4616 let channel = &bridge_spec.rx_channels[channel_index];
4617 let output_pack = step
4618 .output_msg_pack
4619 .as_ref()
4620 .expect("Bridge Rx channel missing output pack");
4621 let port_index = output_pack
4622 .msg_types
4623 .iter()
4624 .position(|msg| msg == &channel.msg_type_name)
4625 .unwrap_or_else(|| {
4626 panic!(
4627 "Bridge Rx channel '{}' missing output port for '{}'",
4628 channel.id, channel.msg_type_name
4629 )
4630 });
4631 let culist_index_ts = int2sliceindex(output_pack.culist_index);
4632 let output_ref = if output_pack.msg_types.len() == 1 {
4633 quote! { &mut msgs.#culist_index_ts }
4634 } else {
4635 let port_index = syn::Index::from(port_index);
4636 quote! { &mut msgs.#culist_index_ts.#port_index }
4637 };
4638 let monitor_index = syn::Index::from(
4639 channel
4640 .monitor_index
4641 .expect("Bridge Rx channel missing monitor index"),
4642 );
4643 let bridge_type = &bridge_spec.type_path;
4644 let const_ident = &channel.const_ident;
4645 let enum_ident = Ident::new(
4646 &config_id_to_enum(&format!("{}_rx_{}", bridge_spec.id, channel.id)),
4647 Span::call_site(),
4648 );
4649
4650 let call_sim_callback = if sim_mode {
4651 quote! {
4652 let doit = {
4653 let state = SimStep::#enum_ident {
4654 channel: &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
4655 msg: cumsg_output,
4656 };
4657 let ovr = sim_callback(state);
4658 if let SimOverride::Errored(reason) = ovr {
4659 let error: CuError = reason.into();
4660 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
4661 match decision {
4662 Decision::Abort => {
4663 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
4664 ctx.clear_current_task();
4665 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4666 cl_manager.end_of_processing(clid)?;
4667 monitor_result?;
4668 return Ok(());
4669 }
4670 Decision::Ignore => {
4671 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
4672 cumsg_output.clear_payload();
4673 false
4674 }
4675 Decision::Shutdown => {
4676 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
4677 return Err(CuError::new_with_cause("Task errored out during process.", error));
4678 }
4679 }
4680 } else {
4681 ovr == SimOverride::ExecuteByRuntime
4682 }
4683 };
4684 }
4685 } else {
4686 quote! { let doit = true; }
4687 };
4688 (
4689 quote! {
4690 {
4691 let bridge = &mut __cu_bridges.#bridge_tuple_index;
4692 let cumsg_output = #output_ref;
4693 #call_sim_callback
4694 if doit {
4695 execution_probe.record(cu29::monitoring::ExecutionMarker {
4696 component_id: #monitor_index,
4697 step: CuTaskState::Process,
4698 culistid: Some(clid),
4699 });
4700 cumsg_output.metadata.process_time.start = clock.now().into();
4701 let maybe_error = {
4702 #rt_guard
4703 ctx.clear_current_task();
4704 bridge.receive(
4705 &ctx,
4706 &<#bridge_type as cu29::cubridge::CuBridge>::Rx::#const_ident,
4707 cumsg_output,
4708 )
4709 };
4710 cumsg_output.metadata.process_time.end = clock.now().into();
4711 if let Err(error) = maybe_error {
4712 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
4713 match decision {
4714 Decision::Abort => {
4715 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
4716 ctx.clear_current_task();
4717 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4718 cl_manager.end_of_processing(clid)?;
4719 monitor_result?;
4720 return Ok(());
4721 }
4722 Decision::Ignore => {
4723 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
4724 cumsg_output.clear_payload();
4725 }
4726 Decision::Shutdown => {
4727 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
4728 return Err(CuError::new_with_cause("Task errored out during process.", error));
4729 }
4730 }
4731 }
4732 }
4733 }
4734 },
4735 quote! {},
4736 )
4737}
4738
4739fn generate_bridge_tx_execution_tokens(
4740 step: &CuExecutionStep,
4741 bridge_spec: &BridgeSpec,
4742 channel_index: usize,
4743 output_pack_sizes: &[usize],
4744 mission_mod: &Ident,
4745 sim_mode: bool,
4746) -> (proc_macro2::TokenStream, proc_macro2::TokenStream) {
4747 let rt_guard = rtsan_guard_tokens();
4748 let channel = &bridge_spec.tx_channels[channel_index];
4749 let monitor_index = syn::Index::from(
4750 channel
4751 .monitor_index
4752 .expect("Bridge Tx channel missing monitor index"),
4753 );
4754 let input = step
4755 .input_msg_indices_types
4756 .first()
4757 .expect("Bridge Tx channel should have exactly one input");
4758 let input_index = int2sliceindex(input.culist_index);
4759 let output_size = output_pack_sizes
4760 .get(input.culist_index as usize)
4761 .copied()
4762 .unwrap_or_else(|| {
4763 panic!(
4764 "Missing output pack size for culist index {}",
4765 input.culist_index
4766 )
4767 });
4768 let input_ref = if output_size > 1 {
4769 let port_index = syn::Index::from(input.src_port);
4770 quote! { &mut msgs.#input_index.#port_index }
4771 } else {
4772 quote! { &mut msgs.#input_index }
4773 };
4774 let output_pack = step
4775 .output_msg_pack
4776 .as_ref()
4777 .expect("Bridge Tx channel missing output pack");
4778 if output_pack.msg_types.len() != 1 {
4779 panic!(
4780 "Bridge Tx channel '{}' expected a single output message slot, got {}",
4781 channel.id,
4782 output_pack.msg_types.len()
4783 );
4784 }
4785 let output_index = int2sliceindex(output_pack.culist_index);
4786 let output_ref = quote! { &mut msgs.#output_index };
4787 let bridge_tuple_index = int2sliceindex(bridge_spec.tuple_index as u32);
4788 let bridge_type = &bridge_spec.type_path;
4789 let const_ident = &channel.const_ident;
4790 let enum_ident = Ident::new(
4791 &config_id_to_enum(&format!("{}_tx_{}", bridge_spec.id, channel.id)),
4792 Span::call_site(),
4793 );
4794
4795 let call_sim_callback = if sim_mode {
4796 quote! {
4797 let doit = {
4798 let state = SimStep::#enum_ident {
4799 channel: &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
4800 msg: &*cumsg_input,
4801 };
4802 let ovr = sim_callback(state);
4803 if let SimOverride::Errored(reason) = ovr {
4804 let error: CuError = reason.into();
4805 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
4806 match decision {
4807 Decision::Abort => {
4808 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
4809 ctx.clear_current_task();
4810 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4811 cl_manager.end_of_processing(clid)?;
4812 monitor_result?;
4813 return Ok(());
4814 }
4815 Decision::Ignore => {
4816 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
4817 false
4818 }
4819 Decision::Shutdown => {
4820 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
4821 return Err(CuError::new_with_cause("Task errored out during process.", error));
4822 }
4823 }
4824 } else {
4825 ovr == SimOverride::ExecuteByRuntime
4826 }
4827 };
4828 }
4829 } else {
4830 quote! { let doit = true; }
4831 };
4832 (
4833 quote! {
4834 {
4835 let bridge = &mut __cu_bridges.#bridge_tuple_index;
4836 let cumsg_input = #input_ref;
4837 let cumsg_output = #output_ref;
4838 #call_sim_callback
4839 if doit {
4840 execution_probe.record(cu29::monitoring::ExecutionMarker {
4841 component_id: #monitor_index,
4842 step: CuTaskState::Process,
4843 culistid: Some(clid),
4844 });
4845 cumsg_output.metadata.process_time.start = clock.now().into();
4846 let maybe_error = {
4847 #rt_guard
4848 ctx.clear_current_task();
4849 bridge.send(
4850 &ctx,
4851 &<#bridge_type as cu29::cubridge::CuBridge>::Tx::#const_ident,
4852 &*cumsg_input,
4853 )
4854 };
4855 if let Err(error) = maybe_error {
4856 let decision = monitor.process_error(#monitor_index, CuTaskState::Process, &error);
4857 match decision {
4858 Decision::Abort => {
4859 debug!("Process: ABORT decision from monitoring. Task '{}' errored out during process. Skipping the processing of CL {}.", #mission_mod::TASKS_IDS[#monitor_index], clid);
4860 ctx.clear_current_task();
4861 let monitor_result = monitor.process_copperlist(&ctx, &#mission_mod::collect_metadata(&culist));
4862 cl_manager.end_of_processing(clid)?;
4863 monitor_result?;
4864 return Ok(());
4865 }
4866 Decision::Ignore => {
4867 debug!("Process: IGNORE decision from monitoring. Task '{}' errored out during process. The runtime will continue with a forced empty message.", #mission_mod::TASKS_IDS[#monitor_index]);
4868 }
4869 Decision::Shutdown => {
4870 debug!("Process: SHUTDOWN decision from monitoring. Task '{}' errored out during process. The runtime cannot continue.", #mission_mod::TASKS_IDS[#monitor_index]);
4871 return Err(CuError::new_with_cause("Task errored out during process.", error));
4872 }
4873 }
4874 }
4875 cumsg_output.metadata.process_time.end = clock.now().into();
4876 }
4877 }
4878 },
4879 quote! {},
4880 )
4881}
4882
/// Direction of a bridge channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum BridgeChannelDirection {
    /// Receiving channel.
    Rx,
    /// Transmitting channel.
    Tx,
}
4888
4889#[derive(Clone, Debug, PartialEq, Eq, Hash)]
4890struct BridgeChannelKey {
4891 bridge_id: String,
4892 channel_id: String,
4893 direction: BridgeChannelDirection,
4894}
4895
4896#[derive(Clone)]
4897struct BridgeChannelSpec {
4898 id: String,
4899 const_ident: Ident,
4900 #[allow(dead_code)]
4901 msg_type: Type,
4902 msg_type_name: String,
4903 config_index: usize,
4904 plan_node_id: Option<NodeId>,
4905 culist_index: Option<usize>,
4906 monitor_index: Option<usize>,
4907}
4908
4909#[derive(Clone)]
4910struct BridgeSpec {
4911 id: String,
4912 type_path: Type,
4913 config_index: usize,
4914 tuple_index: usize,
4915 monitor_index: Option<usize>,
4916 rx_channels: Vec<BridgeChannelSpec>,
4917 tx_channels: Vec<BridgeChannelSpec>,
4918}
4919
4920#[derive(Clone)]
4921struct ExecutionEntity {
4922 kind: ExecutionEntityKind,
4923}
4924
/// The different kinds of entities that can appear in the execution plan.
#[derive(Clone)]
enum ExecutionEntityKind {
    /// A regular task.
    // NOTE(review): `task_index` presumably indexes the task spec set; confirm.
    Task { task_index: usize },
    /// The receiving side of a bridge channel: `bridge_index` selects the bridge
    /// spec and `channel_index` one of its `rx_channels`.
    BridgeRx {
        bridge_index: usize,
        channel_index: usize,
    },
    /// The transmitting side of a bridge channel.
    // NOTE(review): indices presumably select a bridge spec and one of its `tx_channels`; confirm.
    BridgeTx {
        bridge_index: usize,
        channel_index: usize,
    },
}
4939
#[cfg(test)]
mod tests {
    /// Runs the trybuild compile-fail and compile-pass suites.
    ///
    /// Before running, every `<case>.stderr` expectation under `tests/compile_fail/*/`
    /// is refreshed from its channel-specific variant (`<case>.stderr.beta` on a beta
    /// toolchain, `<case>.stderr.stable` otherwise) when that variant exists.
    #[test]
    fn test_compile_fail() {
        use rustc_version::{Channel, version_meta};
        use std::fs;
        use std::path::{Path, PathBuf};

        // The toolchain channel cannot change while the test runs: query it once
        // instead of once per test case.
        let channel_suffix = match version_meta().unwrap().channel {
            Channel::Beta => "beta",
            _ => "stable",
        };

        let dir = Path::new("tests/compile_fail");
        for entry in fs::read_dir(dir).unwrap() {
            let entry = entry.unwrap();
            if !entry.file_type().unwrap().is_dir() {
                continue;
            }
            for file in fs::read_dir(entry.path()).unwrap() {
                let file = file.unwrap();
                let p = file.path();
                if p.extension().and_then(|x| x.to_str()) != Some("rs") {
                    continue;
                }

                let base = p.with_extension("stderr");
                let src = PathBuf::from(format!("{}.{}", base.display(), channel_suffix));

                if src.exists() {
                    fs::copy(src, &base).unwrap();
                }
            }
        }

        let t = trybuild::TestCases::new();
        t.compile_fail("tests/compile_fail/*/*.rs");
        t.pass("tests/compile_pass/*/*.rs");
    }

    /// A resource binding declared on a bridge node must be collected as a resource
    /// spec owned by that bridge and resolved against the matching bundle.
    #[test]
    fn bridge_resources_are_collected() {
        use super::*;
        use cu29::config::{CuGraph, Flavor, Node};
        use std::collections::HashMap;
        use syn::parse_str;

        let mut graph = CuGraph::default();
        let mut node = Node::new_with_flavor("radio", "bridge::Dummy", Flavor::Bridge);
        let mut res = HashMap::new();
        res.insert("serial".to_string(), "fc.serial0".to_string());
        node.set_resources(Some(res));
        graph.add_node(node).expect("bridge node");

        let task_specs = CuTaskSpecSet::from_graph(&graph);
        let bridge_spec = BridgeSpec {
            id: "radio".to_string(),
            type_path: parse_str("bridge::Dummy").unwrap(),
            config_index: 0,
            tuple_index: 0,
            monitor_index: None,
            rx_channels: Vec::new(),
            tx_channels: Vec::new(),
        };

        let mut config = cu29::config::CuConfig::default();
        config.resources.push(ResourceBundleConfig {
            id: "fc".to_string(),
            provider: "board::Bundle".to_string(),
            config: None,
            missions: None,
        });
        let bundle_specs = build_bundle_specs(&config, "default").expect("bundle specs");
        let specs = collect_resource_specs(&graph, &task_specs, &[bridge_spec], &bundle_specs)
            .expect("collect specs");
        assert_eq!(specs.len(), 1);
        assert!(matches!(specs[0].owner, ResourceOwner::Bridge(0)));
        assert_eq!(specs[0].binding_name, "serial");
        assert_eq!(specs[0].bundle_index, 0);
        assert_eq!(specs[0].resource_name, "serial0");
    }
}