Skip to main content

cu29_runtime/
cutask.rs

1//! This module contains all the main definition of the traits you need to implement
2//! or interact with to create a Copper task.
3
4use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18    COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19    ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
27/// The state of a task.
28// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
29#[cfg(feature = "reflect")]
30pub trait CuMsgPayload:
31    Default
32    + Debug
33    + Clone
34    + Encode
35    + Decode<()>
36    + Serialize
37    + DeserializeOwned
38    + Reflect
39    + TypePath
40    + Sized
41{
42}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46    Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
50pub trait CuMsgPack {}
51
52// Also anything that follows this contract can be a payload (blanket implementation)
53#[cfg(feature = "reflect")]
54impl<T> CuMsgPayload for T where
55    T: Default
56        + Debug
57        + Clone
58        + Encode
59        + Decode<()>
60        + Serialize
61        + DeserializeOwned
62        + Reflect
63        + TypePath
64        + Sized
65{
66}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70    T: Default
71        + Debug
72        + Clone
73        + Encode
74        + Decode<()>
75        + Serialize
76        + DeserializeOwned
77        + Reflect
78        + Sized
79{
80}
81
82macro_rules! impl_cu_msg_pack {
83    ($($name:ident),+) => {
84        impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
85        where
86            $($name: CuMsgPayload),+
87        {}
88    };
89}
90
91macro_rules! impl_cu_msg_pack_up_to {
92    ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
93        impl_cu_msg_pack!($first, $second);
94        impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
95    };
96    (@accumulate ($($acc:ident),+);) => {};
97    (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
98        impl_cu_msg_pack!($($acc),+, $next);
99        impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
100    };
101}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108// Apply the macro to generate implementations for tuple sizes up to 12.
109impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
111// A convenience macro to get from a payload or a list of payloads to a proper CuMsg or CuMsgPack
112// declaration for your tasks used for input messages.
113#[macro_export]
114macro_rules! input_msg {
115    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
116        ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
117    };
118    ($lt:lifetime, $ty:ty) => {
119        CuMsg<$ty>   // This is for backward compatibility
120    };
121    ($ty:ty) => {
122        CuMsg<$ty>
123    };
124}
125
126// A convenience macro to get from a payload to a proper CuMsg used as output.
127#[macro_export]
128macro_rules! output_msg {
129    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
130        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
131    };
132    ($first:ty, $($rest:ty),+) => {
133        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
134    };
135    ($ty:ty) => {
136        CuMsg<$ty>
137    };
138    ($lt:lifetime, $ty:ty) => {
139        CuMsg<$ty>  // This is for backward compatibility
140    };
141}
142
143/// CuMsgMetadata is a structure that contains metadata common to all CuStampedDataSet.
144#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
145#[reflect(opaque, from_reflect = false, no_field_bounds)]
146pub struct CuMsgMetadata {
147    /// The time range used for the processing of this message
148    pub process_time: PartialCuTimeRange,
149    /// A small string for real time feedback purposes.
150    /// This is useful for to display on the field when the tasks are operating correctly.
151    pub status_txt: CuCompactString,
152    /// Remote Copper provenance captured on receive, when available.
153    pub origin: Option<CuMsgOrigin>,
154}
155
156impl Metadata for CuMsgMetadata {}
157
158impl CuMsgMetadata {
159    pub fn set_status(&mut self, status: impl ToCompactString) {
160        self.status_txt = CuCompactString(status.to_compact_string());
161    }
162
163    pub fn set_origin(&mut self, origin: CuMsgOrigin) {
164        self.origin = Some(origin);
165    }
166
167    pub fn clear_origin(&mut self) {
168        self.origin = None;
169    }
170}
171
172impl CuMsgMetadataTrait for CuMsgMetadata {
173    fn process_time(&self) -> PartialCuTimeRange {
174        self.process_time
175    }
176
177    fn status_txt(&self) -> &CuCompactString {
178        &self.status_txt
179    }
180
181    fn origin(&self) -> Option<&CuMsgOrigin> {
182        self.origin.as_ref()
183    }
184}
185
186impl Display for CuMsgMetadata {
187    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
188        write!(
189            f,
190            "process_time start: {}, process_time end: {}",
191            self.process_time.start, self.process_time.end
192        )
193    }
194}
195
196/// CuMsg is the envelope holding the msg payload and the metadata between tasks.
197#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
198#[reflect(opaque, from_reflect = false, no_field_bounds)]
199#[serde(bound(
200    serialize = "T: Serialize, M: Serialize",
201    deserialize = "T: DeserializeOwned, M: DeserializeOwned"
202))]
203pub struct CuStampedData<T, M>
204where
205    T: CuMsgPayload,
206    M: Metadata,
207{
208    /// This payload is the actual data exchanged between tasks.
209    payload: Option<T>,
210
211    /// The time of validity of the message.
212    /// It can be undefined (None), one measure point or a range of measures (TimeRange).
213    pub tov: Tov,
214
215    /// This metadata is the data that is common to all messages.
216    pub metadata: M,
217}
218
219impl<T, M> Encode for CuStampedData<T, M>
220where
221    T: CuMsgPayload,
222    M: Metadata,
223{
224    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
225        match &self.payload {
226            None => {
227                0u8.encode(encoder)?;
228            }
229            Some(payload) => {
230                1u8.encode(encoder)?;
231                let encoded_start = cu29_traits::observed_encode_bytes();
232                let handle_start = crate::monitoring::current_payload_handle_bytes();
233                payload.encode(encoder)?;
234                let encoded_bytes =
235                    cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
236                let handle_bytes =
237                    crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
238                crate::monitoring::record_current_slot_payload_io_stats(
239                    core::mem::size_of::<T>(),
240                    encoded_bytes,
241                    handle_bytes,
242                );
243            }
244        }
245        self.tov.encode(encoder)?;
246        self.metadata.encode(encoder)?;
247        Ok(())
248    }
249}
250
251impl Default for CuMsgMetadata {
252    fn default() -> Self {
253        CuMsgMetadata {
254            process_time: PartialCuTimeRange::default(),
255            status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
256            origin: None,
257        }
258    }
259}
260
261impl<T, M> CuStampedData<T, M>
262where
263    T: CuMsgPayload,
264    M: Metadata,
265{
266    pub(crate) fn from_parts(payload: Option<T>, tov: Tov, metadata: M) -> Self {
267        CuStampedData {
268            payload,
269            tov,
270            metadata,
271        }
272    }
273
274    pub fn new(payload: Option<T>) -> Self {
275        Self::from_parts(payload, Tov::default(), M::default())
276    }
277    pub fn payload(&self) -> Option<&T> {
278        self.payload.as_ref()
279    }
280
281    pub fn set_payload(&mut self, payload: T) {
282        self.payload = Some(payload);
283    }
284
285    pub fn clear_payload(&mut self) {
286        self.payload = None;
287    }
288
289    pub fn payload_mut(&mut self) -> &mut Option<T> {
290        &mut self.payload
291    }
292}
293
294impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
295where
296    T: CuMsgPayload,
297    M: CuMsgMetadataTrait + Metadata,
298{
299    fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
300        self.payload
301            .as_ref()
302            .map(|p| p as &dyn erased_serde::Serialize)
303    }
304
305    #[cfg(feature = "reflect")]
306    fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
307        self.payload
308            .as_ref()
309            .map(|p| p as &dyn cu29_traits::Reflect)
310    }
311
312    fn tov(&self) -> Tov {
313        self.tov
314    }
315
316    fn metadata(&self) -> &dyn CuMsgMetadataTrait {
317        &self.metadata
318    }
319}
320
321/// This is the robotics message type for Copper with the correct Metadata type
322/// that will be used by the runtime.
323pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
324
325impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
326    /// Reinterprets the payload type carried by this message.
327    ///
328    /// # Safety
329    ///
330    /// The caller must guarantee that the message really contains a payload of type `U`. Failing
331    /// to do so is undefined behaviour.
332    pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
333        // SAFETY: Caller guarantees that the underlying payload is of type U.
334        unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
335    }
336
337    /// Mutable variant of [`assume_payload`](Self::assume_payload).
338    ///
339    /// # Safety
340    ///
341    /// The caller must guarantee that mutating the returned message is sound for the actual
342    /// payload type stored in the buffer.
343    pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
344        // SAFETY: Caller guarantees that the underlying payload is of type U.
345        unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
346    }
347}
348
349impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
350    fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
351        CuError::from(format!(
352            "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
353            type_name::<T>(),
354            type_name::<U>()
355        ))
356    }
357
358    /// Attempts to view this message as carrying payload `U`.
359    pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
360        if TypeId::of::<T>() == TypeId::of::<U>() {
361            // SAFETY: We just proved that T == U.
362            Ok(unsafe { self.assume_payload::<U>() })
363        } else {
364            Err(Self::downcast_err::<U>())
365        }
366    }
367
368    /// Mutable variant of [`downcast_ref`](Self::downcast_ref).
369    pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
370        if TypeId::of::<T>() == TypeId::of::<U>() {
371            // SAFETY: We just proved that T == U.
372            Ok(unsafe { self.assume_payload_mut::<U>() })
373        } else {
374            Err(Self::downcast_err::<U>())
375        }
376    }
377}
378
379/// The internal state of a task needs to be serializable
380/// so the framework can take a snapshot of the task graph.
381pub trait Freezable {
382    /// This method is called by the framework when it wants to save the task state.
383    /// The default implementation is to encode nothing (stateless).
384    /// If you have a state, you need to implement this method.
385    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
386        Encode::encode(&(), encoder) // default is stateless
387    }
388
389    /// This method is called by the framework when it wants to restore the task to a specific state.
390    /// Here it is similar to Decode but the framework will give you a new instance of the task (the new method will be called)
391    fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
392        Ok(())
393    }
394}
395
396/// Bincode Adapter for Freezable tasks
397/// This allows the use of the bincode API directly to freeze and thaw tasks.
398pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
399
400impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
401    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
402        self.0.freeze(encoder)
403    }
404}
405
406/// A Src Task is a task that only produces messages. For example drivers for sensors are Src Tasks.
407/// They are in push mode from the runtime.
408/// To set the frequency of the pulls and align them to any hw, see the runtime configuration.
409/// Note: A source has the privilege to have a clock passed to it vs a frozen clock.
410pub trait CuSrcTask: Freezable + Reflect {
411    type Output<'m>: CuMsgPayload;
412    /// Resources required by the task.
413    type Resources<'r>;
414
415    /// Here you need to initialize everything your task will need for the duration of its lifetime.
416    /// The config allows you to access the configuration of the task.
417    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
418    where
419        Self: Sized;
420
421    /// Start is called between the creation of the task and the first call to pre/process.
422    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
423        Ok(())
424    }
425
426    /// This is a method called by the runtime before "process". This is a kind of best effort,
427    /// as soon as possible call to give a chance for the task to do some work before to prepare
428    /// to make "process" as short as possible.
429    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
430        Ok(())
431    }
432
433    /// Process is the most critical execution of the task.
434    /// The goal will be to produce the output message as soon as possible.
435    /// Use preprocess to prepare the task to make this method as short as possible.
436    fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
437
438    /// This is a method called by the runtime after "process". It is best effort a chance for
439    /// the task to update some state after process is out of the way.
440    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
441    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
442        Ok(())
443    }
444
445    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
446    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
447        Ok(())
448    }
449}
450
451/// This is the most generic Task of copper. It is a "transform" task deriving an output from an input.
452pub trait CuTask: Freezable + Reflect {
453    type Input<'m>: CuMsgPack;
454    type Output<'m>: CuMsgPayload;
455    /// Resources required by the task.
456    type Resources<'r>;
457
458    /// Here you need to initialize everything your task will need for the duration of its lifetime.
459    /// The config allows you to access the configuration of the task.
460    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
461    where
462        Self: Sized;
463
464    /// Start is called between the creation of the task and the first call to pre/process.
465    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
466        Ok(())
467    }
468
469    /// This is a method called by the runtime before "process". This is a kind of best effort,
470    /// as soon as possible call to give a chance for the task to do some work before to prepare
471    /// to make "process" as short as possible.
472    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
473        Ok(())
474    }
475
476    /// Process is the most critical execution of the task.
477    /// The goal will be to produce the output message as soon as possible.
478    /// Use preprocess to prepare the task to make this method as short as possible.
479    fn process<'i, 'o>(
480        &mut self,
481        _ctx: &CuContext,
482        input: &Self::Input<'i>,
483        output: &mut Self::Output<'o>,
484    ) -> CuResult<()>;
485
486    /// This is a method called by the runtime after "process". It is best effort a chance for
487    /// the task to update some state after process is out of the way.
488    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
489    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
490        Ok(())
491    }
492
493    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
494    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
495        Ok(())
496    }
497}
498
499/// A Sink Task is a task that only consumes messages. For example drivers for actuators are Sink Tasks.
500pub trait CuSinkTask: Freezable + Reflect {
501    type Input<'m>: CuMsgPack;
502    /// Resources required by the task.
503    type Resources<'r>;
504
505    /// Here you need to initialize everything your task will need for the duration of its lifetime.
506    /// The config allows you to access the configuration of the task.
507    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
508    where
509        Self: Sized;
510
511    /// Start is called between the creation of the task and the first call to pre/process.
512    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
513        Ok(())
514    }
515
516    /// This is a method called by the runtime before "process". This is a kind of best effort,
517    /// as soon as possible call to give a chance for the task to do some work before to prepare
518    /// to make "process" as short as possible.
519    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
520        Ok(())
521    }
522
523    /// Process is the most critical execution of the task.
524    /// The goal will be to produce the output message as soon as possible.
525    /// Use preprocess to prepare the task to make this method as short as possible.
526    fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
527
528    /// This is a method called by the runtime after "process". It is best effort a chance for
529    /// the task to update some state after process is out of the way.
530    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
531    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
532        Ok(())
533    }
534
535    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
536    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
537        Ok(())
538    }
539}
540
541#[cfg(test)]
542mod tests {
543    use super::*;
544    use bincode::{config, decode_from_slice, encode_to_vec};
545
546    #[test]
547    fn test_cucompactstr_encode_decode() {
548        let cstr = CuCompactString(CompactString::from("hello"));
549        let config = config::standard();
550        let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
551        let (decoded, _): (CuCompactString, usize) =
552            decode_from_slice(&encoded, config).expect("Decoding failed");
553        assert_eq!(cstr.0, decoded.0);
554    }
555}