1use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7use bincode::de::{Decode, Decoder};
8use bincode::enc::{Encode, Encoder};
9use bincode::error::{DecodeError, EncodeError};
10use compact_str::{CompactString, ToCompactString};
11use core::any::{TypeId, type_name};
12use cu29_clock::{PartialCuTimeRange, Tov};
13use cu29_traits::{
14 COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuResult,
15 ErasedCuStampedData, Metadata,
16};
17use serde::de::DeserializeOwned;
18use serde::{Deserialize, Serialize};
19
20use alloc::format;
21use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
22
23pub trait CuMsgPayload:
26 Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Sized
27{
28}
29
/// Marker trait for the types usable as the `Input` associated type of
/// `CuTask` and `CuSinkTask` (single messages, references to messages and
/// tuples of message references). It carries no methods.
pub trait CuMsgPack {}
31
32impl<T> CuMsgPayload for T where
34 T: Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Sized
35{
36}
37
/// Implements `CuMsgPack` for a tuple of `&CuMsg<_>` references.
///
/// Each identifier passed to the macro becomes one generic payload parameter
/// (bounded by `CuMsgPayload`) and one `&CuMsg<_>` element of the tuple, so
/// `impl_cu_msg_pack!(A, B)` covers `(&CuMsg<A>, &CuMsg<B>)`.
///
/// The reference lifetimes are left elided so every element may carry its own
/// independent lifetime; no named lifetime parameter is needed on the impl.
macro_rules! impl_cu_msg_pack {
    ($($name:ident),+) => {
        impl<$($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
        where
            $($name: CuMsgPayload),+
        {}
    };
}
46
47impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
48impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
49impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
50impl CuMsgPack for () {}
51
52impl_cu_msg_pack!(T1, T2);
54impl_cu_msg_pack!(T1, T2, T3);
55impl_cu_msg_pack!(T1, T2, T3, T4);
56impl_cu_msg_pack!(T1, T2, T3, T4, T5);
57
/// Expands to the input message type of a task.
///
/// * `input_msg!('a, T1, T2, ...)` expands to a tuple of references
///   `(&'a CuMsg<T1>, &'a CuMsg<T2>, ...)`.
/// * `input_msg!('a, T)` expands to `CuMsg<T>`; the lifetime is accepted but
///   not used in the expansion.
/// * `input_msg!(T)` expands to `CuMsg<T>`.
///
/// `CuMsg` must be in scope at the call site.
#[macro_export]
macro_rules! input_msg {
    // Several payloads sharing one lifetime: a tuple of references.
    ($life:lifetime, $head:ty, $($tail:ty),+) => {
        ( & $life CuMsg<$head>, $( & $life CuMsg<$tail> ),+ )
    };
    // One payload with a lifetime: the lifetime is dropped.
    ($life:lifetime, $payload:ty) => {
        CuMsg<$payload>
    };
    // One payload, no lifetime.
    ($payload:ty) => {
        CuMsg<$payload>
    };
}
72
/// Expands to the output message type of a task.
///
/// * `output_msg!(T)` and `output_msg!('a, T)` expand to `CuMsg<T>`.
/// * `output_msg!(T1, T2, ...)` and `output_msg!('a, T1, T2, ...)` expand to
///   the tuple `(CuMsg<T1>, CuMsg<T2>, ...)`.
///
/// A leading lifetime is accepted but never used in the expansion.
/// `CuMsg` must be in scope at the call site.
///
/// Arm order matters: both lifetime-prefixed arms come before the arms that
/// start with a `ty` fragment. A lifetime token is allowed to begin a `ty`
/// fragment, so if a `$first:ty` arm were tried first on `'a, T` the parser
/// would commit to parsing `'a` as a type and report an error instead of
/// falling through to the lifetime arm.
#[macro_export]
macro_rules! output_msg {
    // Lifetime + several payloads.
    ($lt:lifetime, $first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    // Lifetime + one payload.
    ($lt:lifetime, $ty:ty) => {
        CuMsg<$ty>
    };
    // Several payloads, no lifetime.
    ($first:ty, $($rest:ty),+) => {
        ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
    };
    // One payload, no lifetime.
    ($ty:ty) => {
        CuMsg<$ty>
    };
}
89
90#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize)]
92pub struct CuMsgMetadata {
93 pub process_time: PartialCuTimeRange,
95 pub status_txt: CuCompactString,
98}
99
100impl Metadata for CuMsgMetadata {}
101
102impl CuMsgMetadata {
103 pub fn set_status(&mut self, status: impl ToCompactString) {
104 self.status_txt = CuCompactString(status.to_compact_string());
105 }
106}
107
108impl CuMsgMetadataTrait for CuMsgMetadata {
109 fn process_time(&self) -> PartialCuTimeRange {
110 self.process_time
111 }
112
113 fn status_txt(&self) -> &CuCompactString {
114 &self.status_txt
115 }
116}
117
118impl Display for CuMsgMetadata {
119 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
120 write!(
121 f,
122 "process_time start: {}, process_time end: {}",
123 self.process_time.start, self.process_time.end
124 )
125 }
126}
127
128#[derive(Default, Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize)]
130#[serde(bound(
131 serialize = "T: Serialize, M: Serialize",
132 deserialize = "T: DeserializeOwned, M: DeserializeOwned"
133))]
134pub struct CuStampedData<T, M>
135where
136 T: CuMsgPayload,
137 M: Metadata,
138{
139 payload: Option<T>,
141
142 pub tov: Tov,
145
146 pub metadata: M,
148}
149
150impl Default for CuMsgMetadata {
151 fn default() -> Self {
152 CuMsgMetadata {
153 process_time: PartialCuTimeRange::default(),
154 status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
155 }
156 }
157}
158
159impl<T, M> CuStampedData<T, M>
160where
161 T: CuMsgPayload,
162 M: Metadata,
163{
164 pub fn new(payload: Option<T>) -> Self {
165 CuStampedData {
166 payload,
167 tov: Tov::default(),
168 metadata: M::default(),
169 }
170 }
171 pub fn payload(&self) -> Option<&T> {
172 self.payload.as_ref()
173 }
174
175 pub fn set_payload(&mut self, payload: T) {
176 self.payload = Some(payload);
177 }
178
179 pub fn clear_payload(&mut self) {
180 self.payload = None;
181 }
182
183 pub fn payload_mut(&mut self) -> &mut Option<T> {
184 &mut self.payload
185 }
186}
187
188impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
189where
190 T: CuMsgPayload,
191 M: CuMsgMetadataTrait + Metadata,
192{
193 fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
194 self.payload
195 .as_ref()
196 .map(|p| p as &dyn erased_serde::Serialize)
197 }
198
199 fn tov(&self) -> Tov {
200 self.tov
201 }
202
203 fn metadata(&self) -> &dyn CuMsgMetadataTrait {
204 &self.metadata
205 }
206}
207
208pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
211
212impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
213 pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
220 unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
222 }
223
224 pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
231 unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
233 }
234}
235
236impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
237 fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
238 CuError::from(format!(
239 "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
240 type_name::<T>(),
241 type_name::<U>()
242 ))
243 }
244
245 pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
247 if TypeId::of::<T>() == TypeId::of::<U>() {
248 Ok(unsafe { self.assume_payload::<U>() })
250 } else {
251 Err(Self::downcast_err::<U>())
252 }
253 }
254
255 pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
257 if TypeId::of::<T>() == TypeId::of::<U>() {
258 Ok(unsafe { self.assume_payload_mut::<U>() })
260 } else {
261 Err(Self::downcast_err::<U>())
262 }
263 }
264}
265
266pub trait Freezable {
269 fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
273 Encode::encode(&(), encoder) }
275
276 fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
279 Ok(())
280 }
281}
282
283pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
286
287impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
288 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
289 self.0.freeze(encoder)
290 }
291}
292
293pub trait CuSrcTask: Freezable + Reflect {
298 type Output<'m>: CuMsgPayload;
299 type Resources<'r>;
301
302 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
305 where
306 Self: Sized;
307
308 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
310 Ok(())
311 }
312
313 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
317 Ok(())
318 }
319
320 fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
324
325 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
329 Ok(())
330 }
331
332 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
334 Ok(())
335 }
336}
337
338pub trait CuTask: Freezable + Reflect {
340 type Input<'m>: CuMsgPack;
341 type Output<'m>: CuMsgPayload;
342 type Resources<'r>;
344
345 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
348 where
349 Self: Sized;
350
351 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
353 Ok(())
354 }
355
356 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
360 Ok(())
361 }
362
363 fn process<'i, 'o>(
367 &mut self,
368 _ctx: &CuContext,
369 input: &Self::Input<'i>,
370 output: &mut Self::Output<'o>,
371 ) -> CuResult<()>;
372
373 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
377 Ok(())
378 }
379
380 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
382 Ok(())
383 }
384}
385
386pub trait CuSinkTask: Freezable + Reflect {
388 type Input<'m>: CuMsgPack;
389 type Resources<'r>;
391
392 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
395 where
396 Self: Sized;
397
398 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
400 Ok(())
401 }
402
403 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
407 Ok(())
408 }
409
410 fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
414
415 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
419 Ok(())
420 }
421
422 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
424 Ok(())
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::{config, decode_from_slice, encode_to_vec};

    /// A `CuCompactString` must survive a bincode encode/decode round trip
    /// with the standard configuration.
    #[test]
    fn test_cucompactstr_encode_decode() {
        let original = CuCompactString(CompactString::from("hello"));
        let cfg = config::standard();

        let bytes = encode_to_vec(&original, cfg).expect("Encoding failed");
        let (round_tripped, _consumed): (CuCompactString, usize) =
            decode_from_slice(&bytes, cfg).expect("Decoding failed");

        assert_eq!(original.0, round_tripped.0);
    }
}