cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use crate::resource::{ResourceBindings, ResourceManager, ResourceMapping};
4use cu29_clock::{CuTime, RobotClock};
5use cu29_traits::{CuError, CuResult};
6use rayon::ThreadPool;
7use std::sync::{Arc, Mutex, MutexGuard};
8
9struct AsyncState {
10    processing: bool,
11    ready_at: Option<CuTime>,
12}
13
14pub struct CuAsyncTask<T, O>
15where
16    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
17    O: CuMsgPayload + Send + 'static,
18{
19    task: Arc<Mutex<T>>,
20    output: Arc<Mutex<CuMsg<O>>>,
21    state: Arc<Mutex<AsyncState>>,
22    tp: Arc<ThreadPool>,
23}
24
25/// Resource bundle required by a backgrounded task.
26pub struct CuAsyncTaskResources<'r, T: CuTask> {
27    pub inner: T::Resources<'r>,
28    pub threadpool: Arc<ThreadPool>,
29}
30
31impl<'r, T> ResourceBindings<'r> for CuAsyncTaskResources<'r, T>
32where
33    T: CuTask,
34    T::Resources<'r>: ResourceBindings<'r>,
35{
36    fn from_bindings(
37        manager: &'r mut ResourceManager,
38        mapping: Option<&ResourceMapping>,
39    ) -> CuResult<Self> {
40        let mapping = mapping.ok_or_else(|| CuError::from("Missing resource bindings"))?;
41        let threadpool_key = mapping
42            .get("bg_threads")
43            .ok_or_else(|| CuError::from("Missing 'bg_threads' binding for background task"))?
44            .typed();
45        let threadpool = manager.borrow_shared_arc(threadpool_key)?;
46        Ok(Self {
47            inner: T::Resources::from_bindings(manager, Some(mapping))?,
48            threadpool,
49        })
50    }
51}
52
53impl<T, O> CuAsyncTask<T, O>
54where
55    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
56    O: CuMsgPayload + Send + 'static,
57{
58    #[allow(unused)]
59    pub fn new(
60        config: Option<&ComponentConfig>,
61        resources: T::Resources<'_>,
62        tp: Arc<ThreadPool>,
63    ) -> CuResult<Self> {
64        let task = Arc::new(Mutex::new(T::new_with(config, resources)?));
65        let output = Arc::new(Mutex::new(CuMsg::default()));
66        Ok(Self {
67            task,
68            output,
69            state: Arc::new(Mutex::new(AsyncState {
70                processing: false,
71                ready_at: None,
72            })),
73            tp,
74        })
75    }
76}
77
78impl<T, O> Freezable for CuAsyncTask<T, O>
79where
80    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
81    O: CuMsgPayload + Send + 'static,
82{
83}
84
85impl<T, I, O> CuTask for CuAsyncTask<T, O>
86where
87    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
88    I: CuMsgPayload + Send + Sync + 'static,
89    O: CuMsgPayload + Send + 'static,
90{
91    type Resources<'r> = CuAsyncTaskResources<'r, T>;
92    type Input<'m> = T::Input<'m>;
93    type Output<'m> = T::Output<'m>;
94
95    fn new_with(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
96    where
97        Self: Sized,
98    {
99        let task = Arc::new(Mutex::new(T::new_with(config, resources.inner)?));
100        let output = Arc::new(Mutex::new(CuMsg::default()));
101        Ok(Self {
102            task,
103            output,
104            state: Arc::new(Mutex::new(AsyncState {
105                processing: false,
106                ready_at: None,
107            })),
108            tp: resources.threadpool,
109        })
110    }
111
112    fn process<'i, 'o>(
113        &mut self,
114        clock: &RobotClock,
115        input: &Self::Input<'i>,
116        real_output: &mut Self::Output<'o>,
117    ) -> CuResult<()> {
118        {
119            let mut state = self.state.lock().unwrap();
120            if state.processing {
121                // background task still running
122                return Ok(());
123            }
124
125            if let Some(ready_at) = state.ready_at
126                && clock.now() < ready_at
127            {
128                // result not yet allowed to surface based on recorded completion time
129                return Ok(());
130            }
131
132            // mark as processing before spawning the next job
133            state.processing = true;
134            state.ready_at = None;
135        }
136
137        // clone the last finished output (if any) as the visible result for this polling round
138        let buffered_output = self.output.lock().unwrap();
139        *real_output = buffered_output.clone();
140
141        // immediately requeue a task based on the new input
142        self.tp.spawn_fifo({
143            let clock = clock.clone();
144            let input = (*input).clone();
145            let output = self.output.clone();
146            let task = self.task.clone();
147            let state = self.state.clone();
148            move || {
149                let input_ref: &CuMsg<I> = &input;
150                let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
151
152                // Safety: because copied the input and output, their lifetime are bound to the task and we control its lifetime.
153                let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
154                let output_ref: &mut MutexGuard<CuMsg<O>> =
155                    unsafe { std::mem::transmute(&mut output) };
156
157                // Track the actual processing interval so replay can honor it.
158                if output_ref.metadata.process_time.start.is_none() {
159                    output_ref.metadata.process_time.start = clock.now().into();
160                }
161                task.lock()
162                    .unwrap()
163                    .process(&clock, input_ref, output_ref)
164                    .unwrap();
165                let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
166                let end_time = end_from_metadata.unwrap_or_else(|| {
167                    let now = clock.now();
168                    output_ref.metadata.process_time.end = now.into();
169                    now
170                });
171
172                let mut guard = state.lock().unwrap();
173                guard.processing = false; // Mark processing as done
174                guard.ready_at = Some(end_time);
175            }
176        });
177        Ok(())
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use crate::config::ComponentConfig;
185    use crate::cutask::CuMsg;
186    use crate::cutask::Freezable;
187    use crate::input_msg;
188    use crate::output_msg;
189    use cu29_clock::RobotClock;
190    use cu29_traits::CuResult;
191    use rayon::ThreadPoolBuilder;
192    use std::borrow::BorrowMut;
193    use std::sync::OnceLock;
194    use std::sync::mpsc;
195    use std::time::Duration;
196
197    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
198    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
199    struct TestTask {}
200
201    impl Freezable for TestTask {}
202
203    impl CuTask for TestTask {
204        type Resources<'r> = ();
205        type Input<'m> = input_msg!(u32);
206        type Output<'m> = output_msg!(u32);
207
208        fn new_with(
209            _config: Option<&ComponentConfig>,
210            _resources: Self::Resources<'_>,
211        ) -> CuResult<Self>
212        where
213            Self: Sized,
214        {
215            Ok(Self {})
216        }
217
218        fn process(
219            &mut self,
220            _clock: &RobotClock,
221            input: &Self::Input<'_>,
222            output: &mut Self::Output<'_>,
223        ) -> CuResult<()> {
224            output.borrow_mut().set_payload(*input.payload().unwrap());
225            Ok(())
226        }
227    }
228
229    #[test]
230    fn test_lifecycle() {
231        let tp = Arc::new(
232            rayon::ThreadPoolBuilder::new()
233                .num_threads(1)
234                .build()
235                .unwrap(),
236        );
237
238        let config = ComponentConfig::default();
239        let clock = RobotClock::default();
240        let mut async_task: CuAsyncTask<TestTask, u32> =
241            CuAsyncTask::new(Some(&config), (), tp).unwrap();
242        let input = CuMsg::new(Some(42u32));
243        let mut output = CuMsg::new(None);
244
245        loop {
246            {
247                let output_ref: &mut CuMsg<u32> = &mut output;
248                async_task.process(&clock, &input, output_ref).unwrap();
249            }
250
251            if let Some(val) = output.payload() {
252                assert_eq!(*val, 42u32);
253                break;
254            }
255        }
256    }
257
258    struct ControlledTask;
259
260    impl Freezable for ControlledTask {}
261
262    impl CuTask for ControlledTask {
263        type Resources<'r> = ();
264        type Input<'m> = input_msg!(u32);
265        type Output<'m> = output_msg!(u32);
266
267        fn new_with(
268            _config: Option<&ComponentConfig>,
269            _resources: Self::Resources<'_>,
270        ) -> CuResult<Self>
271        where
272            Self: Sized,
273        {
274            Ok(Self {})
275        }
276
277        fn process(
278            &mut self,
279            clock: &RobotClock,
280            _input: &Self::Input<'_>,
281            output: &mut Self::Output<'_>,
282        ) -> CuResult<()> {
283            let rx = READY_RX
284                .get()
285                .expect("ready channel not set")
286                .lock()
287                .unwrap();
288            let ready_time = rx
289                .recv_timeout(Duration::from_secs(1))
290                .expect("timed out waiting for ready signal");
291
292            output.set_payload(ready_time.as_nanos() as u32);
293            output.metadata.process_time.start = clock.now().into();
294            output.metadata.process_time.end = ready_time.into();
295
296            if let Some(done_tx) = DONE_TX.get() {
297                let _ = done_tx.send(());
298            }
299            Ok(())
300        }
301    }
302
303    #[test]
304    fn background_respects_recorded_ready_time() {
305        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
306        let (clock, clock_mock) = RobotClock::mock();
307
308        // Install the control channels for the task.
309        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
310        let (done_tx, done_rx) = mpsc::channel::<()>();
311        READY_RX
312            .set(Arc::new(Mutex::new(ready_rx)))
313            .expect("ready channel already set");
314        DONE_TX
315            .set(done_tx)
316            .expect("completion channel already set");
317
318        let mut async_task: CuAsyncTask<ControlledTask, u32> =
319            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
320        let input = CuMsg::new(Some(1u32));
321        let mut output = CuMsg::new(None);
322
323        // Copperlist 0: kick off processing, nothing ready yet.
324        clock_mock.set_value(0);
325        async_task.process(&clock, &input, &mut output).unwrap();
326        assert!(output.payload().is_none());
327
328        // Copperlist 1 at time 10: still running in the background.
329        clock_mock.set_value(10);
330        async_task.process(&clock, &input, &mut output).unwrap();
331        assert!(output.payload().is_none());
332
333        // The background thread finishes at time 30 (recorded in metadata).
334        clock_mock.set_value(30);
335        ready_tx.send(CuTime::from(30u64)).unwrap();
336        done_rx
337            .recv_timeout(Duration::from_secs(1))
338            .expect("background task never finished");
339        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
340        let mut ready_at_recorded = None;
341        for _ in 0..100 {
342            let state = async_task.state.lock().unwrap();
343            if !state.processing {
344                ready_at_recorded = state.ready_at;
345                if ready_at_recorded.is_some() {
346                    break;
347                }
348            }
349            drop(state);
350            std::thread::sleep(Duration::from_millis(1));
351        }
352        assert!(
353            ready_at_recorded.is_some(),
354            "background task finished without recording ready_at"
355        );
356
357        // Replay earlier than the recorded end time: the output should be held back.
358        clock_mock.set_value(20);
359        async_task.process(&clock, &input, &mut output).unwrap();
360        assert!(
361            output.payload().is_none(),
362            "Output surfaced before recorded ready time"
363        );
364
365        // Once the mock clock reaches the recorded end time, the result is released.
366        clock_mock.set_value(30);
367        async_task.process(&clock, &input, &mut output).unwrap();
368        assert_eq!(output.payload(), Some(&30u32));
369
370        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
371        ready_tx.send(CuTime::from(40u64)).unwrap();
372        let _ = done_rx.recv_timeout(Duration::from_secs(1));
373    }
374}