Skip to main content

cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
10struct AsyncState {
11    processing: bool,
12    ready_at: Option<CuTime>,
13    last_error: Option<CuError>,
14}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17    let mut guard = match state.lock() {
18        Ok(guard) => guard,
19        Err(poison) => poison.into_inner(),
20    };
21    guard.processing = false;
22    guard.ready_at = None;
23    guard.last_error = Some(error);
24}
25
26#[derive(Reflect)]
27#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
28pub struct CuAsyncTask<T, O>
29where
30    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
31    O: CuMsgPayload + Send + 'static,
32{
33    #[reflect(ignore)]
34    task: Arc<Mutex<T>>,
35    #[reflect(ignore)]
36    output: Arc<Mutex<CuMsg<O>>>,
37    #[reflect(ignore)]
38    state: Arc<Mutex<AsyncState>>,
39    #[reflect(ignore)]
40    tp: Arc<ThreadPool>,
41}
42
43impl<T, O> TypePath for CuAsyncTask<T, O>
44where
45    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
46    O: CuMsgPayload + Send + 'static,
47{
48    fn type_path() -> &'static str {
49        "cu29_runtime::cuasynctask::CuAsyncTask"
50    }
51
52    fn short_type_path() -> &'static str {
53        "CuAsyncTask"
54    }
55
56    fn type_ident() -> Option<&'static str> {
57        Some("CuAsyncTask")
58    }
59
60    fn crate_name() -> Option<&'static str> {
61        Some("cu29_runtime")
62    }
63
64    fn module_path() -> Option<&'static str> {
65        Some("cuasynctask")
66    }
67}
68
69/// Resource bundle required by a backgrounded task.
70pub struct CuAsyncTaskResources<'r, T: CuTask> {
71    pub inner: T::Resources<'r>,
72    pub threadpool: Arc<ThreadPool>,
73}
74
75impl<T, O> CuAsyncTask<T, O>
76where
77    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
78    O: CuMsgPayload + Send + 'static,
79{
80    #[allow(unused)]
81    pub fn new(
82        config: Option<&ComponentConfig>,
83        resources: T::Resources<'_>,
84        tp: Arc<ThreadPool>,
85    ) -> CuResult<Self> {
86        let task = Arc::new(Mutex::new(T::new(config, resources)?));
87        let output = Arc::new(Mutex::new(CuMsg::default()));
88        Ok(Self {
89            task,
90            output,
91            state: Arc::new(Mutex::new(AsyncState {
92                processing: false,
93                ready_at: None,
94                last_error: None,
95            })),
96            tp,
97        })
98    }
99}
100
101impl<T, O> Freezable for CuAsyncTask<T, O>
102where
103    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
104    O: CuMsgPayload + Send + 'static,
105{
106}
107
108impl<T, I, O> CuTask for CuAsyncTask<T, O>
109where
110    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
111    I: CuMsgPayload + Send + Sync + 'static,
112    O: CuMsgPayload + Send + 'static,
113{
114    type Resources<'r> = CuAsyncTaskResources<'r, T>;
115    type Input<'m> = T::Input<'m>;
116    type Output<'m> = T::Output<'m>;
117
118    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
119    where
120        Self: Sized,
121    {
122        let task = Arc::new(Mutex::new(T::new(config, resources.inner)?));
123        let output = Arc::new(Mutex::new(CuMsg::default()));
124        Ok(Self {
125            task,
126            output,
127            state: Arc::new(Mutex::new(AsyncState {
128                processing: false,
129                ready_at: None,
130                last_error: None,
131            })),
132            tp: resources.threadpool,
133        })
134    }
135
136    fn process<'i, 'o>(
137        &mut self,
138        ctx: &CuContext,
139        input: &Self::Input<'i>,
140        real_output: &mut Self::Output<'o>,
141    ) -> CuResult<()> {
142        {
143            let mut state = self.state.lock().map_err(|_| {
144                CuError::from("Async task state mutex poisoned while scheduling background work")
145            })?;
146            if let Some(error) = state.last_error.take() {
147                return Err(error);
148            }
149            if state.processing {
150                // background task still running
151                return Ok(());
152            }
153
154            if let Some(ready_at) = state.ready_at
155                && ctx.now() < ready_at
156            {
157                // result not yet allowed to surface based on recorded completion time
158                return Ok(());
159            }
160
161            // mark as processing before spawning the next job
162            state.processing = true;
163            state.ready_at = None;
164        }
165
166        // clone the last finished output (if any) as the visible result for this polling round
167        let buffered_output = self.output.lock().map_err(|_| {
168            let error = CuError::from("Async task output mutex poisoned");
169            record_async_error(&self.state, error.clone());
170            error
171        })?;
172        *real_output = buffered_output.clone();
173
174        // immediately requeue a task based on the new input
175        self.tp.spawn_fifo({
176            let ctx = ctx.clone();
177            let input = (*input).clone();
178            let output = self.output.clone();
179            let task = self.task.clone();
180            let state = self.state.clone();
181            move || {
182                let input_ref: &CuMsg<I> = &input;
183                let mut output_guard = match output.lock() {
184                    Ok(guard) => guard,
185                    Err(_) => {
186                        record_async_error(
187                            &state,
188                            CuError::from("Async task output mutex poisoned"),
189                        );
190                        return;
191                    }
192                };
193                let output_ref: &mut CuMsg<O> = &mut output_guard;
194
195                // Track the actual processing interval so replay can honor it.
196                if output_ref.metadata.process_time.start.is_none() {
197                    output_ref.metadata.process_time.start = ctx.now().into();
198                }
199                let task_result = match task.lock() {
200                    Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
201                    Err(poison) => Err(CuError::from(format!(
202                        "Async task mutex poisoned: {poison}"
203                    ))),
204                };
205
206                let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
207                guard.processing = false;
208
209                match task_result {
210                    Ok(()) => {
211                        let end_from_metadata: Option<CuTime> =
212                            output_ref.metadata.process_time.end.into();
213                        let end_time = end_from_metadata.unwrap_or_else(|| {
214                            let now = ctx.now();
215                            output_ref.metadata.process_time.end = now.into();
216                            now
217                        });
218                        guard.ready_at = Some(end_time);
219                    }
220                    Err(error) => {
221                        guard.ready_at = None;
222                        guard.last_error = Some(error);
223                    }
224                }
225            }
226        });
227        Ok(())
228    }
229}
230
// Tests for the async wrapper: one end-to-end round trip on the real clock, and one that
// drives a mock clock to check that results are held back until their recorded end time.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ComponentConfig;
    use crate::cutask::CuMsg;
    use crate::cutask::Freezable;
    use crate::input_msg;
    use crate::output_msg;
    use cu29_traits::CuResult;
    use rayon::ThreadPoolBuilder;
    use std::borrow::BorrowMut;
    use std::sync::OnceLock;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    // Control channels for `ControlledTask`. They are process-wide and set exactly once,
    // so only one test may install them.
    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();

    /// Task that copies its input payload to its output.
    #[derive(Reflect)]
    struct TestTask {}

    impl Freezable for TestTask {}

    impl CuTask for TestTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _ctx: &CuContext,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.borrow_mut().set_payload(*input.payload().unwrap());
            Ok(())
        }
    }

    #[test]
    fn test_lifecycle() {
        let tp = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(1)
                .build()
                .unwrap(),
        );

        let config = ComponentConfig::default();
        let context = CuContext::new_with_clock();
        let mut async_task: CuAsyncTask<TestTask, u32> =
            CuAsyncTask::new(Some(&config), (), tp).unwrap();
        let input = CuMsg::new(Some(42u32));
        let mut output = CuMsg::new(None);

        // Poll until the background result surfaces, but give up after a deadline so a
        // regression fails the test instead of spinning forever.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            {
                let output_ref: &mut CuMsg<u32> = &mut output;
                async_task.process(&context, &input, output_ref).unwrap();
            }

            if let Some(val) = output.payload() {
                assert_eq!(*val, 42u32);
                break;
            }

            assert!(
                Instant::now() < deadline,
                "async task never surfaced an output"
            );
            // Let the single pool thread make progress instead of hammering the locks.
            std::thread::yield_now();
        }
    }

    /// Task that blocks until the test sends a "ready" time, then reports that time as
    /// both its payload and its recorded processing end.
    #[derive(Reflect)]
    struct ControlledTask;

    impl Freezable for ControlledTask {}

    impl CuTask for ControlledTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            ctx: &CuContext,
            _input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            let rx = READY_RX
                .get()
                .expect("ready channel not set")
                .lock()
                .unwrap();
            let ready_time = rx
                .recv_timeout(Duration::from_secs(1))
                .expect("timed out waiting for ready signal");

            // Truncating cast is fine here: the test only sends small time values.
            output.set_payload(ready_time.as_nanos() as u32);
            output.metadata.process_time.start = ctx.now().into();
            output.metadata.process_time.end = ready_time.into();

            if let Some(done_tx) = DONE_TX.get() {
                let _ = done_tx.send(());
            }
            Ok(())
        }
    }

    #[test]
    fn background_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();

        // Install the control channels for the task.
        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        READY_RX
            .set(Arc::new(Mutex::new(ready_rx)))
            .expect("ready channel already set");
        DONE_TX
            .set(done_tx)
            .expect("completion channel already set");

        let mut async_task: CuAsyncTask<ControlledTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // Copperlist 0: kick off processing, nothing ready yet.
        clock_mock.set_value(0);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Copperlist 1 at time 10: still running in the background.
        clock_mock.set_value(10);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // The background thread finishes at time 30 (recorded in metadata).
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background task never finished");
        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background task finished without recording ready_at"
        );

        // Replay earlier than the recorded end time: the output should be held back.
        clock_mock.set_value(20);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "Output surfaced before recorded ready time"
        );

        // Once the mock clock reaches the recorded end time, the result is released.
        clock_mock.set_value(30);
        async_task.process(&context, &input, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
}
419}