Skip to main content

cu29_runtime/
cutask.rs

1//! This module contains all the main definition of the traits you need to implement
2//! or interact with to create a Copper task.
3
4use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7use bincode::de::{Decode, Decoder};
8use bincode::enc::{Encode, Encoder};
9use bincode::error::{DecodeError, EncodeError};
10use compact_str::{CompactString, ToCompactString};
11use core::any::{TypeId, type_name};
12use cu29_clock::{PartialCuTimeRange, Tov};
13use cu29_traits::{
14    COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuResult,
15    ErasedCuStampedData, Metadata,
16};
17use serde::de::DeserializeOwned;
18use serde::{Deserialize, Serialize};
19
20use alloc::format;
21use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
22
23/// The state of a task.
24// Everything that is stateful in copper for zero copy constraints need to be restricted to this trait.
25pub trait CuMsgPayload:
26    Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Sized
27{
28}
29
30pub trait CuMsgPack {}
31
32// Also anything that follows this contract can be a payload (blanket implementation)
33impl<T> CuMsgPayload for T where
34    T: Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Sized
35{
36}
37
38macro_rules! impl_cu_msg_pack {
39    ($($name:ident),+) => {
40        impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
41        where
42            $($name: CuMsgPayload),+
43        {}
44    };
45}
46
47impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
48impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
49impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
50impl CuMsgPack for () {}
51
52// Apply the macro to generate implementations for tuple sizes up to 5
53impl_cu_msg_pack!(T1, T2);
54impl_cu_msg_pack!(T1, T2, T3);
55impl_cu_msg_pack!(T1, T2, T3, T4);
56impl_cu_msg_pack!(T1, T2, T3, T4, T5);
57
58// A convenience macro to get from a payload or a list of payloads to a proper CuMsg or CuMsgPack
59// declaration for your tasks used for input messages.
60#[macro_export]
61macro_rules! input_msg {
62    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
63        ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
64    };
65    ($lt:lifetime, $ty:ty) => {
66        CuMsg<$ty>   // This is for backward compatibility
67    };
68    ($ty:ty) => {
69        CuMsg<$ty>
70    };
71}
72
73// A convenience macro to get from a payload to a proper CuMsg used as output.
74#[macro_export]
75macro_rules! output_msg {
76    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
77        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
78    };
79    ($first:ty, $($rest:ty),+) => {
80        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
81    };
82    ($ty:ty) => {
83        CuMsg<$ty>
84    };
85    ($lt:lifetime, $ty:ty) => {
86        CuMsg<$ty>  // This is for backward compatibility
87    };
88}
89
90/// CuMsgMetadata is a structure that contains metadata common to all CuStampedDataSet.
91#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize)]
92pub struct CuMsgMetadata {
93    /// The time range used for the processing of this message
94    pub process_time: PartialCuTimeRange,
95    /// A small string for real time feedback purposes.
96    /// This is useful for to display on the field when the tasks are operating correctly.
97    pub status_txt: CuCompactString,
98}
99
100impl Metadata for CuMsgMetadata {}
101
102impl CuMsgMetadata {
103    pub fn set_status(&mut self, status: impl ToCompactString) {
104        self.status_txt = CuCompactString(status.to_compact_string());
105    }
106}
107
108impl CuMsgMetadataTrait for CuMsgMetadata {
109    fn process_time(&self) -> PartialCuTimeRange {
110        self.process_time
111    }
112
113    fn status_txt(&self) -> &CuCompactString {
114        &self.status_txt
115    }
116}
117
118impl Display for CuMsgMetadata {
119    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
120        write!(
121            f,
122            "process_time start: {}, process_time end: {}",
123            self.process_time.start, self.process_time.end
124        )
125    }
126}
127
128/// CuMsg is the envelope holding the msg payload and the metadata between tasks.
129#[derive(Default, Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize)]
130#[serde(bound(
131    serialize = "T: Serialize, M: Serialize",
132    deserialize = "T: DeserializeOwned, M: DeserializeOwned"
133))]
134pub struct CuStampedData<T, M>
135where
136    T: CuMsgPayload,
137    M: Metadata,
138{
139    /// This payload is the actual data exchanged between tasks.
140    payload: Option<T>,
141
142    /// The time of validity of the message.
143    /// It can be undefined (None), one measure point or a range of measures (TimeRange).
144    pub tov: Tov,
145
146    /// This metadata is the data that is common to all messages.
147    pub metadata: M,
148}
149
150impl Default for CuMsgMetadata {
151    fn default() -> Self {
152        CuMsgMetadata {
153            process_time: PartialCuTimeRange::default(),
154            status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
155        }
156    }
157}
158
159impl<T, M> CuStampedData<T, M>
160where
161    T: CuMsgPayload,
162    M: Metadata,
163{
164    pub fn new(payload: Option<T>) -> Self {
165        CuStampedData {
166            payload,
167            tov: Tov::default(),
168            metadata: M::default(),
169        }
170    }
171    pub fn payload(&self) -> Option<&T> {
172        self.payload.as_ref()
173    }
174
175    pub fn set_payload(&mut self, payload: T) {
176        self.payload = Some(payload);
177    }
178
179    pub fn clear_payload(&mut self) {
180        self.payload = None;
181    }
182
183    pub fn payload_mut(&mut self) -> &mut Option<T> {
184        &mut self.payload
185    }
186}
187
188impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
189where
190    T: CuMsgPayload,
191    M: CuMsgMetadataTrait + Metadata,
192{
193    fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
194        self.payload
195            .as_ref()
196            .map(|p| p as &dyn erased_serde::Serialize)
197    }
198
199    fn tov(&self) -> Tov {
200        self.tov
201    }
202
203    fn metadata(&self) -> &dyn CuMsgMetadataTrait {
204        &self.metadata
205    }
206}
207
208/// This is the robotics message type for Copper with the correct Metadata type
209/// that will be used by the runtime.
210pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
211
212impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
213    /// Reinterprets the payload type carried by this message.
214    ///
215    /// # Safety
216    ///
217    /// The caller must guarantee that the message really contains a payload of type `U`. Failing
218    /// to do so is undefined behaviour.
219    pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
220        // SAFETY: Caller guarantees that the underlying payload is of type U.
221        unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
222    }
223
224    /// Mutable variant of [`assume_payload`](Self::assume_payload).
225    ///
226    /// # Safety
227    ///
228    /// The caller must guarantee that mutating the returned message is sound for the actual
229    /// payload type stored in the buffer.
230    pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
231        // SAFETY: Caller guarantees that the underlying payload is of type U.
232        unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
233    }
234}
235
236impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
237    fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
238        CuError::from(format!(
239            "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
240            type_name::<T>(),
241            type_name::<U>()
242        ))
243    }
244
245    /// Attempts to view this message as carrying payload `U`.
246    pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
247        if TypeId::of::<T>() == TypeId::of::<U>() {
248            // SAFETY: We just proved that T == U.
249            Ok(unsafe { self.assume_payload::<U>() })
250        } else {
251            Err(Self::downcast_err::<U>())
252        }
253    }
254
255    /// Mutable variant of [`downcast_ref`](Self::downcast_ref).
256    pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
257        if TypeId::of::<T>() == TypeId::of::<U>() {
258            // SAFETY: We just proved that T == U.
259            Ok(unsafe { self.assume_payload_mut::<U>() })
260        } else {
261            Err(Self::downcast_err::<U>())
262        }
263    }
264}
265
266/// The internal state of a task needs to be serializable
267/// so the framework can take a snapshot of the task graph.
268pub trait Freezable {
269    /// This method is called by the framework when it wants to save the task state.
270    /// The default implementation is to encode nothing (stateless).
271    /// If you have a state, you need to implement this method.
272    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
273        Encode::encode(&(), encoder) // default is stateless
274    }
275
276    /// This method is called by the framework when it wants to restore the task to a specific state.
277    /// Here it is similar to Decode but the framework will give you a new instance of the task (the new method will be called)
278    fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
279        Ok(())
280    }
281}
282
283/// Bincode Adapter for Freezable tasks
284/// This allows the use of the bincode API directly to freeze and thaw tasks.
285pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
286
287impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
288    fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
289        self.0.freeze(encoder)
290    }
291}
292
293/// A Src Task is a task that only produces messages. For example drivers for sensors are Src Tasks.
294/// They are in push mode from the runtime.
295/// To set the frequency of the pulls and align them to any hw, see the runtime configuration.
296/// Note: A source has the privilege to have a clock passed to it vs a frozen clock.
297pub trait CuSrcTask: Freezable + Reflect {
298    type Output<'m>: CuMsgPayload;
299    /// Resources required by the task.
300    type Resources<'r>;
301
302    /// Here you need to initialize everything your task will need for the duration of its lifetime.
303    /// The config allows you to access the configuration of the task.
304    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
305    where
306        Self: Sized;
307
308    /// Start is called between the creation of the task and the first call to pre/process.
309    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
310        Ok(())
311    }
312
313    /// This is a method called by the runtime before "process". This is a kind of best effort,
314    /// as soon as possible call to give a chance for the task to do some work before to prepare
315    /// to make "process" as short as possible.
316    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
317        Ok(())
318    }
319
320    /// Process is the most critical execution of the task.
321    /// The goal will be to produce the output message as soon as possible.
322    /// Use preprocess to prepare the task to make this method as short as possible.
323    fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
324
325    /// This is a method called by the runtime after "process". It is best effort a chance for
326    /// the task to update some state after process is out of the way.
327    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
328    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
329        Ok(())
330    }
331
332    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
333    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
334        Ok(())
335    }
336}
337
338/// This is the most generic Task of copper. It is a "transform" task deriving an output from an input.
339pub trait CuTask: Freezable + Reflect {
340    type Input<'m>: CuMsgPack;
341    type Output<'m>: CuMsgPayload;
342    /// Resources required by the task.
343    type Resources<'r>;
344
345    /// Here you need to initialize everything your task will need for the duration of its lifetime.
346    /// The config allows you to access the configuration of the task.
347    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
348    where
349        Self: Sized;
350
351    /// Start is called between the creation of the task and the first call to pre/process.
352    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
353        Ok(())
354    }
355
356    /// This is a method called by the runtime before "process". This is a kind of best effort,
357    /// as soon as possible call to give a chance for the task to do some work before to prepare
358    /// to make "process" as short as possible.
359    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
360        Ok(())
361    }
362
363    /// Process is the most critical execution of the task.
364    /// The goal will be to produce the output message as soon as possible.
365    /// Use preprocess to prepare the task to make this method as short as possible.
366    fn process<'i, 'o>(
367        &mut self,
368        _ctx: &CuContext,
369        input: &Self::Input<'i>,
370        output: &mut Self::Output<'o>,
371    ) -> CuResult<()>;
372
373    /// This is a method called by the runtime after "process". It is best effort a chance for
374    /// the task to update some state after process is out of the way.
375    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
376    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
377        Ok(())
378    }
379
380    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
381    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
382        Ok(())
383    }
384}
385
386/// A Sink Task is a task that only consumes messages. For example drivers for actuators are Sink Tasks.
387pub trait CuSinkTask: Freezable + Reflect {
388    type Input<'m>: CuMsgPack;
389    /// Resources required by the task.
390    type Resources<'r>;
391
392    /// Here you need to initialize everything your task will need for the duration of its lifetime.
393    /// The config allows you to access the configuration of the task.
394    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
395    where
396        Self: Sized;
397
398    /// Start is called between the creation of the task and the first call to pre/process.
399    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
400        Ok(())
401    }
402
403    /// This is a method called by the runtime before "process". This is a kind of best effort,
404    /// as soon as possible call to give a chance for the task to do some work before to prepare
405    /// to make "process" as short as possible.
406    fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
407        Ok(())
408    }
409
410    /// Process is the most critical execution of the task.
411    /// The goal will be to produce the output message as soon as possible.
412    /// Use preprocess to prepare the task to make this method as short as possible.
413    fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
414
415    /// This is a method called by the runtime after "process". It is best effort a chance for
416    /// the task to update some state after process is out of the way.
417    /// It can be use for example to maintain statistics etc. that are not time-critical for the robot.
418    fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
419        Ok(())
420    }
421
422    /// Called to stop the task. It signals that the *process method won't be called until start is called again.
423    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
424        Ok(())
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use bincode::{config, decode_from_slice, encode_to_vec};
432
433    #[test]
434    fn test_cucompactstr_encode_decode() {
435        let cstr = CuCompactString(CompactString::from("hello"));
436        let config = config::standard();
437        let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
438        let (decoded, _): (CuCompactString, usize) =
439            decode_from_slice(&encoded, config).expect("Decoding failed");
440        assert_eq!(cstr.0, decoded.0);
441    }
442}