cu29_runtime/
cuasynctask.rs1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use cu29_clock::RobotClock;
4use cu29_traits::CuResult;
5use rayon::ThreadPool;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8pub struct CuAsyncTask<T, O>
9where
10 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
11 O: CuMsgPayload + Send + 'static,
12{
13 task: Arc<Mutex<T>>,
14 output: Arc<Mutex<CuMsg<O>>>,
15 processing: Arc<Mutex<bool>>, tp: Arc<ThreadPool>,
17}
18
19impl<T, O> CuAsyncTask<T, O>
20where
21 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
22 O: CuMsgPayload + Send + 'static,
23{
24 #[allow(unused)]
25 pub fn new(config: Option<&ComponentConfig>, tp: Arc<ThreadPool>) -> CuResult<Self> {
26 let task = Arc::new(Mutex::new(T::new(config)?));
27 let output = Arc::new(Mutex::new(CuMsg::default()));
28 Ok(Self {
29 task,
30 output,
31 processing: Arc::new(Mutex::new(false)),
32 tp,
33 })
34 }
35}
36
37impl<T, O> Freezable for CuAsyncTask<T, O>
38where
39 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
40 O: CuMsgPayload + Send + 'static,
41{
42}
43
44impl<T, I, O> CuTask for CuAsyncTask<T, O>
45where
46 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
47 I: CuMsgPayload + Send + Sync + 'static,
48 O: CuMsgPayload + Send + 'static,
49{
50 type Input<'m> = T::Input<'m>;
51 type Output<'m> = T::Output<'m>;
52
53 fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
54 where
55 Self: Sized,
56 {
57 Err("AsyncTask cannot be instantiated directly, use Async_task::new()".into())
58 }
59
60 fn process<'i, 'o>(
61 &mut self,
62 clock: &RobotClock,
63 input: &Self::Input<'i>,
64 real_output: &mut Self::Output<'o>,
65 ) -> CuResult<()> {
66 let mut processing = self.processing.lock().unwrap();
67 if *processing {
68 return Ok(());
70 }
71
72 *processing = true; let buffered_output = self.output.lock().unwrap(); *real_output = buffered_output.clone();
75
76 self.tp.spawn_fifo({
78 let clock = clock.clone();
79 let input = (*input).clone();
80 let output = self.output.clone();
81 let task = self.task.clone();
82 let processing = self.processing.clone();
83 move || {
84 let input_ref: &CuMsg<I> = &input;
85 let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
86
87 let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
89 let output_ref: &mut MutexGuard<CuMsg<O>> =
90 unsafe { std::mem::transmute(&mut output) };
91 task.lock()
92 .unwrap()
93 .process(&clock, input_ref, output_ref)
94 .unwrap();
95 *processing.lock().unwrap() = false; }
97 });
98 Ok(())
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ComponentConfig;
    use crate::cutask::CuMsg;
    use crate::cutask::Freezable;
    use crate::input_msg;
    use crate::output_msg;
    use cu29_clock::RobotClock;
    use cu29_traits::CuResult;
    use std::time::{Duration, Instant};

    /// Minimal task that copies its `u32` input payload to its output.
    struct TestTask {}

    impl Freezable for TestTask {}

    impl CuTask for TestTask {
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.set_payload(*input.payload().unwrap());
            Ok(())
        }
    }

    /// Drives the wrapper until the background result shows up in the output.
    #[test]
    fn test_lifecycle() {
        let tp = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(1)
                .build()
                .unwrap(),
        );

        let config = ComponentConfig::default();
        let clock = RobotClock::default();
        let mut async_task: CuAsyncTask<TestTask, u32> =
            CuAsyncTask::new(Some(&config), tp).unwrap();
        let input = CuMsg::new(Some(42u32));
        let mut output: CuMsg<u32> = CuMsg::new(None);

        // Bound the polling so a regression fails the test instead of hanging it.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            async_task.process(&clock, &input, &mut output).unwrap();

            if let Some(val) = output.payload() {
                assert_eq!(*val, 42u32);
                break;
            }

            assert!(
                Instant::now() < deadline,
                "background task did not produce an output in time"
            );
            // Give the single worker thread a chance to run between polls.
            std::thread::yield_now();
        }
    }
}