Skip to main content

cu29_runtime/
logcodec.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::config::{ComponentConfig, CuConfig, LoggingCodecSpec};
5use crate::cutask::{CuMsg, CuMsgMetadata, CuMsgPayload};
6use alloc::boxed::Box;
7use alloc::format;
8use alloc::string::{String, ToString};
9#[cfg(feature = "std")]
10use bincode::config::standard;
11use bincode::de::{Decode, Decoder};
12#[cfg(feature = "std")]
13use bincode::decode_from_std_read;
14use bincode::enc::{Encode, Encoder};
15use bincode::error::{DecodeError, EncodeError};
16use core::any::TypeId;
17use cu29_clock::Tov;
18use cu29_traits::{CuError, CuResult, observed_encode_bytes};
19use hashbrown::HashMap;
20use portable_atomic::{AtomicU64, Ordering};
21use serde::de::DeserializeOwned;
22#[cfg(feature = "std")]
23use std::io::Read;
24#[cfg(feature = "std")]
25use std::path::Path;
26
27#[cfg(not(feature = "std"))]
28mod imp {
29    pub use spin::Mutex;
30    pub use spin::once::Once as OnceLock;
31}
32
33#[cfg(feature = "std")]
34mod imp {
35    pub use std::sync::{Mutex, OnceLock};
36}
37
38use imp::*;
39
40#[cfg(feature = "std")]
41use crate::curuntime::{RuntimeLifecycleEvent, RuntimeLifecycleRecord};
42#[cfg(feature = "std")]
43use cu29_unifiedlog::{UnifiedLogger, UnifiedLoggerBuilder, UnifiedLoggerIOReader};
44
45#[cfg(feature = "std")]
46fn lock_mutex<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
47    m.lock().unwrap_or_else(|e| e.into_inner())
48}
49
50#[cfg(not(feature = "std"))]
51fn lock_mutex<T>(m: &Mutex<T>) -> spin::MutexGuard<'_, T> {
52    m.lock()
53}
54
55pub trait CuLogCodec<P: CuMsgPayload>: 'static {
56    type Config: DeserializeOwned + Default;
57
58    fn new(config: Self::Config) -> CuResult<Self>
59    where
60        Self: Sized;
61
62    fn encode_payload<E: Encoder>(
63        &mut self,
64        payload: &P,
65        encoder: &mut E,
66    ) -> Result<(), EncodeError>;
67
68    fn decode_payload<D: Decoder<Context = ()>>(
69        &mut self,
70        decoder: &mut D,
71    ) -> Result<P, DecodeError>;
72}
73
74pub struct CodecState<C> {
75    inner: Mutex<Option<(u64, C)>>,
76}
77
78impl<C> CodecState<C> {
79    pub const fn new() -> Self {
80        Self {
81            inner: Mutex::new(None),
82        }
83    }
84}
85
86impl<C> Default for CodecState<C> {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92pub struct EffectiveConfigEntry {
93    version: AtomicU64,
94    ron: Mutex<String>,
95}
96
97impl EffectiveConfigEntry {
98    fn new(ron: &str) -> Self {
99        Self {
100            version: AtomicU64::new(1),
101            ron: Mutex::new(ron.to_string()),
102        }
103    }
104
105    pub fn version(&self) -> u64 {
106        self.version.load(Ordering::Acquire)
107    }
108
109    pub fn ron(&self) -> String {
110        lock_mutex(&self.ron).clone()
111    }
112
113    fn set(&self, ron: &str) {
114        *lock_mutex(&self.ron) = ron.to_string();
115        self.version.fetch_add(1, Ordering::AcqRel);
116    }
117}
118
119type EffectiveConfigRegistry = HashMap<TypeId, &'static EffectiveConfigEntry>;
120
121static EFFECTIVE_CONFIGS: OnceLock<Mutex<EffectiveConfigRegistry>> = OnceLock::new();
122
123#[cfg(feature = "std")]
124fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
125    EFFECTIVE_CONFIGS.get_or_init(|| Mutex::new(HashMap::new()))
126}
127
128#[cfg(not(feature = "std"))]
129fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
130    EFFECTIVE_CONFIGS.call_once(|| Mutex::new(HashMap::new()))
131}
132
133pub fn effective_config_entry<T: 'static>(default_ron: &str) -> &'static EffectiveConfigEntry {
134    let registry = effective_config_registry();
135    let mut registry = lock_mutex(registry);
136    if let Some(entry) = registry.get(&TypeId::of::<T>()) {
137        return entry;
138    }
139
140    let entry = Box::leak(Box::new(EffectiveConfigEntry::new(default_ron)));
141    registry.insert(TypeId::of::<T>(), entry);
142    entry
143}
144
145pub fn set_effective_config_ron<T: 'static>(ron: &str) {
146    effective_config_entry::<T>(ron).set(ron);
147}
148
149pub fn with_codec_for_encode<C, R, B, F>(
150    state: &'static CodecState<C>,
151    config_entry: &EffectiveConfigEntry,
152    build: B,
153    f: F,
154) -> Result<R, EncodeError>
155where
156    B: FnOnce(&str) -> CuResult<C>,
157    F: FnOnce(&mut C) -> Result<R, EncodeError>,
158{
159    let version = config_entry.version();
160    let mut guard = lock_mutex(&state.inner);
161    if guard
162        .as_ref()
163        .is_none_or(|(cached_version, _)| *cached_version != version)
164    {
165        let effective_config_ron = config_entry.ron();
166        let codec = build(&effective_config_ron)
167            .map_err(|err| EncodeError::OtherString(err.to_string()))?;
168        *guard = Some((version, codec));
169    }
170    let (_, codec) = guard
171        .as_mut()
172        .expect("codec state must be initialized after build");
173    f(codec)
174}
175
176pub fn with_codec_for_decode<C, R, B, F>(
177    state: &'static CodecState<C>,
178    config_entry: &EffectiveConfigEntry,
179    build: B,
180    f: F,
181) -> Result<R, DecodeError>
182where
183    B: FnOnce(&str) -> CuResult<C>,
184    F: FnOnce(&mut C) -> Result<R, DecodeError>,
185{
186    let version = config_entry.version();
187    let mut guard = lock_mutex(&state.inner);
188    if guard
189        .as_ref()
190        .is_none_or(|(cached_version, _)| *cached_version != version)
191    {
192        let effective_config_ron = config_entry.ron();
193        let codec = build(&effective_config_ron)
194            .map_err(|err| DecodeError::OtherString(err.to_string()))?;
195        *guard = Some((version, codec));
196    }
197    let (_, codec) = guard
198        .as_mut()
199        .expect("codec state must be initialized after build");
200    f(codec)
201}
202
203pub fn resolve_task_output_codec<'a>(
204    config: &'a CuConfig,
205    mission_id: Option<&str>,
206    task_id: &str,
207    msg_type: &str,
208) -> CuResult<Option<&'a LoggingCodecSpec>> {
209    let node = config.find_task_node(mission_id, task_id).ok_or_else(|| {
210        CuError::from(format!(
211            "Could not find task '{task_id}' while resolving log codec for '{msg_type}'."
212        ))
213    })?;
214
215    let codec_id = node
216        .get_logging()
217        .and_then(|logging| logging.codec_for_msg_type(msg_type));
218    let Some(codec_id) = codec_id else {
219        return Ok(None);
220    };
221
222    config
223        .find_logging_codec_spec(codec_id)
224        .map(Some)
225        .ok_or_else(|| {
226            CuError::from(format!(
227                "Task '{task_id}' binds output '{msg_type}' to unknown logging codec '{codec_id}'."
228            ))
229        })
230}
231
232pub fn instantiate_codec<C, P>(
233    effective_config_ron: &str,
234    mission_id: Option<&str>,
235    task_id: &str,
236    msg_type: &str,
237    expected_type_path: &str,
238) -> CuResult<C>
239where
240    C: CuLogCodec<P>,
241    P: CuMsgPayload,
242{
243    let config = CuConfig::deserialize_ron(effective_config_ron)?;
244    let spec = resolve_task_output_codec(&config, mission_id, task_id, msg_type)?.ok_or_else(
245        || {
246            CuError::from(format!(
247                "Task '{task_id}' output '{msg_type}' has no configured logging codec in the effective config."
248            ))
249        },
250    )?;
251
252    if spec.type_ != expected_type_path {
253        return Err(CuError::from(format!(
254            "Task '{task_id}' output '{msg_type}' resolved logging codec type '{}' but '{}' was compiled for this slot.",
255            spec.type_, expected_type_path
256        )));
257    }
258
259    let codec_config = deserialize_codec_config::<C, P>(spec.config.as_ref())?;
260    C::new(codec_config)
261}
262
263pub fn deserialize_codec_config<C, P>(config: Option<&ComponentConfig>) -> CuResult<C::Config>
264where
265    C: CuLogCodec<P>,
266    P: CuMsgPayload,
267{
268    match config {
269        Some(config) => config.deserialize_into::<C::Config>().map_err(|err| {
270            CuError::from(format!(
271                "Failed to deserialize logging codec config for payload '{}': {err}",
272                core::any::type_name::<P>()
273            ))
274        }),
275        None => Ok(C::Config::default()),
276    }
277}
278
279pub fn encode_msg_with_codec<T, C, E>(
280    msg: &CuMsg<T>,
281    codec: &mut C,
282    encoder: &mut E,
283) -> Result<(), EncodeError>
284where
285    T: CuMsgPayload,
286    C: CuLogCodec<T>,
287    E: Encoder,
288{
289    match msg.payload() {
290        None => {
291            0u8.encode(encoder)?;
292        }
293        Some(payload) => {
294            1u8.encode(encoder)?;
295            let encoded_start = observed_encode_bytes();
296            let handle_start = crate::monitoring::current_payload_handle_bytes();
297            codec.encode_payload(payload, encoder)?;
298            let encoded_bytes = observed_encode_bytes().saturating_sub(encoded_start);
299            let handle_bytes =
300                crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
301            crate::monitoring::record_current_slot_payload_io_stats(
302                core::mem::size_of::<T>(),
303                encoded_bytes,
304                handle_bytes,
305            );
306        }
307    }
308    msg.tov.encode(encoder)?;
309    msg.metadata.encode(encoder)?;
310    Ok(())
311}
312
313pub fn decode_msg_with_codec<T, C, D>(
314    decoder: &mut D,
315    codec: &mut C,
316) -> Result<CuMsg<T>, DecodeError>
317where
318    T: CuMsgPayload,
319    C: CuLogCodec<T>,
320    D: Decoder<Context = ()>,
321{
322    let present: u8 = Decode::decode(decoder)?;
323    let payload = match present {
324        0 => None,
325        1 => Some(codec.decode_payload(decoder)?),
326        value => {
327            return Err(DecodeError::OtherString(format!(
328                "Invalid CuMsg presence tag {value} for payload '{}'",
329                core::any::type_name::<T>()
330            )));
331        }
332    };
333    let tov: Tov = Decode::decode(decoder)?;
334    let metadata: CuMsgMetadata = Decode::decode(decoder)?;
335    Ok(CuMsg::from_parts(payload, tov, metadata))
336}
337
338#[cfg(feature = "std")]
339fn read_next_entry<T: Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
340    match decode_from_std_read::<T, _, _>(src, standard()) {
341        Ok(entry) => Ok(Some(entry)),
342        Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
343        Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
344            Ok(None)
345        }
346        Err(err) => Err(CuError::new_with_cause(
347            "Failed to decode runtime lifecycle entry while loading effective log config",
348            err,
349        )),
350    }
351}
352
353#[cfg(feature = "std")]
354pub fn read_effective_config_ron_from_log(log_base: &Path) -> CuResult<Option<String>> {
355    let logger = UnifiedLoggerBuilder::new()
356        .file_base_name(log_base)
357        .build()
358        .map_err(|err| {
359            CuError::new_with_cause(
360                &format!(
361                    "Failed to open Copper log '{}' while loading effective log config",
362                    log_base.display()
363                ),
364                err,
365            )
366        })?;
367    let UnifiedLogger::Read(read_logger) = logger else {
368        return Err(CuError::from(
369            "Expected readable unified logger while loading effective log config",
370        ));
371    };
372
373    let mut reader =
374        UnifiedLoggerIOReader::new(read_logger, cu29_traits::UnifiedLogType::RuntimeLifecycle);
375    while let Some(record) = read_next_entry::<RuntimeLifecycleRecord>(&mut reader)? {
376        if let RuntimeLifecycleEvent::Instantiated {
377            effective_config_ron,
378            ..
379        } = record.event
380        {
381            return Ok(Some(effective_config_ron));
382        }
383    }
384
385    Ok(None)
386}
387
388#[cfg(feature = "std")]
389pub fn seed_effective_config_from_log<T: 'static>(log_base: &Path) -> CuResult<Option<String>> {
390    let effective_config_ron = read_effective_config_ron_from_log(log_base)?;
391    if let Some(ref ron) = effective_config_ron {
392        set_effective_config_ron::<T>(ron);
393    }
394    Ok(effective_config_ron)
395}