Skip to main content

cu29_runtime/
cuasynctask.rs

1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuSrcTask, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
/// Bookkeeping shared between the polling side and the background worker.
struct AsyncState {
    /// `true` from the moment a poll schedules a background run until that run is
    /// finalized (or an error is recorded).
    processing: bool,
    /// End time recorded for the last successful run; polls made while the clock is
    /// earlier than this are answered with an empty message.
    ready_at: Option<CuTime>,
    /// Error left behind by a failed run or a poisoned-mutex path; taken and returned
    /// by the next poll.
    last_error: Option<CuError>,
}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17    let mut guard = match state.lock() {
18        Ok(guard) => guard,
19        Err(poison) => poison.into_inner(),
20    };
21    guard.processing = false;
22    guard.ready_at = None;
23    guard.last_error = Some(error);
24}
25
26fn begin_background_poll<O>(
27    ctx: &CuContext,
28    state: &Mutex<AsyncState>,
29    buffered_output: &Mutex<CuMsg<O>>,
30    real_output: &mut CuMsg<O>,
31) -> CuResult<bool>
32where
33    O: CuMsgPayload + Send + 'static,
34{
35    {
36        let mut state = state.lock().map_err(|_| {
37            CuError::from("Async task state mutex poisoned while scheduling background work")
38        })?;
39        if let Some(error) = state.last_error.take() {
40            return Err(error);
41        }
42        if state.processing {
43            *real_output = CuMsg::default();
44            return Ok(false);
45        }
46
47        if let Some(ready_at) = state.ready_at
48            && ctx.now() < ready_at
49        {
50            *real_output = CuMsg::default();
51            return Ok(false);
52        }
53
54        state.processing = true;
55        state.ready_at = None;
56    }
57
58    let buffered_output = buffered_output.lock().map_err(|_| {
59        let error = CuError::from("Async task output mutex poisoned");
60        record_async_error(state, error.clone());
61        error
62    })?;
63    *real_output = buffered_output.clone();
64    Ok(true)
65}
66
67fn finalize_background_run<O>(
68    state: &Mutex<AsyncState>,
69    output_ref: &mut CuMsg<O>,
70    fallback_end: CuTime,
71    task_result: CuResult<()>,
72) where
73    O: CuMsgPayload + Send + 'static,
74{
75    let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
76    guard.processing = false;
77
78    match task_result {
79        Ok(()) => {
80            let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
81            let end_time = end_from_metadata.unwrap_or_else(|| {
82                output_ref.metadata.process_time.end = fallback_end.into();
83                fallback_end
84            });
85            guard.ready_at = Some(end_time);
86        }
87        Err(error) => {
88            guard.ready_at = None;
89            guard.last_error = Some(error);
90        }
91    }
92}
93
/// Wrapper that runs an inner [`CuTask`] on a thread pool instead of inline.
///
/// Each poll hands out the message buffered by the previous background run (or an
/// empty message while a run is still in flight) and schedules the next run.
#[derive(Reflect)]
#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
pub struct CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// The wrapped task, shared with the background worker.
    #[reflect(ignore)]
    task: Arc<Mutex<T>>,
    /// Buffer the background worker writes its result into.
    #[reflect(ignore)]
    output: Arc<Mutex<CuMsg<O>>>,
    /// Scheduling state shared between polls and the worker.
    #[reflect(ignore)]
    state: Arc<Mutex<AsyncState>>,
    /// Pool on which background runs are spawned.
    #[reflect(ignore)]
    tp: Arc<ThreadPool>,
}
110
// Hand-written `TypePath`: the `Reflect` derive on `CuAsyncTask` is configured with
// `type_path = false`. The reported paths are fixed strings and do not include the
// generic parameters.
impl<T, O> TypePath for CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// Fully qualified path of the wrapper type.
    fn type_path() -> &'static str {
        "cu29_runtime::cuasynctask::CuAsyncTask"
    }

    /// Type name without its module path.
    fn short_type_path() -> &'static str {
        "CuAsyncTask"
    }

    /// Bare identifier of the type.
    fn type_ident() -> Option<&'static str> {
        Some("CuAsyncTask")
    }

    /// Name of the crate that defines the type.
    fn crate_name() -> Option<&'static str> {
        Some("cu29_runtime")
    }

    /// Module path reported for the type.
    fn module_path() -> Option<&'static str> {
        Some("cuasynctask")
    }
}
136
/// Resource bundle required by a backgrounded task.
pub struct CuAsyncTaskResources<'r, T: CuTask> {
    /// Resources forwarded to the inner task's constructor.
    pub inner: T::Resources<'r>,
    /// Thread pool on which the wrapper spawns its background runs.
    pub threadpool: Arc<ThreadPool>,
}
142
143impl<T, O> CuAsyncTask<T, O>
144where
145    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
146    O: CuMsgPayload + Send + 'static,
147{
148    #[allow(unused)]
149    pub fn new(
150        config: Option<&ComponentConfig>,
151        resources: T::Resources<'_>,
152        tp: Arc<ThreadPool>,
153    ) -> CuResult<Self> {
154        let task = Arc::new(Mutex::new(T::new(config, resources)?));
155        let output = Arc::new(Mutex::new(CuMsg::default()));
156        Ok(Self {
157            task,
158            output,
159            state: Arc::new(Mutex::new(AsyncState {
160                processing: false,
161                ready_at: None,
162                last_error: None,
163            })),
164            tp,
165        })
166    }
167}
168
// Empty impl: every `Freezable` method falls back to the trait's defaults.
impl<T, O> Freezable for CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
}
175
176impl<T, I, O> CuTask for CuAsyncTask<T, O>
177where
178    T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
179    I: CuMsgPayload + Send + Sync + 'static,
180    O: CuMsgPayload + Send + 'static,
181{
182    type Resources<'r> = CuAsyncTaskResources<'r, T>;
183    type Input<'m> = T::Input<'m>;
184    type Output<'m> = T::Output<'m>;
185
186    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
187    where
188        Self: Sized,
189    {
190        CuAsyncTask::new(config, resources.inner, resources.threadpool)
191    }
192
193    fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
194        let mut task = self
195            .task
196            .lock()
197            .map_err(|_| CuError::from("Async task mutex poisoned during start"))?;
198        task.start(ctx)
199    }
200
201    fn process<'i, 'o>(
202        &mut self,
203        ctx: &CuContext,
204        input: &Self::Input<'i>,
205        real_output: &mut Self::Output<'o>,
206    ) -> CuResult<()> {
207        if !begin_background_poll(ctx, &self.state, &self.output, real_output)? {
208            return Ok(());
209        }
210
211        // immediately requeue a task based on the new input
212        self.tp.spawn_fifo({
213            let ctx = ctx.clone();
214            let input = (*input).clone();
215            let output = self.output.clone();
216            let task = self.task.clone();
217            let state = self.state.clone();
218            move || {
219                let input_ref: &CuMsg<I> = &input;
220                let mut output_guard = match output.lock() {
221                    Ok(guard) => guard,
222                    Err(_) => {
223                        record_async_error(
224                            &state,
225                            CuError::from("Async task output mutex poisoned"),
226                        );
227                        return;
228                    }
229                };
230                let output_ref: &mut CuMsg<O> = &mut output_guard;
231
232                // Each async run starts from an empty output so a task that
233                // chooses not to publish does not leak the previous payload.
234                *output_ref = CuMsg::default();
235
236                // Track the actual processing interval so replay can honor it.
237                if output_ref.metadata.process_time.start.is_none() {
238                    output_ref.metadata.process_time.start = ctx.now().into();
239                }
240                let task_result = match task.lock() {
241                    Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
242                    Err(poison) => Err(CuError::from(format!(
243                        "Async task mutex poisoned: {poison}"
244                    ))),
245                };
246                finalize_background_run(&state, output_ref, ctx.now(), task_result);
247            }
248        });
249        Ok(())
250    }
251
252    fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
253        let mut task = self
254            .task
255            .lock()
256            .map_err(|_| CuError::from("Async task mutex poisoned during stop"))?;
257        task.stop(ctx)
258    }
259}
260
/// Wrapper that runs an inner [`CuSrcTask`] on a thread pool instead of inline.
///
/// Each poll hands out the message buffered by the previous background run (or an
/// empty message while a run is still in flight) and schedules the next run.
#[derive(Reflect)]
#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
pub struct CuAsyncSrcTask<T, O>
where
    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// The wrapped source, shared with the background worker.
    #[reflect(ignore)]
    task: Arc<Mutex<T>>,
    /// Buffer the background worker writes its result into.
    #[reflect(ignore)]
    output: Arc<Mutex<CuMsg<O>>>,
    /// Scheduling state shared between polls and the worker.
    #[reflect(ignore)]
    state: Arc<Mutex<AsyncState>>,
    /// Pool on which background runs are spawned.
    #[reflect(ignore)]
    tp: Arc<ThreadPool>,
}
277
// Hand-written `TypePath`: the `Reflect` derive on `CuAsyncSrcTask` is configured
// with `type_path = false`. The reported paths are fixed strings and do not include
// the generic parameters.
impl<T, O> TypePath for CuAsyncSrcTask<T, O>
where
    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// Fully qualified path of the wrapper type.
    fn type_path() -> &'static str {
        "cu29_runtime::cuasynctask::CuAsyncSrcTask"
    }

    /// Type name without its module path.
    fn short_type_path() -> &'static str {
        "CuAsyncSrcTask"
    }

    /// Bare identifier of the type.
    fn type_ident() -> Option<&'static str> {
        Some("CuAsyncSrcTask")
    }

    /// Name of the crate that defines the type.
    fn crate_name() -> Option<&'static str> {
        Some("cu29_runtime")
    }

    /// Module path reported for the type.
    fn module_path() -> Option<&'static str> {
        Some("cuasynctask")
    }
}
303
/// Resource bundle required by a backgrounded source.
pub struct CuAsyncSrcTaskResources<'r, T: CuSrcTask> {
    /// Resources forwarded to the inner source's constructor.
    pub inner: T::Resources<'r>,
    /// Thread pool on which the wrapper spawns its background runs.
    pub threadpool: Arc<ThreadPool>,
}
309
310impl<T, O> CuAsyncSrcTask<T, O>
311where
312    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
313    O: CuMsgPayload + Send + 'static,
314{
315    #[allow(unused)]
316    pub fn new(
317        config: Option<&ComponentConfig>,
318        resources: T::Resources<'_>,
319        tp: Arc<ThreadPool>,
320    ) -> CuResult<Self> {
321        let task = Arc::new(Mutex::new(T::new(config, resources)?));
322        let output = Arc::new(Mutex::new(CuMsg::default()));
323        Ok(Self {
324            task,
325            output,
326            state: Arc::new(Mutex::new(AsyncState {
327                processing: false,
328                ready_at: None,
329                last_error: None,
330            })),
331            tp,
332        })
333    }
334}
335
// Empty impl: every `Freezable` method falls back to the trait's defaults.
impl<T, O> Freezable for CuAsyncSrcTask<T, O>
where
    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
}
342
343impl<T, O> CuSrcTask for CuAsyncSrcTask<T, O>
344where
345    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
346    O: CuMsgPayload + Send + 'static,
347{
348    type Resources<'r> = CuAsyncSrcTaskResources<'r, T>;
349    type Output<'m> = T::Output<'m>;
350
351    fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
352    where
353        Self: Sized,
354    {
355        CuAsyncSrcTask::new(config, resources.inner, resources.threadpool)
356    }
357
358    fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
359        let mut task = self
360            .task
361            .lock()
362            .map_err(|_| CuError::from("Async source mutex poisoned during start"))?;
363        task.start(ctx)
364    }
365
366    fn process<'o>(&mut self, ctx: &CuContext, real_output: &mut Self::Output<'o>) -> CuResult<()> {
367        if !begin_background_poll(ctx, &self.state, &self.output, real_output)? {
368            return Ok(());
369        }
370
371        self.tp.spawn_fifo({
372            let ctx = ctx.clone();
373            let output = self.output.clone();
374            let task = self.task.clone();
375            let state = self.state.clone();
376            move || {
377                let mut output_guard = match output.lock() {
378                    Ok(guard) => guard,
379                    Err(_) => {
380                        record_async_error(
381                            &state,
382                            CuError::from("Async task output mutex poisoned"),
383                        );
384                        return;
385                    }
386                };
387                let output_ref: &mut CuMsg<O> = &mut output_guard;
388
389                *output_ref = CuMsg::default();
390
391                if output_ref.metadata.process_time.start.is_none() {
392                    output_ref.metadata.process_time.start = ctx.now().into();
393                }
394                let task_result = match task.lock() {
395                    Ok(mut task_guard) => task_guard.process(&ctx, output_ref),
396                    Err(poison) => Err(CuError::from(format!(
397                        "Async source mutex poisoned: {poison}"
398                    ))),
399                };
400                finalize_background_run(&state, output_ref, ctx.now(), task_result);
401            }
402        });
403        Ok(())
404    }
405
406    fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
407        let mut task = self
408            .task
409            .lock()
410            .map_err(|_| CuError::from("Async source mutex poisoned during stop"))?;
411        task.stop(ctx)
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418    use crate::config::ComponentConfig;
419    use crate::cutask::CuMsg;
420    use crate::cutask::Freezable;
421    use crate::input_msg;
422    use crate::output_msg;
423    use cu29_traits::CuResult;
424    use rayon::ThreadPoolBuilder;
425    use std::borrow::BorrowMut;
426    use std::sync::OnceLock;
427    use std::sync::mpsc;
428    use std::time::Duration;
429
    // Process-wide control channels read by `ControlledTask`, which declares
    // `Resources = ()` and so cannot receive them through its constructor.
    // A `OnceLock` can only be set once, so only one test may install them.
    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
    /// Stateless task whose `process` copies the input payload to the output.
    #[derive(Reflect)]
    struct TestTask {}

    // Empty impl: relies on the `Freezable` defaults.
    impl Freezable for TestTask {}
436
    impl CuTask for TestTask {
        type Resources<'r> = ();
        type Input<'m> = input_msg!(u32);
        type Output<'m> = output_msg!(u32);

        /// Ignores both arguments; the task carries no state.
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Copies the input payload to the output.
        ///
        /// Panics if the input message carries no payload.
        fn process(
            &mut self,
            _ctx: &CuContext,
            input: &Self::Input<'_>,
            output: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            output.borrow_mut().set_payload(*input.payload().unwrap());
            Ok(())
        }
    }
459
460    #[test]
461    fn test_lifecycle() {
462        let tp = Arc::new(
463            rayon::ThreadPoolBuilder::new()
464                .num_threads(1)
465                .build()
466                .unwrap(),
467        );
468
469        let config = ComponentConfig::default();
470        let context = CuContext::new_with_clock();
471        let mut async_task: CuAsyncTask<TestTask, u32> =
472            CuAsyncTask::new(Some(&config), (), tp).unwrap();
473        let input = CuMsg::new(Some(42u32));
474        let mut output = CuMsg::new(None);
475
476        loop {
477            {
478                let output_ref: &mut CuMsg<u32> = &mut output;
479                async_task.process(&context, &input, output_ref).unwrap();
480            }
481
482            if let Some(val) = output.payload() {
483                assert_eq!(*val, 42u32);
484                break;
485            }
486        }
487    }
488
    /// Task whose `process` blocks until the test sends a ready time through the
    /// global `READY_RX` channel.
    #[derive(Reflect)]
    struct ControlledTask;

    // Empty impl: relies on the `Freezable` defaults.
    impl Freezable for ControlledTask {}
493
494    impl CuTask for ControlledTask {
495        type Resources<'r> = ();
496        type Input<'m> = input_msg!(u32);
497        type Output<'m> = output_msg!(u32);
498
499        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
500        where
501            Self: Sized,
502        {
503            Ok(Self {})
504        }
505
506        fn process(
507            &mut self,
508            ctx: &CuContext,
509            _input: &Self::Input<'_>,
510            output: &mut Self::Output<'_>,
511        ) -> CuResult<()> {
512            let rx = READY_RX
513                .get()
514                .expect("ready channel not set")
515                .lock()
516                .unwrap();
517            let ready_time = rx
518                .recv_timeout(Duration::from_secs(1))
519                .expect("timed out waiting for ready signal");
520
521            output.set_payload(ready_time.as_nanos() as u32);
522            output.metadata.process_time.start = ctx.now().into();
523            output.metadata.process_time.end = ready_time.into();
524
525            if let Some(done_tx) = DONE_TX.get() {
526                let _ = done_tx.send(());
527            }
528            Ok(())
529        }
530    }
531
532    fn wait_until_async_idle<T, O>(async_task: &CuAsyncTask<T, O>)
533    where
534        T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
535        O: CuMsgPayload + Send + 'static,
536    {
537        for _ in 0..100 {
538            let state = async_task.state.lock().unwrap();
539            if !state.processing {
540                return;
541            }
542            drop(state);
543            std::thread::sleep(Duration::from_millis(1));
544        }
545        panic!("background task never became idle");
546    }
547
548    fn wait_until_async_src_idle<T, O>(async_task: &CuAsyncSrcTask<T, O>)
549    where
550        T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
551        O: CuMsgPayload + Send + 'static,
552    {
553        for _ in 0..100 {
554            let state = async_task.state.lock().unwrap();
555            if !state.processing {
556                return;
557            }
558            drop(state);
559            std::thread::sleep(Duration::from_millis(1));
560        }
561        panic!("background source never became idle");
562    }
563
    /// Channels handed to `ActionTask` / `ActionSrc` so a test can script each run.
    #[derive(Clone)]
    struct ActionTaskResources {
        /// One entry is consumed per run: `Some(v)` publishes `v`, `None` publishes nothing.
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        /// Signalled at the end of every run.
        done: mpsc::Sender<()>,
    }
569
    /// Task whose output for each run is dictated by the test through a channel.
    #[derive(Reflect)]
    #[reflect(no_field_bounds, from_reflect = false)]
    struct ActionTask {
        /// Scripted actions, one consumed per `process` call.
        #[reflect(ignore)]
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        /// Completion signal sent at the end of each `process` call.
        #[reflect(ignore)]
        done: mpsc::Sender<()>,
    }

    // Empty impl: relies on the `Freezable` defaults.
    impl Freezable for ActionTask {}
580
581    impl CuTask for ActionTask {
582        type Resources<'r> = ActionTaskResources;
583        type Input<'m> = input_msg!(u32);
584        type Output<'m> = output_msg!(u32);
585
586        fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
587        where
588            Self: Sized,
589        {
590            let _ = config;
591            Ok(Self {
592                actions: resources.actions,
593                done: resources.done,
594            })
595        }
596
597        fn process(
598            &mut self,
599            _ctx: &CuContext,
600            _input: &Self::Input<'_>,
601            output: &mut Self::Output<'_>,
602        ) -> CuResult<()> {
603            let action = self
604                .actions
605                .lock()
606                .unwrap()
607                .recv_timeout(Duration::from_secs(1))
608                .expect("timed out waiting for action");
609            if let Some(value) = action {
610                output.set_payload(value);
611            }
612            let _ = self.done.send(());
613            Ok(())
614        }
615    }
616
    /// Source whose output for each run is dictated by the test through a channel.
    #[derive(Reflect)]
    #[reflect(no_field_bounds, from_reflect = false)]
    struct ActionSrc {
        /// Scripted actions, one consumed per `process` call.
        #[reflect(ignore)]
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        /// Completion signal sent at the end of each `process` call.
        #[reflect(ignore)]
        done: mpsc::Sender<()>,
    }

    // Empty impl: relies on the `Freezable` defaults.
    impl Freezable for ActionSrc {}
627
628    impl CuSrcTask for ActionSrc {
629        type Resources<'r> = ActionTaskResources;
630        type Output<'m> = output_msg!(u32);
631
632        fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
633        where
634            Self: Sized,
635        {
636            let _ = config;
637            Ok(Self {
638                actions: resources.actions,
639                done: resources.done,
640            })
641        }
642
643        fn process(&mut self, _ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
644            let action = self
645                .actions
646                .lock()
647                .unwrap()
648                .recv_timeout(Duration::from_secs(1))
649                .expect("timed out waiting for source action");
650            if let Some(value) = action {
651                output.set_payload(value);
652            }
653            let _ = self.done.send(());
654            Ok(())
655        }
656    }
657
    /// Channels handed to `ControlledSrc` so a test can decide when a run finishes.
    #[derive(Clone)]
    struct ControlledSrcResources {
        /// Each received value releases one run and becomes its recorded end time.
        ready_times: Arc<Mutex<mpsc::Receiver<CuTime>>>,
        /// Signalled at the end of every run.
        done: mpsc::Sender<()>,
    }
663
    /// Source whose `process` blocks until the test sends a ready time.
    #[derive(Reflect)]
    #[reflect(no_field_bounds, from_reflect = false)]
    struct ControlledSrc {
        /// Ready times sent by the test, one consumed per `process` call.
        #[reflect(ignore)]
        ready_times: Arc<Mutex<mpsc::Receiver<CuTime>>>,
        /// Completion signal sent at the end of each `process` call.
        #[reflect(ignore)]
        done: mpsc::Sender<()>,
    }

    // Empty impl: relies on the `Freezable` defaults.
    impl Freezable for ControlledSrc {}
674
675    impl CuSrcTask for ControlledSrc {
676        type Resources<'r> = ControlledSrcResources;
677        type Output<'m> = output_msg!(u32);
678
679        fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
680        where
681            Self: Sized,
682        {
683            let _ = config;
684            Ok(Self {
685                ready_times: resources.ready_times,
686                done: resources.done,
687            })
688        }
689
690        fn process(&mut self, ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
691            let ready_time = self
692                .ready_times
693                .lock()
694                .unwrap()
695                .recv_timeout(Duration::from_secs(1))
696                .expect("timed out waiting for ready signal");
697            output.set_payload(ready_time.as_nanos() as u32);
698            output.metadata.process_time.start = ctx.now().into();
699            output.metadata.process_time.end = ready_time.into();
700            let _ = self.done.send(());
701            Ok(())
702        }
703    }
704
    /// A poll made while the background run is still in flight must overwrite the
    /// caller's output with an empty message.
    #[test]
    fn background_clears_output_while_processing() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // First poll: starts the worker, which then blocks waiting for an action.
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Plant a stale payload; the second poll happens while the worker is blocked.
        output.set_payload(999);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background poll should clear stale output while the worker is still running"
        );

        // Release the worker and wait for it so the test does not end mid-run.
        action_tx.send(Some(7)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background worker never finished");
    }
736
    /// A run that publishes nothing must not cause the payload of the run before
    /// it to be emitted a second time.
    #[test]
    fn background_empty_run_does_not_reemit_previous_payload() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let some_input = CuMsg::new(Some(1u32));
        let no_input = CuMsg::new(None::<u32>);
        let mut output = CuMsg::new(None);

        // Run 1 publishes 42; wait until the wrapper has finalized it.
        action_tx.send(Some(42)).unwrap();
        async_task
            .process(&context, &some_input, &mut output)
            .expect("failed to start first background run");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first background run never finished");
        wait_until_async_idle(&async_task);

        // This poll surfaces run 1's result and starts run 2, which publishes nothing.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to start empty background run");
        assert_eq!(output.payload(), Some(&42));
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("empty background run never finished");
        wait_until_async_idle(&async_task);

        // This poll surfaces run 2's (empty) result and starts a final cleanup run.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to poll after empty background run");
        assert!(
            output.payload().is_none(),
            "background task re-emitted the previous payload after an empty run"
        );
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("cleanup background run never finished");
    }
785
    /// Source variant: a poll made while the background run is still in flight
    /// must overwrite the caller's output with an empty message.
    #[test]
    fn background_source_clears_output_while_processing() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ActionSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let mut output = CuMsg::new(None);

        // First poll: starts the worker, which then blocks waiting for an action.
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Plant a stale payload; the second poll happens while the worker is blocked.
        output.set_payload(999);
        async_src.process(&context, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background source poll should clear stale output while the worker is still running"
        );

        // Release the worker and wait for it so the test does not end mid-run.
        action_tx.send(Some(7)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background source never finished");
    }
816
    /// Source variant: a run that publishes nothing must not cause the payload of
    /// the run before it to be emitted a second time.
    #[test]
    fn background_source_empty_run_does_not_reemit_previous_payload() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ActionSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let mut output = CuMsg::new(None);

        // Run 1 publishes 42; wait until the wrapper has finalized it.
        action_tx.send(Some(42)).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to start first background source run");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first background source run never finished");
        wait_until_async_src_idle(&async_src);

        // This poll surfaces run 1's result and starts run 2, which publishes nothing.
        action_tx.send(None).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to start empty background source run");
        assert_eq!(output.payload(), Some(&42));
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("empty background source run never finished");
        wait_until_async_src_idle(&async_src);

        // This poll surfaces run 2's (empty) result and starts a final cleanup run.
        action_tx.send(None).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to poll background source after empty run");
        assert!(
            output.payload().is_none(),
            "background source re-emitted the previous payload after an empty run"
        );
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("cleanup background source run never finished");
    }
863
    /// With a mock clock, a finished result must be held back until the clock
    /// reaches the end time the task recorded in its output metadata.
    #[test]
    fn background_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();

        // Install the control channels for the task.
        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        READY_RX
            .set(Arc::new(Mutex::new(ready_rx)))
            .expect("ready channel already set");
        DONE_TX
            .set(done_tx)
            .expect("completion channel already set");

        let mut async_task: CuAsyncTask<ControlledTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // Copperlist 0: kick off processing, nothing ready yet.
        clock_mock.set_value(0);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Copperlist 1 at time 10: still running in the background.
        clock_mock.set_value(10);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // The background thread finishes at time 30 (recorded in metadata).
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background task never finished");
        // Wait until the async wrapper has cleared its processing flag and captured ready_at.
        // The task signals `done` before the wrapper finalizes the run, hence the polling.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background task finished without recording ready_at"
        );

        // Replay earlier than the recorded end time: the output should be held back.
        clock_mock.set_value(20);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "Output surfaced before recorded ready time"
        );

        // Once the mock clock reaches the recorded end time, the result is released.
        clock_mock.set_value(30);
        async_task.process(&context, &input, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // Allow the background worker spawned by the last poll to complete so the thread pool shuts down cleanly.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
935
    /// Source variant: with a mock clock, a finished result must be held back until
    /// the clock reaches the end time the source recorded in its output metadata.
    #[test]
    fn background_source_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();
        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ControlledSrcResources {
            ready_times: Arc::new(Mutex::new(ready_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ControlledSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp.clone()).unwrap();
        let mut output = CuMsg::new(None);

        // Time 0: kick off processing, nothing ready yet.
        clock_mock.set_value(0);
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Time 10: the worker is still blocked waiting for its ready time.
        clock_mock.set_value(10);
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Release the worker with an end time of 30.
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background source never finished");

        // The source signals `done` before the wrapper finalizes the run, so poll
        // until the processing flag is cleared and ready_at has been captured.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_src.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background source finished without recording ready_at"
        );

        // Polling earlier than the recorded end time: the output is held back.
        clock_mock.set_value(20);
        async_src.process(&context, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background source surfaced output before recorded ready time"
        );

        // Once the mock clock reaches the recorded end time, the result is released.
        clock_mock.set_value(30);
        async_src.process(&context, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // Let the run spawned by the last poll complete before the test ends.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
996}