cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::cutask::{CuMsg, CuMsgPayload, CuTask, Freezable};
3use cu29_clock::RobotClock;
4use cu29_traits::CuResult;
5use rayon::ThreadPool;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8pub struct CuAsyncTask<T, O>
9where
10    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
11    O: CuMsgPayload + Send + 'static,
12{
13    task: Arc<Mutex<T>>,
14    output: Arc<Mutex<CuMsg<O>>>,
15    processing: Arc<Mutex<bool>>, // TODO: an atomic should be enough.
16    tp: Arc<ThreadPool>,
17}
18
19impl<T, O> CuAsyncTask<T, O>
20where
21    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
22    O: CuMsgPayload + Send + 'static,
23{
24    #[allow(unused)]
25    pub fn new(config: Option<&ComponentConfig>, tp: Arc<ThreadPool>) -> CuResult<Self> {
26        let task = Arc::new(Mutex::new(T::new(config)?));
27        let output = Arc::new(Mutex::new(CuMsg::default()));
28        Ok(Self {
29            task,
30            output,
31            processing: Arc::new(Mutex::new(false)),
32            tp,
33        })
34    }
35}
36
37impl<T, O> Freezable for CuAsyncTask<T, O>
38where
39    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
40    O: CuMsgPayload + Send + 'static,
41{
42}
43
44impl<T, I, O> CuTask for CuAsyncTask<T, O>
45where
46    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
47    I: CuMsgPayload + Send + Sync + 'static,
48    O: CuMsgPayload + Send + 'static,
49{
50    type Input<'m> = T::Input<'m>;
51    type Output<'m> = T::Output<'m>;
52
53    fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
54    where
55        Self: Sized,
56    {
57        Err("AsyncTask cannot be instantiated directly, use Async_task::new()".into())
58    }
59
60    fn process<'i, 'o>(
61        &mut self,
62        clock: &RobotClock,
63        input: &Self::Input<'i>,
64        real_output: &mut Self::Output<'o>,
65    ) -> CuResult<()> {
66        let mut processing = self.processing.lock().unwrap();
67        if *processing {
68            // if the background task is still processing, returns an empty result.
69            return Ok(());
70        }
71
72        *processing = true; // Reset the done flag for the next processing
73        let buffered_output = self.output.lock().unwrap(); // Clear the output if the task is done
74        *real_output = buffered_output.clone();
75
76        // immediately requeue a task based on the new input
77        self.tp.spawn_fifo({
78            let clock = clock.clone();
79            let input = (*input).clone();
80            let output = self.output.clone();
81            let task = self.task.clone();
82            let processing = self.processing.clone();
83            move || {
84                let input_ref: &CuMsg<I> = &input;
85                let mut output: MutexGuard<CuMsg<O>> = output.lock().unwrap();
86
87                // Safety: because copied the input and output, their lifetime are bound to the task and we control its lifetime.
88                let input_ref: &CuMsg<I> = unsafe { std::mem::transmute(input_ref) };
89                let output_ref: &mut MutexGuard<CuMsg<O>> =
90                    unsafe { std::mem::transmute(&mut output) };
91                task.lock()
92                    .unwrap()
93                    .process(&clock, input_ref, output_ref)
94                    .unwrap();
95                *processing.lock().unwrap() = false; // Mark processing as done
96            }
97        });
98        Ok(())
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ComponentConfig;
    use crate::cutask::CuMsg;
    use crate::cutask::Freezable;
    use crate::input_msg;
    use crate::output_msg;
    use cu29_clock::RobotClock;
    use cu29_traits::CuResult;
    use std::time::{Duration, Instant};

    /// Minimal task that copies its input payload to its output.
    struct TestTask {}

    impl Freezable for TestTask {}

    impl CuTask for TestTask {
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.set_payload(*input.payload().unwrap());
            Ok(())
        }
    }

    /// Polls the async wrapper until the background result shows up in the output.
    #[test]
    fn test_lifecycle() {
        let tp = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(1)
                .build()
                .unwrap(),
        );

        let config = ComponentConfig::default();
        let clock = RobotClock::default();
        let mut async_task: CuAsyncTask<TestTask, u32> =
            CuAsyncTask::new(Some(&config), tp).unwrap();
        let input = CuMsg::new(Some(42u32));
        let mut output: CuMsg<u32> = CuMsg::new(None);

        // Bound the polling so a regression fails the test instead of hanging it.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            async_task.process(&clock, &input, &mut output).unwrap();

            if let Some(val) = output.payload() {
                assert_eq!(*val, 42u32);
                break;
            }

            assert!(
                Instant::now() < deadline,
                "the background task did not produce an output in time"
            );
            // Give the single pool thread a chance to run between polls.
            std::thread::yield_now();
        }
    }
}