1use crate::config::ComponentConfig;
5use crate::context::CuContext;
6use crate::reflect::Reflect;
7#[cfg(feature = "reflect")]
8use crate::reflect::TypePath;
9#[cfg(feature = "reflect")]
10use bevy_reflect;
11use bincode::de::{Decode, Decoder};
12use bincode::enc::{Encode, Encoder};
13use bincode::error::{DecodeError, EncodeError};
14use compact_str::{CompactString, ToCompactString};
15use core::any::{TypeId, type_name};
16use cu29_clock::{PartialCuTimeRange, Tov};
17use cu29_traits::{
18 COMPACT_STRING_CAPACITY, CuCompactString, CuError, CuMsgMetadataTrait, CuMsgOrigin, CuResult,
19 ErasedCuStampedData, Metadata,
20};
21use serde::de::DeserializeOwned;
22use serde::{Deserialize, Serialize};
23
24use alloc::format;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26
27#[cfg(feature = "reflect")]
30pub trait CuMsgPayload:
31 Default
32 + Debug
33 + Clone
34 + Encode
35 + Decode<()>
36 + Serialize
37 + DeserializeOwned
38 + Reflect
39 + TypePath
40 + Sized
41{
42}
43
44#[cfg(not(feature = "reflect"))]
45pub trait CuMsgPayload:
46 Default + Debug + Clone + Encode + Decode<()> + Serialize + DeserializeOwned + Reflect + Sized
47{
48}
49
50pub trait CuMsgPack {}
51
52#[cfg(feature = "reflect")]
54impl<T> CuMsgPayload for T where
55 T: Default
56 + Debug
57 + Clone
58 + Encode
59 + Decode<()>
60 + Serialize
61 + DeserializeOwned
62 + Reflect
63 + TypePath
64 + Sized
65{
66}
67
68#[cfg(not(feature = "reflect"))]
69impl<T> CuMsgPayload for T where
70 T: Default
71 + Debug
72 + Clone
73 + Encode
74 + Decode<()>
75 + Serialize
76 + DeserializeOwned
77 + Reflect
78 + Sized
79{
80}
81
82macro_rules! impl_cu_msg_pack {
83 ($($name:ident),+) => {
84 impl<'cl, $($name),+> CuMsgPack for ($(&CuMsg<$name>,)+)
85 where
86 $($name: CuMsgPayload),+
87 {}
88 };
89}
90
91macro_rules! impl_cu_msg_pack_up_to {
92 ($first:ident, $second:ident $(, $rest:ident)* $(,)?) => {
93 impl_cu_msg_pack!($first, $second);
94 impl_cu_msg_pack_up_to!(@accumulate ($first, $second); $($rest),*);
95 };
96 (@accumulate ($($acc:ident),+);) => {};
97 (@accumulate ($($acc:ident),+); $next:ident $(, $rest:ident)*) => {
98 impl_cu_msg_pack!($($acc),+, $next);
99 impl_cu_msg_pack_up_to!(@accumulate ($($acc),+, $next); $($rest),*);
100 };
101}
102
103impl<T: CuMsgPayload> CuMsgPack for CuMsg<T> {}
104impl<T: CuMsgPayload> CuMsgPack for &CuMsg<T> {}
105impl<T: CuMsgPayload> CuMsgPack for (&CuMsg<T>,) {}
106impl CuMsgPack for () {}
107
108impl_cu_msg_pack_up_to!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
110
111#[macro_export]
114macro_rules! input_msg {
115 ($lt:lifetime, $first:ty, $($rest:ty),+) => {
116 ( & $lt CuMsg<$first>, $( & $lt CuMsg<$rest> ),+ )
117 };
118 ($lt:lifetime, $ty:ty) => {
119 CuMsg<$ty> };
121 ($ty:ty) => {
122 CuMsg<$ty>
123 };
124}
125
126#[macro_export]
128macro_rules! output_msg {
129 ($lt:lifetime, $first:ty, $($rest:ty),+) => {
130 ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
131 };
132 ($first:ty, $($rest:ty),+) => {
133 ( CuMsg<$first>, $( CuMsg<$rest> ),+ )
134 };
135 ($ty:ty) => {
136 CuMsg<$ty>
137 };
138 ($lt:lifetime, $ty:ty) => {
139 CuMsg<$ty> };
141}
142
143#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Serialize, Deserialize, Reflect)]
145#[reflect(opaque, from_reflect = false, no_field_bounds)]
146pub struct CuMsgMetadata {
147 pub process_time: PartialCuTimeRange,
149 pub status_txt: CuCompactString,
152 pub origin: Option<CuMsgOrigin>,
154}
155
156impl Metadata for CuMsgMetadata {}
157
158impl CuMsgMetadata {
159 pub fn set_status(&mut self, status: impl ToCompactString) {
160 self.status_txt = CuCompactString(status.to_compact_string());
161 }
162
163 pub fn set_origin(&mut self, origin: CuMsgOrigin) {
164 self.origin = Some(origin);
165 }
166
167 pub fn clear_origin(&mut self) {
168 self.origin = None;
169 }
170}
171
172impl CuMsgMetadataTrait for CuMsgMetadata {
173 fn process_time(&self) -> PartialCuTimeRange {
174 self.process_time
175 }
176
177 fn status_txt(&self) -> &CuCompactString {
178 &self.status_txt
179 }
180
181 fn origin(&self) -> Option<&CuMsgOrigin> {
182 self.origin.as_ref()
183 }
184}
185
186impl Display for CuMsgMetadata {
187 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
188 write!(
189 f,
190 "process_time start: {}, process_time end: {}",
191 self.process_time.start, self.process_time.end
192 )
193 }
194}
195
196#[derive(Default, Debug, Clone, bincode::Decode, Serialize, Deserialize, Reflect)]
198#[reflect(opaque, from_reflect = false, no_field_bounds)]
199#[serde(bound(
200 serialize = "T: Serialize, M: Serialize",
201 deserialize = "T: DeserializeOwned, M: DeserializeOwned"
202))]
203pub struct CuStampedData<T, M>
204where
205 T: CuMsgPayload,
206 M: Metadata,
207{
208 payload: Option<T>,
210
211 pub tov: Tov,
214
215 pub metadata: M,
217}
218
219impl<T, M> Encode for CuStampedData<T, M>
220where
221 T: CuMsgPayload,
222 M: Metadata,
223{
224 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
225 match &self.payload {
226 None => {
227 0u8.encode(encoder)?;
228 }
229 Some(payload) => {
230 1u8.encode(encoder)?;
231 let encoded_start = cu29_traits::observed_encode_bytes();
232 let handle_start = crate::monitoring::current_payload_handle_bytes();
233 payload.encode(encoder)?;
234 let encoded_bytes =
235 cu29_traits::observed_encode_bytes().saturating_sub(encoded_start);
236 let handle_bytes =
237 crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
238 crate::monitoring::record_current_slot_payload_io_stats(
239 core::mem::size_of::<T>(),
240 encoded_bytes,
241 handle_bytes,
242 );
243 }
244 }
245 self.tov.encode(encoder)?;
246 self.metadata.encode(encoder)?;
247 Ok(())
248 }
249}
250
251impl Default for CuMsgMetadata {
252 fn default() -> Self {
253 CuMsgMetadata {
254 process_time: PartialCuTimeRange::default(),
255 status_txt: CuCompactString(CompactString::with_capacity(COMPACT_STRING_CAPACITY)),
256 origin: None,
257 }
258 }
259}
260
261impl<T, M> CuStampedData<T, M>
262where
263 T: CuMsgPayload,
264 M: Metadata,
265{
266 pub(crate) fn from_parts(payload: Option<T>, tov: Tov, metadata: M) -> Self {
267 CuStampedData {
268 payload,
269 tov,
270 metadata,
271 }
272 }
273
274 pub fn new(payload: Option<T>) -> Self {
275 Self::from_parts(payload, Tov::default(), M::default())
276 }
277 pub fn payload(&self) -> Option<&T> {
278 self.payload.as_ref()
279 }
280
281 pub fn set_payload(&mut self, payload: T) {
282 self.payload = Some(payload);
283 }
284
285 pub fn clear_payload(&mut self) {
286 self.payload = None;
287 }
288
289 pub fn payload_mut(&mut self) -> &mut Option<T> {
290 &mut self.payload
291 }
292}
293
294impl<T, M> ErasedCuStampedData for CuStampedData<T, M>
295where
296 T: CuMsgPayload,
297 M: CuMsgMetadataTrait + Metadata,
298{
299 fn payload(&self) -> Option<&dyn erased_serde::Serialize> {
300 self.payload
301 .as_ref()
302 .map(|p| p as &dyn erased_serde::Serialize)
303 }
304
305 #[cfg(feature = "reflect")]
306 fn payload_reflect(&self) -> Option<&dyn cu29_traits::Reflect> {
307 self.payload
308 .as_ref()
309 .map(|p| p as &dyn cu29_traits::Reflect)
310 }
311
312 fn tov(&self) -> Tov {
313 self.tov
314 }
315
316 fn metadata(&self) -> &dyn CuMsgMetadataTrait {
317 &self.metadata
318 }
319}
320
321pub type CuMsg<T> = CuStampedData<T, CuMsgMetadata>;
324
325impl<T: CuMsgPayload> CuStampedData<T, CuMsgMetadata> {
326 pub unsafe fn assume_payload<U: CuMsgPayload>(&self) -> &CuMsg<U> {
333 unsafe { &*(self as *const CuMsg<T> as *const CuMsg<U>) }
335 }
336
337 pub unsafe fn assume_payload_mut<U: CuMsgPayload>(&mut self) -> &mut CuMsg<U> {
344 unsafe { &mut *(self as *mut CuMsg<T> as *mut CuMsg<U>) }
346 }
347}
348
349impl<T: CuMsgPayload + 'static> CuStampedData<T, CuMsgMetadata> {
350 fn downcast_err<U: CuMsgPayload + 'static>() -> CuError {
351 CuError::from(format!(
352 "CuMsg payload mismatch: {} cannot be reinterpreted as {}",
353 type_name::<T>(),
354 type_name::<U>()
355 ))
356 }
357
358 pub fn downcast_ref<U: CuMsgPayload + 'static>(&self) -> CuResult<&CuMsg<U>> {
360 if TypeId::of::<T>() == TypeId::of::<U>() {
361 Ok(unsafe { self.assume_payload::<U>() })
363 } else {
364 Err(Self::downcast_err::<U>())
365 }
366 }
367
368 pub fn downcast_mut<U: CuMsgPayload + 'static>(&mut self) -> CuResult<&mut CuMsg<U>> {
370 if TypeId::of::<T>() == TypeId::of::<U>() {
371 Ok(unsafe { self.assume_payload_mut::<U>() })
373 } else {
374 Err(Self::downcast_err::<U>())
375 }
376 }
377}
378
379pub trait Freezable {
382 fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
386 Encode::encode(&(), encoder) }
388
389 fn thaw<D: Decoder>(&mut self, _decoder: &mut D) -> Result<(), DecodeError> {
392 Ok(())
393 }
394}
395
396pub struct BincodeAdapter<'a, T: Freezable + ?Sized>(pub &'a T);
399
400impl<'a, T: Freezable + ?Sized> Encode for BincodeAdapter<'a, T> {
401 fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
402 self.0.freeze(encoder)
403 }
404}
405
406pub trait CuSrcTask: Freezable + Reflect {
411 type Output<'m>: CuMsgPayload;
412 type Resources<'r>;
414
415 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
418 where
419 Self: Sized;
420
421 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
423 Ok(())
424 }
425
426 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
430 Ok(())
431 }
432
433 fn process<'o>(&mut self, ctx: &CuContext, new_msg: &mut Self::Output<'o>) -> CuResult<()>;
437
438 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
442 Ok(())
443 }
444
445 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
447 Ok(())
448 }
449}
450
451pub trait CuTask: Freezable + Reflect {
453 type Input<'m>: CuMsgPack;
454 type Output<'m>: CuMsgPayload;
455 type Resources<'r>;
457
458 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
461 where
462 Self: Sized;
463
464 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
466 Ok(())
467 }
468
469 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
473 Ok(())
474 }
475
476 fn process<'i, 'o>(
480 &mut self,
481 _ctx: &CuContext,
482 input: &Self::Input<'i>,
483 output: &mut Self::Output<'o>,
484 ) -> CuResult<()>;
485
486 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
490 Ok(())
491 }
492
493 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
495 Ok(())
496 }
497}
498
499pub trait CuSinkTask: Freezable + Reflect {
501 type Input<'m>: CuMsgPack;
502 type Resources<'r>;
504
505 fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
508 where
509 Self: Sized;
510
511 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
513 Ok(())
514 }
515
516 fn preprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
520 Ok(())
521 }
522
523 fn process<'i>(&mut self, _ctx: &CuContext, input: &Self::Input<'i>) -> CuResult<()>;
527
528 fn postprocess(&mut self, _ctx: &CuContext) -> CuResult<()> {
532 Ok(())
533 }
534
535 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
537 Ok(())
538 }
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544 use bincode::{config, decode_from_slice, encode_to_vec};
545
546 #[test]
547 fn test_cucompactstr_encode_decode() {
548 let cstr = CuCompactString(CompactString::from("hello"));
549 let config = config::standard();
550 let encoded = encode_to_vec(&cstr, config).expect("Encoding failed");
551 let (decoded, _): (CuCompactString, usize) =
552 decode_from_slice(&encoded, config).expect("Decoding failed");
553 assert_eq!(cstr.0, decoded.0);
554 }
555}