1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
10struct AsyncState {
11 processing: bool,
12 ready_at: Option<CuTime>,
13 last_error: Option<CuError>,
14}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17 let mut guard = match state.lock() {
18 Ok(guard) => guard,
19 Err(poison) => poison.into_inner(),
20 };
21 guard.processing = false;
22 guard.ready_at = None;
23 guard.last_error = Some(error);
24}
25
26#[derive(Reflect)]
27#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
28pub struct CuAsyncTask<T, O>
29where
30 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
31 O: CuMsgPayload + Send + 'static,
32{
33 #[reflect(ignore)]
34 task: Arc<Mutex<T>>,
35 #[reflect(ignore)]
36 output: Arc<Mutex<CuMsg<O>>>,
37 #[reflect(ignore)]
38 state: Arc<Mutex<AsyncState>>,
39 #[reflect(ignore)]
40 tp: Arc<ThreadPool>,
41}
42
43impl<T, O> TypePath for CuAsyncTask<T, O>
44where
45 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
46 O: CuMsgPayload + Send + 'static,
47{
48 fn type_path() -> &'static str {
49 "cu29_runtime::cuasynctask::CuAsyncTask"
50 }
51
52 fn short_type_path() -> &'static str {
53 "CuAsyncTask"
54 }
55
56 fn type_ident() -> Option<&'static str> {
57 Some("CuAsyncTask")
58 }
59
60 fn crate_name() -> Option<&'static str> {
61 Some("cu29_runtime")
62 }
63
64 fn module_path() -> Option<&'static str> {
65 Some("cuasynctask")
66 }
67}
68
69pub struct CuAsyncTaskResources<'r, T: CuTask> {
71 pub inner: T::Resources<'r>,
72 pub threadpool: Arc<ThreadPool>,
73}
74
75impl<T, O> CuAsyncTask<T, O>
76where
77 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
78 O: CuMsgPayload + Send + 'static,
79{
80 #[allow(unused)]
81 pub fn new(
82 config: Option<&ComponentConfig>,
83 resources: T::Resources<'_>,
84 tp: Arc<ThreadPool>,
85 ) -> CuResult<Self> {
86 let task = Arc::new(Mutex::new(T::new(config, resources)?));
87 let output = Arc::new(Mutex::new(CuMsg::default()));
88 Ok(Self {
89 task,
90 output,
91 state: Arc::new(Mutex::new(AsyncState {
92 processing: false,
93 ready_at: None,
94 last_error: None,
95 })),
96 tp,
97 })
98 }
99}
100
101impl<T, O> Freezable for CuAsyncTask<T, O>
102where
103 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
104 O: CuMsgPayload + Send + 'static,
105{
106}
107
108impl<T, I, O> CuTask for CuAsyncTask<T, O>
109where
110 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
111 I: CuMsgPayload + Send + Sync + 'static,
112 O: CuMsgPayload + Send + 'static,
113{
114 type Resources<'r> = CuAsyncTaskResources<'r, T>;
115 type Input<'m> = T::Input<'m>;
116 type Output<'m> = T::Output<'m>;
117
118 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
119 where
120 Self: Sized,
121 {
122 let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
123 let output = Arc::new(Mutex::new(CuMsg::default()));
124 Ok(Self {
125 task,
126 output,
127 state: Arc::new(Mutex::new(AsyncState {
128 processing: false,
129 ready_at: None,
130 last_error: None,
131 })),
132 tp: resources.threadpool,
133 })
134 }
135
136 fn process<'i, 'o>(
137 &mut self,
138 ctx: &CuContext,
139 input: &Self::Input<'i>,
140 real_output: &mut Self::Output<'o>,
141 ) -> CuResult<()> {
142 {
143 let mut state = self.state.lock().map_err(|_| {
144 CuError::from("Async task state mutex poisoned while scheduling background work")
145 })?;
146 if let Some(error) = state.last_error.take() {
147 return Err(error);
148 }
149 if state.processing {
150 return Ok(());
152 }
153
154 if let Some(ready_at) = state.ready_at
155 && ctx.now() < ready_at
156 {
157 return Ok(());
159 }
160
161 state.processing = true;
163 state.ready_at = None;
164 }
165
166 let buffered_output = self.output.lock().map_err(|_| {
168 let error = CuError::from("Async task output mutex poisoned");
169 record_async_error(&self.state, error.clone());
170 error
171 })?;
172 *real_output = buffered_output.clone();
173
174 self.tp.spawn_fifo({
176 let ctx = ctx.clone();
177 let input = (*input).clone();
178 let output = self.output.clone();
179 let task = self.task.clone();
180 let state = self.state.clone();
181 move || {
182 let input_ref: &CuMsg<I> = &input;
183 let mut output_guard = match output.lock() {
184 Ok(guard) => guard,
185 Err(_) => {
186 record_async_error(
187 &state,
188 CuError::from("Async task output mutex poisoned"),
189 );
190 return;
191 }
192 };
193 let output_ref: &mut CuMsg<O> = &mut output_guard;
194
195 if output_ref.metadata.process_time.start.is_none() {
197 output_ref.metadata.process_time.start = ctx.now().into();
198 }
199 let task_result = match task.lock() {
200 Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
201 Err(poison) => Err(CuError::from(format!(
202 "Async task mutex poisoned: {poison}"
203 ))),
204 };
205
206 let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
207 guard.processing = false;
208
209 match task_result {
210 Ok(()) => {
211 let end_from_metadata: Option<CuTime> =
212 output_ref.metadata.process_time.end.into();
213 let end_time = end_from_metadata.unwrap_or_else(|| {
214 let now = ctx.now();
215 output_ref.metadata.process_time.end = now.into();
216 now
217 });
218 guard.ready_at = Some(end_time);
219 }
220 Err(error) => {
221 guard.ready_at = None;
222 guard.last_error = Some(error);
223 }
224 }
225 }
226 });
227 Ok(())
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use crate::config::ComponentConfig;
235 use crate::cutask::CuMsg;
236 use crate::cutask::Freezable;
237 use crate::input_msg;
238 use crate::output_msg;
239 use cu29_traits::CuResult;
240 use rayon::ThreadPoolBuilder;
241 use std::borrow::BorrowMut;
242 use std::sync::OnceLock;
243 use std::sync::mpsc;
244 use std::time::Duration;
245
246 static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
247 static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
248 #[derive(Reflect)]
249 struct TestTask {}
250
251 impl Freezable for TestTask {}
252
253 impl CuTask for TestTask {
254 type Resources<'r> = ();
255 type Input<'m> = input_msg!(u32);
256 type Output<'m> = output_msg!(u32);
257
258 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
259 where
260 Self: Sized,
261 {
262 Ok(Self {})
263 }
264
265 fn process(
266 &mut self,
267 _ctx: &CuContext,
268 input: &Self::Input<'_>,
269 output: &mut Self::Output<'_>,
270 ) -> CuResult<()> {
271 output.borrow_mut().set_payload(*input.payload().unwrap());
272 Ok(())
273 }
274 }
275
276 #[test]
277 fn test_lifecycle() {
278 let tp = Arc::new(
279 rayon::ThreadPoolBuilder::new()
280 .num_threads(1)
281 .build()
282 .unwrap(),
283 );
284
285 let config = ComponentConfig::default();
286 let context = CuContext::new_with_clock();
287 let mut async_task: CuAsyncTask<TestTask, u32> =
288 CuAsyncTask::new(Some(&config), (), tp).unwrap();
289 let input = CuMsg::new(Some(42u32));
290 let mut output = CuMsg::new(None);
291
292 loop {
293 {
294 let output_ref: &mut CuMsg<u32> = &mut output;
295 async_task.process(&context, &input, output_ref).unwrap();
296 }
297
298 if let Some(val) = output.payload() {
299 assert_eq!(*val, 42u32);
300 break;
301 }
302 }
303 }
304
305 #[derive(Reflect)]
306 struct ControlledTask;
307
308 impl Freezable for ControlledTask {}
309
310 impl CuTask for ControlledTask {
311 type Resources<'r> = ();
312 type Input<'m> = input_msg!(u32);
313 type Output<'m> = output_msg!(u32);
314
315 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
316 where
317 Self: Sized,
318 {
319 Ok(Self {})
320 }
321
322 fn process(
323 &mut self,
324 ctx: &CuContext,
325 _input: &Self::Input<'_>,
326 output: &mut Self::Output<'_>,
327 ) -> CuResult<()> {
328 let rx = READY_RX
329 .get()
330 .expect("ready channel not set")
331 .lock()
332 .unwrap();
333 let ready_time = rx
334 .recv_timeout(Duration::from_secs(1))
335 .expect("timed out waiting for ready signal");
336
337 output.set_payload(ready_time.as_nanos() as u32);
338 output.metadata.process_time.start = ctx.now().into();
339 output.metadata.process_time.end = ready_time.into();
340
341 if let Some(done_tx) = DONE_TX.get() {
342 let _ = done_tx.send(());
343 }
344 Ok(())
345 }
346 }
347
348 #[test]
349 fn background_respects_recorded_ready_time() {
350 let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
351 let (context, clock_mock) = CuContext::new_mock_clock();
352
353 let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
355 let (done_tx, done_rx) = mpsc::channel::<()>();
356 READY_RX
357 .set(Arc::new(Mutex::new(ready_rx)))
358 .expect("ready channel already set");
359 DONE_TX
360 .set(done_tx)
361 .expect("completion channel already set");
362
363 let mut async_task: CuAsyncTask<ControlledTask, u32> =
364 CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
365 let input = CuMsg::new(Some(1u32));
366 let mut output = CuMsg::new(None);
367
368 clock_mock.set_value(0);
370 async_task.process(&context, &input, &mut output).unwrap();
371 assert!(output.payload().is_none());
372
373 clock_mock.set_value(10);
375 async_task.process(&context, &input, &mut output).unwrap();
376 assert!(output.payload().is_none());
377
378 clock_mock.set_value(30);
380 ready_tx.send(CuTime::from(30u64)).unwrap();
381 done_rx
382 .recv_timeout(Duration::from_secs(1))
383 .expect("background task never finished");
384 let mut ready_at_recorded = None;
386 for _ in 0..100 {
387 let state = async_task.state.lock().unwrap();
388 if !state.processing {
389 ready_at_recorded = state.ready_at;
390 if ready_at_recorded.is_some() {
391 break;
392 }
393 }
394 drop(state);
395 std::thread::sleep(Duration::from_millis(1));
396 }
397 assert!(
398 ready_at_recorded.is_some(),
399 "background task finished without recording ready_at"
400 );
401
402 clock_mock.set_value(20);
404 async_task.process(&context, &input, &mut output).unwrap();
405 assert!(
406 output.payload().is_none(),
407 "Output surfaced before recorded ready time"
408 );
409
410 clock_mock.set_value(30);
412 async_task.process(&context, &input, &mut output).unwrap();
413 assert_eq!(output.payload(), Some(&30u32));
414
415 ready_tx.send(CuTime::from(40u64)).unwrap();
417 let _ = done_rx.recv_timeout(Duration::from_secs(1));
418 }
419}