1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use crate::resource::{ResourceBindings, ResourceManager, ResourceMapping};
4use cu29_clock::{CuTime, RobotClock};
5use cu29_traits::{CuError, CuResult};
6use rayon::ThreadPool;
7use std::sync::{Arc, Mutex, MutexGuard};
8
9struct AsyncState {
10 processing: bool,
11 ready_at: Option<CuTime>,
12}
13
14pub struct CuAsyncTask<T, O>
15where
16 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
17 O: CuMsgPayload + Send + 'static,
18{
19 task: Arc<Mutex<T>>,
20 output: Arc<Mutex<CuMsg<O>>>,
21 state: Arc<Mutex<AsyncState>>,
22 tp: Arc<ThreadPool>,
23}
24
25pub struct CuAsyncTaskResources<'r, T: CuTask> {
27 pub inner: T::Resources<'r>,
28 pub threadpool: Arc<ThreadPool>,
29}
30
31impl<'r, T> ResourceBindings<'r> for CuAsyncTaskResources<'r, T>
32where
33 T: CuTask,
34 T::Resources<'r>: ResourceBindings<'r>,
35{
36 fn from_bindings(
37 manager: &'r mut ResourceManager,
38 mapping: Option<&ResourceMapping>,
39 ) -> CuResult<Self> {
40 let mapping = mapping.ok_or_else(|| CuError::from("Missing resource bindings"))?;
41 let threadpool_key = mapping
42 .get("bg_threads")
43 .ok_or_else(|| CuError::from("Missing 'bg_threads' binding for background task"))?
44 .typed();
45 let threadpool = manager.borrow_shared_arc(threadpool_key)?;
46 Ok(Self {
47 inner: T::Resources::from_bindings(manager, Some(mapping))?,
48 threadpool,
49 })
50 }
51}
52
53impl<T, O> CuAsyncTask<T, O>
54where
55 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
56 O: CuMsgPayload + Send + 'static,
57{
58 #[allow(unused)]
59 pub fn new(
60 config: Option<&ComponentConfig>,
61 resources: T::Resources<'_>,
62 tp: Arc<ThreadPool>,
63 ) -> CuResult<Self> {
64 let task = Arc::new(Mutex::new(T::new_with(config, resources)?));
65 let output = Arc::new(Mutex::new(CuMsg::default()));
66 Ok(Self {
67 task,
68 output,
69 state: Arc::new(Mutex::new(AsyncState {
70 processing: false,
71 ready_at: None,
72 })),
73 tp,
74 })
75 }
76}
77
78impl<T, O> Freezable for CuAsyncTask<T, O>
79where
80 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
81 O: CuMsgPayload + Send + 'static,
82{
83}
84
85impl<T, I, O> CuTask for CuAsyncTask<T, O>
86where
87 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
88 I: CuMsgPayload + Send + Sync + 'static,
89 O: CuMsgPayload + Send + 'static,
90{
91 type Resources<'r> = CuAsyncTaskResources<'r, T>;
92 type Input<'m> = T::Input<'m>;
93 type Output<'m> = T::Output<'m>;
94
95 fn new_with(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
96 where
97 Self: Sized,
98 {
99 let task = Arc::new(Mutex::new(T::new_with(config, resources.inner)?));
100 let output = Arc::new(Mutex::new(CuMsg::default()));
101 Ok(Self {
102 task,
103 output,
104 state: Arc::new(Mutex::new(AsyncState {
105 processing: false,
106 ready_at: None,
107 })),
108 tp: resources.threadpool,
109 })
110 }
111
112 fn process<'i, 'o>(
113 &mut self,
114 clock: &RobotClock,
115 input: &Self::Input<'i>,
116 real_output: &mut Self::Output<'o>,
117 ) -> CuResult<()> {
118 {
119 let mut state = self.state.lock().unwrap();
120 if state.processing {
121 return Ok(());
123 }
124
125 if let Some(ready_at) = state.ready_at
126 && clock.now() < ready_at
127 {
128 return Ok(());
130 }
131
132 state.processing = true;
134 state.ready_at = None;
135 }
136
137 let buffered_output = self.output.lock().unwrap();
139 *real_output = buffered_output.clone();
140
141 self.tp.spawn_fifo({
143 let clock = clock.clone();
144 let input = (*input).clone();
145 let output = self.output.clone();
146 let task = self.task.clone();
147 let state = self.state.clone();
148 move || {
149 let input_ref: &CuMsg<I> = &input;
150 let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
151
152 let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
154 let output_ref: &mut MutexGuard<CuMsg<O>> =
155 unsafe { std::mem::transmute(&mut output) };
156
157 if output_ref.metadata.process_time.start.is_none() {
159 output_ref.metadata.process_time.start = clock.now().into();
160 }
161 task.lock()
162 .unwrap()
163 .process(&clock, input_ref, output_ref)
164 .unwrap();
165 let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
166 let end_time = end_from_metadata.unwrap_or_else(|| {
167 let now = clock.now();
168 output_ref.metadata.process_time.end = now.into();
169 now
170 });
171
172 let mut guard = state.lock().unwrap();
173 guard.processing = false; guard.ready_at = Some(end_time);
175 }
176 });
177 Ok(())
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::*;
184 use crate::config::ComponentConfig;
185 use crate::cutask::CuMsg;
186 use crate::cutask::Freezable;
187 use crate::input_msg;
188 use crate::output_msg;
189 use cu29_clock::RobotClock;
190 use cu29_traits::CuResult;
191 use rayon::ThreadPoolBuilder;
192 use std::borrow::BorrowMut;
193 use std::sync::OnceLock;
194 use std::sync::mpsc;
195 use std::time::Duration;
196
197 static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
198 static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
199 struct TestTask {}
200
201 impl Freezable for TestTask {}
202
203 impl CuTask for TestTask {
204 type Resources<'r> = ();
205 type Input<'m> = input_msg!(u32);
206 type Output<'m> = output_msg!(u32);
207
208 fn new_with(
209 _config: Option<&ComponentConfig>,
210 _resources: Self::Resources<'_>,
211 ) -> CuResult<Self>
212 where
213 Self: Sized,
214 {
215 Ok(Self {})
216 }
217
218 fn process(
219 &mut self,
220 _clock: &RobotClock,
221 input: &Self::Input<'_>,
222 output: &mut Self::Output<'_>,
223 ) -> CuResult<()> {
224 output.borrow_mut().set_payload(*input.payload().unwrap());
225 Ok(())
226 }
227 }
228
229 #[test]
230 fn test_lifecycle() {
231 let tp = Arc::new(
232 rayon::ThreadPoolBuilder::new()
233 .num_threads(1)
234 .build()
235 .unwrap(),
236 );
237
238 let config = ComponentConfig::default();
239 let clock = RobotClock::default();
240 let mut async_task: CuAsyncTask<TestTask, u32> =
241 CuAsyncTask::new(Some(&config), (), tp).unwrap();
242 let input = CuMsg::new(Some(42u32));
243 let mut output = CuMsg::new(None);
244
245 loop {
246 {
247 let output_ref: &mut CuMsg<u32> = &mut output;
248 async_task.process(&clock, &input, output_ref).unwrap();
249 }
250
251 if let Some(val) = output.payload() {
252 assert_eq!(*val, 42u32);
253 break;
254 }
255 }
256 }
257
258 struct ControlledTask;
259
260 impl Freezable for ControlledTask {}
261
262 impl CuTask for ControlledTask {
263 type Resources<'r> = ();
264 type Input<'m> = input_msg!(u32);
265 type Output<'m> = output_msg!(u32);
266
267 fn new_with(
268 _config: Option<&ComponentConfig>,
269 _resources: Self::Resources<'_>,
270 ) -> CuResult<Self>
271 where
272 Self: Sized,
273 {
274 Ok(Self {})
275 }
276
277 fn process(
278 &mut self,
279 clock: &RobotClock,
280 _input: &Self::Input<'_>,
281 output: &mut Self::Output<'_>,
282 ) -> CuResult<()> {
283 let rx = READY_RX
284 .get()
285 .expect("ready channel not set")
286 .lock()
287 .unwrap();
288 let ready_time = rx
289 .recv_timeout(Duration::from_secs(1))
290 .expect("timed out waiting for ready signal");
291
292 output.set_payload(ready_time.as_nanos() as u32);
293 output.metadata.process_time.start = clock.now().into();
294 output.metadata.process_time.end = ready_time.into();
295
296 if let Some(done_tx) = DONE_TX.get() {
297 let _ = done_tx.send(());
298 }
299 Ok(())
300 }
301 }
302
303 #[test]
304 fn background_respects_recorded_ready_time() {
305 let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
306 let (clock, clock_mock) = RobotClock::mock();
307
308 let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
310 let (done_tx, done_rx) = mpsc::channel::<()>();
311 READY_RX
312 .set(Arc::new(Mutex::new(ready_rx)))
313 .expect("ready channel already set");
314 DONE_TX
315 .set(done_tx)
316 .expect("completion channel already set");
317
318 let mut async_task: CuAsyncTask<ControlledTask, u32> =
319 CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
320 let input = CuMsg::new(Some(1u32));
321 let mut output = CuMsg::new(None);
322
323 clock_mock.set_value(0);
325 async_task.process(&clock, &input, &mut output).unwrap();
326 assert!(output.payload().is_none());
327
328 clock_mock.set_value(10);
330 async_task.process(&clock, &input, &mut output).unwrap();
331 assert!(output.payload().is_none());
332
333 clock_mock.set_value(30);
335 ready_tx.send(CuTime::from(30u64)).unwrap();
336 done_rx
337 .recv_timeout(Duration::from_secs(1))
338 .expect("background task never finished");
339 let mut ready_at_recorded = None;
341 for _ in 0..100 {
342 let state = async_task.state.lock().unwrap();
343 if !state.processing {
344 ready_at_recorded = state.ready_at;
345 if ready_at_recorded.is_some() {
346 break;
347 }
348 }
349 drop(state);
350 std::thread::sleep(Duration::from_millis(1));
351 }
352 assert!(
353 ready_at_recorded.is_some(),
354 "background task finished without recording ready_at"
355 );
356
357 clock_mock.set_value(20);
359 async_task.process(&clock, &input, &mut output).unwrap();
360 assert!(
361 output.payload().is_none(),
362 "Output surfaced before recorded ready time"
363 );
364
365 clock_mock.set_value(30);
367 async_task.process(&clock, &input, &mut output).unwrap();
368 assert_eq!(output.payload(), Some(&30u32));
369
370 ready_tx.send(CuTime::from(40u64)).unwrap();
372 let _ = done_rx.recv_timeout(Duration::from_secs(1));
373 }
374}