1use crate::config::ComponentConfig;
2use crate::context::CuContext;
3use crate::cutask::{CuMsg, CuMsgPayload, CuSrcTask, CuTask, Freezable};
4use crate::reflect::{Reflect, TypePath};
5use cu29_clock::CuTime;
6use cu29_traits::{CuError, CuResult};
7use rayon::ThreadPool;
8use std::sync::{Arc, Mutex};
9
/// Bookkeeping shared between the polling side and the background worker.
struct AsyncState {
    /// `true` from the moment a background run is scheduled until it is finalized.
    processing: bool,
    /// Time before which the buffered output must not be handed to the caller;
    /// `None` when no such restriction is recorded.
    ready_at: Option<CuTime>,
    /// Failure recorded by a background run; taken and returned by the next poll.
    last_error: Option<CuError>,
}
15
16fn record_async_error(state: &Mutex<AsyncState>, error: CuError) {
17 let mut guard = match state.lock() {
18 Ok(guard) => guard,
19 Err(poison) => poison.into_inner(),
20 };
21 guard.processing = false;
22 guard.ready_at = None;
23 guard.last_error = Some(error);
24}
25
26fn begin_background_poll<O>(
27 ctx: &CuContext,
28 state: &Mutex<AsyncState>,
29 buffered_output: &Mutex<CuMsg<O>>,
30 real_output: &mut CuMsg<O>,
31) -> CuResult<bool>
32where
33 O: CuMsgPayload + Send + 'static,
34{
35 {
36 let mut state = state.lock().map_err(|_| {
37 CuError::from("Async task state mutex poisoned while scheduling background work")
38 })?;
39 if let Some(error) = state.last_error.take() {
40 return Err(error);
41 }
42 if state.processing {
43 *real_output = CuMsg::default();
44 return Ok(false);
45 }
46
47 if let Some(ready_at) = state.ready_at
48 && ctx.now() < ready_at
49 {
50 *real_output = CuMsg::default();
51 return Ok(false);
52 }
53
54 state.processing = true;
55 state.ready_at = None;
56 }
57
58 let buffered_output = buffered_output.lock().map_err(|_| {
59 let error = CuError::from("Async task output mutex poisoned");
60 record_async_error(state, error.clone());
61 error
62 })?;
63 *real_output = buffered_output.clone();
64 Ok(true)
65}
66
67fn finalize_background_run<O>(
68 state: &Mutex<AsyncState>,
69 output_ref: &mut CuMsg<O>,
70 fallback_end: CuTime,
71 task_result: CuResult<()>,
72) where
73 O: CuMsgPayload + Send + 'static,
74{
75 let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
76 guard.processing = false;
77
78 match task_result {
79 Ok(()) => {
80 let end_from_metadata: Option<CuTime> = output_ref.metadata.process_time.end.into();
81 let end_time = end_from_metadata.unwrap_or_else(|| {
82 output_ref.metadata.process_time.end = fallback_end.into();
83 fallback_end
84 });
85 guard.ready_at = Some(end_time);
86 }
87 Err(error) => {
88 guard.ready_at = None;
89 guard.last_error = Some(error);
90 }
91 }
92}
93
/// Wrapper that runs an inner [`CuTask`] on a thread pool instead of inline.
///
/// The inner task, the buffered output message and the scheduling state are each
/// held behind an `Arc<Mutex<_>>` so they can be shared with the spawned worker.
#[derive(Reflect)]
#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
pub struct CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// The wrapped task; locked by the worker for the duration of a run.
    #[reflect(ignore)]
    task: Arc<Mutex<T>>,
    /// Output written by the most recent background run.
    #[reflect(ignore)]
    output: Arc<Mutex<CuMsg<O>>>,
    /// Scheduling state shared with the worker (busy flag, ready time, last error).
    #[reflect(ignore)]
    state: Arc<Mutex<AsyncState>>,
    /// Thread pool on which background runs are spawned.
    #[reflect(ignore)]
    tp: Arc<ThreadPool>,
}
110
111impl<T, O> TypePath for CuAsyncTask<T, O>
112where
113 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
114 O: CuMsgPayload + Send + 'static,
115{
116 fn type_path() -> &'static str {
117 "cu29_runtime::cuasynctask::CuAsyncTask"
118 }
119
120 fn short_type_path() -> &'static str {
121 "CuAsyncTask"
122 }
123
124 fn type_ident() -> Option<&'static str> {
125 Some("CuAsyncTask")
126 }
127
128 fn crate_name() -> Option<&'static str> {
129 Some("cu29_runtime")
130 }
131
132 fn module_path() -> Option<&'static str> {
133 Some("cuasynctask")
134 }
135}
136
/// Resources needed to build a [`CuAsyncTask`]: the wrapped task's own
/// resources plus the thread pool its runs are spawned on.
pub struct CuAsyncTaskResources<'r, T: CuTask> {
    /// Resources handed through to the wrapped task's constructor.
    pub inner: T::Resources<'r>,
    /// Thread pool used for the background runs.
    pub threadpool: Arc<ThreadPool>,
}
142
143impl<T, O> CuAsyncTask<T, O>
144where
145 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
146 O: CuMsgPayload + Send + 'static,
147{
148 #[allow(unused)]
149 pub fn new(
150 config: Option<&ComponentConfig>,
151 resources: T::Resources<'_>,
152 tp: Arc<ThreadPool>,
153 ) -> CuResult<Self> {
154 let task = Arc::new(Mutex::new(T::new(config, resources)?));
155 let output = Arc::new(Mutex::new(CuMsg::default()));
156 Ok(Self {
157 task,
158 output,
159 state: Arc::new(Mutex::new(AsyncState {
160 processing: false,
161 ready_at: None,
162 last_error: None,
163 })),
164 tp,
165 })
166 }
167}
168
// No `Freezable` items are overridden for the wrapper, so whatever defaults the
// trait provides apply.
// NOTE(review): the wrapped task's own `Freezable` implementation does not
// appear to be consulted here — confirm that this is intended.
impl<T, O> Freezable for CuAsyncTask<T, O>
where
    T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
}
175
176impl<T, I, O> CuTask for CuAsyncTask<T, O>
177where
178 T: for<'i, 'o> CuTask<Input<'i> = CuMsg<I>, Output<'o> = CuMsg<O>> + Send + 'static,
179 I: CuMsgPayload + Send + Sync + 'static,
180 O: CuMsgPayload + Send + 'static,
181{
182 type Resources<'r> = CuAsyncTaskResources<'r, T>;
183 type Input<'m> = T::Input<'m>;
184 type Output<'m> = T::Output<'m>;
185
186 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
187 where
188 Self: Sized,
189 {
190 CuAsyncTask::new(config, resources.inner, resources.threadpool)
191 }
192
193 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
194 let mut task = self
195 .task
196 .lock()
197 .map_err(|_| CuError::from("Async task mutex poisoned during start"))?;
198 task.start(ctx)
199 }
200
201 fn process<'i, 'o>(
202 &mut self,
203 ctx: &CuContext,
204 input: &Self::Input<'i>,
205 real_output: &mut Self::Output<'o>,
206 ) -> CuResult<()> {
207 if !begin_background_poll(ctx, &self.state, &self.output, real_output)? {
208 return Ok(());
209 }
210
211 self.tp.spawn_fifo({
213 let ctx = ctx.clone();
214 let input = (*input).clone();
215 let output = self.output.clone();
216 let task = self.task.clone();
217 let state = self.state.clone();
218 move || {
219 let input_ref: &CuMsg<I> = &input;
220 let mut output_guard = match output.lock() {
221 Ok(guard) => guard,
222 Err(_) => {
223 record_async_error(
224 &state,
225 CuError::from("Async task output mutex poisoned"),
226 );
227 return;
228 }
229 };
230 let output_ref: &mut CuMsg<O> = &mut output_guard;
231
232 *output_ref = CuMsg::default();
235
236 if output_ref.metadata.process_time.start.is_none() {
238 output_ref.metadata.process_time.start = ctx.now().into();
239 }
240 let task_result = match task.lock() {
241 Ok(mut task_guard) => task_guard.process(&ctx, input_ref, output_ref),
242 Err(poison) => Err(CuError::from(format!(
243 "Async task mutex poisoned: {poison}"
244 ))),
245 };
246 finalize_background_run(&state, output_ref, ctx.now(), task_result);
247 }
248 });
249 Ok(())
250 }
251
252 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
253 let mut task = self
254 .task
255 .lock()
256 .map_err(|_| CuError::from("Async task mutex poisoned during stop"))?;
257 task.stop(ctx)
258 }
259}
260
/// Wrapper that runs an inner [`CuSrcTask`] on a thread pool instead of inline.
///
/// The inner source, the buffered output message and the scheduling state are
/// each held behind an `Arc<Mutex<_>>` so they can be shared with the spawned
/// worker.
#[derive(Reflect)]
#[reflect(no_field_bounds, from_reflect = false, type_path = false)]
pub struct CuAsyncSrcTask<T, O>
where
    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
    /// The wrapped source; locked by the worker for the duration of a run.
    #[reflect(ignore)]
    task: Arc<Mutex<T>>,
    /// Output written by the most recent background run.
    #[reflect(ignore)]
    output: Arc<Mutex<CuMsg<O>>>,
    /// Scheduling state shared with the worker (busy flag, ready time, last error).
    #[reflect(ignore)]
    state: Arc<Mutex<AsyncState>>,
    /// Thread pool on which background runs are spawned.
    #[reflect(ignore)]
    tp: Arc<ThreadPool>,
}
277
278impl<T, O> TypePath for CuAsyncSrcTask<T, O>
279where
280 T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
281 O: CuMsgPayload + Send + 'static,
282{
283 fn type_path() -> &'static str {
284 "cu29_runtime::cuasynctask::CuAsyncSrcTask"
285 }
286
287 fn short_type_path() -> &'static str {
288 "CuAsyncSrcTask"
289 }
290
291 fn type_ident() -> Option<&'static str> {
292 Some("CuAsyncSrcTask")
293 }
294
295 fn crate_name() -> Option<&'static str> {
296 Some("cu29_runtime")
297 }
298
299 fn module_path() -> Option<&'static str> {
300 Some("cuasynctask")
301 }
302}
303
/// Resources needed to build a [`CuAsyncSrcTask`]: the wrapped source's own
/// resources plus the thread pool its runs are spawned on.
pub struct CuAsyncSrcTaskResources<'r, T: CuSrcTask> {
    /// Resources handed through to the wrapped source's constructor.
    pub inner: T::Resources<'r>,
    /// Thread pool used for the background runs.
    pub threadpool: Arc<ThreadPool>,
}
309
310impl<T, O> CuAsyncSrcTask<T, O>
311where
312 T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
313 O: CuMsgPayload + Send + 'static,
314{
315 #[allow(unused)]
316 pub fn new(
317 config: Option<&ComponentConfig>,
318 resources: T::Resources<'_>,
319 tp: Arc<ThreadPool>,
320 ) -> CuResult<Self> {
321 let task = Arc::new(Mutex::new(T::new(config, resources)?));
322 let output = Arc::new(Mutex::new(CuMsg::default()));
323 Ok(Self {
324 task,
325 output,
326 state: Arc::new(Mutex::new(AsyncState {
327 processing: false,
328 ready_at: None,
329 last_error: None,
330 })),
331 tp,
332 })
333 }
334}
335
// No `Freezable` items are overridden for the wrapper, so whatever defaults the
// trait provides apply.
// NOTE(review): the wrapped source's own `Freezable` implementation does not
// appear to be consulted here — confirm that this is intended.
impl<T, O> Freezable for CuAsyncSrcTask<T, O>
where
    T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
    O: CuMsgPayload + Send + 'static,
{
}
342
343impl<T, O> CuSrcTask for CuAsyncSrcTask<T, O>
344where
345 T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
346 O: CuMsgPayload + Send + 'static,
347{
348 type Resources<'r> = CuAsyncSrcTaskResources<'r, T>;
349 type Output<'m> = T::Output<'m>;
350
351 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
352 where
353 Self: Sized,
354 {
355 CuAsyncSrcTask::new(config, resources.inner, resources.threadpool)
356 }
357
358 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
359 let mut task = self
360 .task
361 .lock()
362 .map_err(|_| CuError::from("Async source mutex poisoned during start"))?;
363 task.start(ctx)
364 }
365
366 fn process<'o>(&mut self, ctx: &CuContext, real_output: &mut Self::Output<'o>) -> CuResult<()> {
367 if !begin_background_poll(ctx, &self.state, &self.output, real_output)? {
368 return Ok(());
369 }
370
371 self.tp.spawn_fifo({
372 let ctx = ctx.clone();
373 let output = self.output.clone();
374 let task = self.task.clone();
375 let state = self.state.clone();
376 move || {
377 let mut output_guard = match output.lock() {
378 Ok(guard) => guard,
379 Err(_) => {
380 record_async_error(
381 &state,
382 CuError::from("Async task output mutex poisoned"),
383 );
384 return;
385 }
386 };
387 let output_ref: &mut CuMsg<O> = &mut output_guard;
388
389 *output_ref = CuMsg::default();
390
391 if output_ref.metadata.process_time.start.is_none() {
392 output_ref.metadata.process_time.start = ctx.now().into();
393 }
394 let task_result = match task.lock() {
395 Ok(mut task_guard) => task_guard.process(&ctx, output_ref),
396 Err(poison) => Err(CuError::from(format!(
397 "Async source mutex poisoned: {poison}"
398 ))),
399 };
400 finalize_background_run(&state, output_ref, ctx.now(), task_result);
401 }
402 });
403 Ok(())
404 }
405
406 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
407 let mut task = self
408 .task
409 .lock()
410 .map_err(|_| CuError::from("Async source mutex poisoned during stop"))?;
411 task.stop(ctx)
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418 use crate::config::ComponentConfig;
419 use crate::cutask::CuMsg;
420 use crate::cutask::Freezable;
421 use crate::input_msg;
422 use crate::output_msg;
423 use cu29_traits::CuResult;
424 use rayon::ThreadPoolBuilder;
425 use std::borrow::BorrowMut;
426 use std::sync::OnceLock;
427 use std::sync::mpsc;
428 use std::time::Duration;
429
    /// Receiving end of the "ready" channel that `ControlledTask::process` blocks on.
    /// Set once by the test that drives `ControlledTask`.
    static READY_RX: OnceLock<Arc<Mutex<mpsc::Receiver<CuTime>>>> = OnceLock::new();
    /// Sender that `ControlledTask::process` signals on after each run, when one is set.
    static DONE_TX: OnceLock<mpsc::Sender<()>> = OnceLock::new();
432 #[derive(Reflect)]
433 struct TestTask {}
434
435 impl Freezable for TestTask {}
436
437 impl CuTask for TestTask {
438 type Resources<'r> = ();
439 type Input<'m> = input_msg!(u32);
440 type Output<'m> = output_msg!(u32);
441
442 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
443 where
444 Self: Sized,
445 {
446 Ok(Self {})
447 }
448
449 fn process(
450 &mut self,
451 _ctx: &CuContext,
452 input: &Self::Input<'_>,
453 output: &mut Self::Output<'_>,
454 ) -> CuResult<()> {
455 output.borrow_mut().set_payload(*input.payload().unwrap());
456 Ok(())
457 }
458 }
459
460 #[test]
461 fn test_lifecycle() {
462 let tp = Arc::new(
463 rayon::ThreadPoolBuilder::new()
464 .num_threads(1)
465 .build()
466 .unwrap(),
467 );
468
469 let config = ComponentConfig::default();
470 let context = CuContext::new_with_clock();
471 let mut async_task: CuAsyncTask<TestTask, u32> =
472 CuAsyncTask::new(Some(&config), (), tp).unwrap();
473 let input = CuMsg::new(Some(42u32));
474 let mut output = CuMsg::new(None);
475
476 loop {
477 {
478 let output_ref: &mut CuMsg<u32> = &mut output;
479 async_task.process(&context, &input, output_ref).unwrap();
480 }
481
482 if let Some(val) = output.payload() {
483 assert_eq!(*val, 42u32);
484 break;
485 }
486 }
487 }
488
489 #[derive(Reflect)]
490 struct ControlledTask;
491
492 impl Freezable for ControlledTask {}
493
494 impl CuTask for ControlledTask {
495 type Resources<'r> = ();
496 type Input<'m> = input_msg!(u32);
497 type Output<'m> = output_msg!(u32);
498
499 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
500 where
501 Self: Sized,
502 {
503 Ok(Self {})
504 }
505
506 fn process(
507 &mut self,
508 ctx: &CuContext,
509 _input: &Self::Input<'_>,
510 output: &mut Self::Output<'_>,
511 ) -> CuResult<()> {
512 let rx = READY_RX
513 .get()
514 .expect("ready channel not set")
515 .lock()
516 .unwrap();
517 let ready_time = rx
518 .recv_timeout(Duration::from_secs(1))
519 .expect("timed out waiting for ready signal");
520
521 output.set_payload(ready_time.as_nanos() as u32);
522 output.metadata.process_time.start = ctx.now().into();
523 output.metadata.process_time.end = ready_time.into();
524
525 if let Some(done_tx) = DONE_TX.get() {
526 let _ = done_tx.send(());
527 }
528 Ok(())
529 }
530 }
531
532 fn wait_until_async_idle<T, O>(async_task: &CuAsyncTask<T, O>)
533 where
534 T: for<'m> CuTask<Output<'m> = CuMsg<O>> + Send + 'static,
535 O: CuMsgPayload + Send + 'static,
536 {
537 for _ in 0..100 {
538 let state = async_task.state.lock().unwrap();
539 if !state.processing {
540 return;
541 }
542 drop(state);
543 std::thread::sleep(Duration::from_millis(1));
544 }
545 panic!("background task never became idle");
546 }
547
548 fn wait_until_async_src_idle<T, O>(async_task: &CuAsyncSrcTask<T, O>)
549 where
550 T: for<'m> CuSrcTask<Output<'m> = CuMsg<O>> + Send + 'static,
551 O: CuMsgPayload + Send + 'static,
552 {
553 for _ in 0..100 {
554 let state = async_task.state.lock().unwrap();
555 if !state.processing {
556 return;
557 }
558 drop(state);
559 std::thread::sleep(Duration::from_millis(1));
560 }
561 panic!("background source never became idle");
562 }
563
    /// Channels handed to `ActionTask` / `ActionSrc` so a test can script each run.
    #[derive(Clone)]
    struct ActionTaskResources {
        /// One message per run: `Some(v)` publishes `v`, `None` publishes nothing.
        actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
        /// Signalled by the task at the end of every run.
        done: mpsc::Sender<()>,
    }
569
570 #[derive(Reflect)]
571 #[reflect(no_field_bounds, from_reflect = false)]
572 struct ActionTask {
573 #[reflect(ignore)]
574 actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
575 #[reflect(ignore)]
576 done: mpsc::Sender<()>,
577 }
578
579 impl Freezable for ActionTask {}
580
581 impl CuTask for ActionTask {
582 type Resources<'r> = ActionTaskResources;
583 type Input<'m> = input_msg!(u32);
584 type Output<'m> = output_msg!(u32);
585
586 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
587 where
588 Self: Sized,
589 {
590 let _ = config;
591 Ok(Self {
592 actions: resources.actions,
593 done: resources.done,
594 })
595 }
596
597 fn process(
598 &mut self,
599 _ctx: &CuContext,
600 _input: &Self::Input<'_>,
601 output: &mut Self::Output<'_>,
602 ) -> CuResult<()> {
603 let action = self
604 .actions
605 .lock()
606 .unwrap()
607 .recv_timeout(Duration::from_secs(1))
608 .expect("timed out waiting for action");
609 if let Some(value) = action {
610 output.set_payload(value);
611 }
612 let _ = self.done.send(());
613 Ok(())
614 }
615 }
616
617 #[derive(Reflect)]
618 #[reflect(no_field_bounds, from_reflect = false)]
619 struct ActionSrc {
620 #[reflect(ignore)]
621 actions: Arc<Mutex<mpsc::Receiver<Option<u32>>>>,
622 #[reflect(ignore)]
623 done: mpsc::Sender<()>,
624 }
625
626 impl Freezable for ActionSrc {}
627
628 impl CuSrcTask for ActionSrc {
629 type Resources<'r> = ActionTaskResources;
630 type Output<'m> = output_msg!(u32);
631
632 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
633 where
634 Self: Sized,
635 {
636 let _ = config;
637 Ok(Self {
638 actions: resources.actions,
639 done: resources.done,
640 })
641 }
642
643 fn process(&mut self, _ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
644 let action = self
645 .actions
646 .lock()
647 .unwrap()
648 .recv_timeout(Duration::from_secs(1))
649 .expect("timed out waiting for source action");
650 if let Some(value) = action {
651 output.set_payload(value);
652 }
653 let _ = self.done.send(());
654 Ok(())
655 }
656 }
657
    /// Channels handed to `ControlledSrc` so a test can control when a run finishes.
    #[derive(Clone)]
    struct ControlledSrcResources {
        /// One message per run: the time the source reports as its processing end.
        ready_times: Arc<Mutex<mpsc::Receiver<CuTime>>>,
        /// Signalled by the source at the end of every run.
        done: mpsc::Sender<()>,
    }
663
664 #[derive(Reflect)]
665 #[reflect(no_field_bounds, from_reflect = false)]
666 struct ControlledSrc {
667 #[reflect(ignore)]
668 ready_times: Arc<Mutex<mpsc::Receiver<CuTime>>>,
669 #[reflect(ignore)]
670 done: mpsc::Sender<()>,
671 }
672
673 impl Freezable for ControlledSrc {}
674
675 impl CuSrcTask for ControlledSrc {
676 type Resources<'r> = ControlledSrcResources;
677 type Output<'m> = output_msg!(u32);
678
679 fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult<Self>
680 where
681 Self: Sized,
682 {
683 let _ = config;
684 Ok(Self {
685 ready_times: resources.ready_times,
686 done: resources.done,
687 })
688 }
689
690 fn process(&mut self, ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
691 let ready_time = self
692 .ready_times
693 .lock()
694 .unwrap()
695 .recv_timeout(Duration::from_secs(1))
696 .expect("timed out waiting for ready signal");
697 output.set_payload(ready_time.as_nanos() as u32);
698 output.metadata.process_time.start = ctx.now().into();
699 output.metadata.process_time.end = ready_time.into();
700 let _ = self.done.send(());
701 Ok(())
702 }
703 }
704
    /// While a background run is in flight, polling must reset the caller's
    /// output instead of leaving stale data in it.
    #[test]
    fn background_clears_output_while_processing() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // First poll schedules the worker, which then blocks waiting for an action.
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Plant a stale payload; the poll made while the worker is busy must wipe it.
        output.set_payload(999);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background poll should clear stale output while the worker is still running"
        );

        // Unblock the worker so it can finish.
        action_tx.send(Some(7)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background worker never finished");
    }
736
    /// A run that publishes nothing must not cause the payload of an earlier run
    /// to be handed out a second time.
    #[test]
    fn background_empty_run_does_not_reemit_previous_payload() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_task: CuAsyncTask<ActionTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let some_input = CuMsg::new(Some(1u32));
        let no_input = CuMsg::new(None::<u32>);
        let mut output = CuMsg::new(None);

        // Run 1: the worker publishes 42 into the buffered output.
        action_tx.send(Some(42)).unwrap();
        async_task
            .process(&context, &some_input, &mut output)
            .expect("failed to start first background run");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first background run never finished");
        wait_until_async_idle(&async_task);

        // Run 2: this poll surfaces the 42 buffered by run 1 and starts a run
        // that publishes nothing.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to start empty background run");
        assert_eq!(output.payload(), Some(&42));
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("empty background run never finished");
        wait_until_async_idle(&async_task);

        // Run 3: the poll after the empty run must not hand out 42 again.
        action_tx.send(None).unwrap();
        async_task
            .process(&context, &no_input, &mut output)
            .expect("failed to poll after empty background run");
        assert!(
            output.payload().is_none(),
            "background task re-emitted the previous payload after an empty run"
        );
        // Let the run started by the last poll finish.
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("cleanup background run never finished");
    }
785
    /// Source variant: while a background run is in flight, polling must reset
    /// the caller's output instead of leaving stale data in it.
    #[test]
    fn background_source_clears_output_while_processing() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ActionSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let mut output = CuMsg::new(None);

        // First poll schedules the worker, which then blocks waiting for an action.
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // Plant a stale payload; the poll made while the worker is busy must wipe it.
        output.set_payload(999);
        async_src.process(&context, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background source poll should clear stale output while the worker is still running"
        );

        // Unblock the worker so it can finish.
        action_tx.send(Some(7)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background source never finished");
    }
816
    /// Source variant: a run that publishes nothing must not cause the payload
    /// of an earlier run to be handed out a second time.
    #[test]
    fn background_source_empty_run_does_not_reemit_previous_payload() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let context = CuContext::new_with_clock();
        let (action_tx, action_rx) = mpsc::channel::<Option<u32>>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ActionTaskResources {
            actions: Arc::new(Mutex::new(action_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ActionSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp).unwrap();
        let mut output = CuMsg::new(None);

        // Run 1: the worker publishes 42 into the buffered output.
        action_tx.send(Some(42)).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to start first background source run");
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first background source run never finished");
        wait_until_async_src_idle(&async_src);

        // Run 2: this poll surfaces the 42 buffered by run 1 and starts a run
        // that publishes nothing.
        action_tx.send(None).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to start empty background source run");
        assert_eq!(output.payload(), Some(&42));
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("empty background source run never finished");
        wait_until_async_src_idle(&async_src);

        // Run 3: the poll after the empty run must not hand out 42 again.
        action_tx.send(None).unwrap();
        async_src
            .process(&context, &mut output)
            .expect("failed to poll background source after empty run");
        assert!(
            output.payload().is_none(),
            "background source re-emitted the previous payload after an empty run"
        );
        // Let the run started by the last poll finish.
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("cleanup background source run never finished");
    }
863
    /// A finished run's output must stay hidden until the (mock) clock reaches
    /// the end time the task recorded in its metadata.
    #[test]
    fn background_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();

        // `ControlledTask` reads its channels from module-level statics, so they
        // are installed here.
        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        READY_RX
            .set(Arc::new(Mutex::new(ready_rx)))
            .expect("ready channel already set");
        DONE_TX
            .set(done_tx)
            .expect("completion channel already set");

        let mut async_task: CuAsyncTask<ControlledTask, u32> =
            CuAsyncTask::new(Some(&ComponentConfig::default()), (), tp.clone()).unwrap();
        let input = CuMsg::new(Some(1u32));
        let mut output = CuMsg::new(None);

        // t = 0: the first poll schedules the worker; nothing has been produced yet.
        clock_mock.set_value(0);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // t = 10: the worker is still waiting for the ready signal.
        clock_mock.set_value(10);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(output.payload().is_none());

        // t = 30: release the worker, which stamps an end time of 30 on its output.
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background task never finished");
        // The done signal is sent from inside `process`, before the run is
        // finalized, so poll the shared state until `ready_at` has been recorded.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_task.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background task finished without recording ready_at"
        );

        // Clock set back to 20, before the recorded ready time: output stays hidden.
        clock_mock.set_value(20);
        async_task.process(&context, &input, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "Output surfaced before recorded ready time"
        );

        // t = 30: the ready time has been reached, so the payload surfaces.
        clock_mock.set_value(30);
        async_task.process(&context, &input, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // The poll above started another run; release it so the worker can finish.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
935
    /// Source variant: a finished run's output must stay hidden until the (mock)
    /// clock reaches the end time the source recorded in its metadata.
    #[test]
    fn background_source_respects_recorded_ready_time() {
        let tp = Arc::new(ThreadPoolBuilder::new().num_threads(1).build().unwrap());
        let (context, clock_mock) = CuContext::new_mock_clock();
        let (ready_tx, ready_rx) = mpsc::channel::<CuTime>();
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let resources = ControlledSrcResources {
            ready_times: Arc::new(Mutex::new(ready_rx)),
            done: done_tx,
        };

        let mut async_src: CuAsyncSrcTask<ControlledSrc, u32> =
            CuAsyncSrcTask::new(Some(&ComponentConfig::default()), resources, tp.clone()).unwrap();
        let mut output = CuMsg::new(None);

        // t = 0: the first poll schedules the worker; nothing has been produced yet.
        clock_mock.set_value(0);
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // t = 10: the worker is still waiting for the ready signal.
        clock_mock.set_value(10);
        async_src.process(&context, &mut output).unwrap();
        assert!(output.payload().is_none());

        // t = 30: release the worker, which stamps an end time of 30 on its output.
        clock_mock.set_value(30);
        ready_tx.send(CuTime::from(30u64)).unwrap();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("background source never finished");

        // The done signal is sent from inside `process`, before the run is
        // finalized, so poll the shared state until `ready_at` has been recorded.
        let mut ready_at_recorded = None;
        for _ in 0..100 {
            let state = async_src.state.lock().unwrap();
            if !state.processing {
                ready_at_recorded = state.ready_at;
                if ready_at_recorded.is_some() {
                    break;
                }
            }
            drop(state);
            std::thread::sleep(Duration::from_millis(1));
        }
        assert!(
            ready_at_recorded.is_some(),
            "background source finished without recording ready_at"
        );

        // Clock set back to 20, before the recorded ready time: output stays hidden.
        clock_mock.set_value(20);
        async_src.process(&context, &mut output).unwrap();
        assert!(
            output.payload().is_none(),
            "background source surfaced output before recorded ready time"
        );

        // t = 30: the ready time has been reached, so the payload surfaces.
        clock_mock.set_value(30);
        async_src.process(&context, &mut output).unwrap();
        assert_eq!(output.payload(), Some(&30u32));

        // The poll above started another run; release it so the worker can finish.
        ready_tx.send(CuTime::from(40u64)).unwrap();
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
996}