Skip to main content

cu29_runtime/
logcodec.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::config::{ComponentConfig, CuConfig, LoggingCodecSpec};
5use crate::cutask::{CuMsg, CuMsgMetadata, CuMsgPayload};
6use crate::sync_compat::{Mutex, OnceLock, lock as lock_mutex, once_get_or_init};
7use alloc::boxed::Box;
8use alloc::format;
9use alloc::string::{String, ToString};
10#[cfg(feature = "std")]
11use bincode::config::standard;
12use bincode::de::{Decode, Decoder};
13#[cfg(feature = "std")]
14use bincode::decode_from_std_read;
15use bincode::enc::{Encode, Encoder};
16use bincode::error::{DecodeError, EncodeError};
17use core::any::TypeId;
18use cu29_clock::Tov;
19use cu29_traits::{CuError, CuResult, observed_encode_bytes};
20use hashbrown::HashMap;
21use portable_atomic::{AtomicU64, Ordering};
22use serde::de::DeserializeOwned;
23#[cfg(feature = "std")]
24use std::io::Read;
25#[cfg(feature = "std")]
26use std::path::Path;
27
28#[cfg(feature = "std")]
29use crate::curuntime::{RuntimeLifecycleEvent, RuntimeLifecycleRecord};
30#[cfg(feature = "std")]
31use cu29_unifiedlog::{UnifiedLogger, UnifiedLoggerBuilder, UnifiedLoggerIOReader};
32
33pub trait CuLogCodec<P: CuMsgPayload>: 'static {
34    type Config: DeserializeOwned + Default;
35
36    fn new(config: Self::Config) -> CuResult<Self>
37    where
38        Self: Sized;
39
40    /// Returns handle-backed source bytes read directly by the codec.
41    ///
42    /// This reports only extra handle-backed residency beyond the payload's
43    /// fixed `size_of::<P>()` footprint already accounted by runtime
44    /// monitoring. Codecs must implement this explicitly so they opt into the
45    /// correct accounting model for their payload type.
46    fn source_payload_handle_bytes(&self, payload: &P) -> usize;
47
48    fn encode_payload<E: Encoder>(
49        &mut self,
50        payload: &P,
51        encoder: &mut E,
52    ) -> Result<(), EncodeError>;
53
54    fn decode_payload<D: Decoder<Context = ()>>(
55        &mut self,
56        decoder: &mut D,
57    ) -> Result<P, DecodeError>;
58}
59
60pub struct CodecState<C> {
61    inner: Mutex<Option<(u64, C)>>,
62}
63
64impl<C> CodecState<C> {
65    pub const fn new() -> Self {
66        Self {
67            inner: Mutex::new(None),
68        }
69    }
70}
71
72impl<C> Default for CodecState<C> {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78pub struct EffectiveConfigEntry {
79    version: AtomicU64,
80    ron: Mutex<String>,
81}
82
83impl EffectiveConfigEntry {
84    fn new(ron: &str) -> Self {
85        Self {
86            version: AtomicU64::new(1),
87            ron: Mutex::new(ron.to_string()),
88        }
89    }
90
91    pub fn version(&self) -> u64 {
92        self.version.load(Ordering::Acquire)
93    }
94
95    pub fn ron(&self) -> String {
96        lock_mutex(&self.ron).clone()
97    }
98
99    fn set(&self, ron: &str) {
100        *lock_mutex(&self.ron) = ron.to_string();
101        self.version.fetch_add(1, Ordering::AcqRel);
102    }
103}
104
105type EffectiveConfigRegistry = HashMap<TypeId, &'static EffectiveConfigEntry>;
106
107static EFFECTIVE_CONFIGS: OnceLock<Mutex<EffectiveConfigRegistry>> = OnceLock::new();
108
109fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
110    once_get_or_init(&EFFECTIVE_CONFIGS, || Mutex::new(HashMap::new()))
111}
112
113pub fn effective_config_entry<T: 'static>(default_ron: &str) -> &'static EffectiveConfigEntry {
114    let registry = effective_config_registry();
115    let mut registry = lock_mutex(registry);
116    if let Some(entry) = registry.get(&TypeId::of::<T>()) {
117        return entry;
118    }
119
120    let entry = Box::leak(Box::new(EffectiveConfigEntry::new(default_ron)));
121    registry.insert(TypeId::of::<T>(), entry);
122    entry
123}
124
125pub fn set_effective_config_ron<T: 'static>(ron: &str) {
126    effective_config_entry::<T>(ron).set(ron);
127}
128
129pub fn with_codec_for_encode<C, R, B, F>(
130    state: &'static CodecState<C>,
131    config_entry: &EffectiveConfigEntry,
132    build: B,
133    f: F,
134) -> Result<R, EncodeError>
135where
136    B: FnOnce(&str) -> CuResult<C>,
137    F: FnOnce(&mut C) -> Result<R, EncodeError>,
138{
139    let version = config_entry.version();
140    let mut guard = lock_mutex(&state.inner);
141    if guard
142        .as_ref()
143        .is_none_or(|(cached_version, _)| *cached_version != version)
144    {
145        let effective_config_ron = config_entry.ron();
146        let codec = build(&effective_config_ron)
147            .map_err(|err| EncodeError::OtherString(err.to_string()))?;
148        *guard = Some((version, codec));
149    }
150    let (_, codec) = guard
151        .as_mut()
152        .expect("codec state must be initialized after build");
153    f(codec)
154}
155
156pub fn with_codec_for_decode<C, R, B, F>(
157    state: &'static CodecState<C>,
158    config_entry: &EffectiveConfigEntry,
159    build: B,
160    f: F,
161) -> Result<R, DecodeError>
162where
163    B: FnOnce(&str) -> CuResult<C>,
164    F: FnOnce(&mut C) -> Result<R, DecodeError>,
165{
166    let version = config_entry.version();
167    let mut guard = lock_mutex(&state.inner);
168    if guard
169        .as_ref()
170        .is_none_or(|(cached_version, _)| *cached_version != version)
171    {
172        let effective_config_ron = config_entry.ron();
173        let codec = build(&effective_config_ron)
174            .map_err(|err| DecodeError::OtherString(err.to_string()))?;
175        *guard = Some((version, codec));
176    }
177    let (_, codec) = guard
178        .as_mut()
179        .expect("codec state must be initialized after build");
180    f(codec)
181}
182
183pub fn resolve_task_output_codec<'a>(
184    config: &'a CuConfig,
185    mission_id: Option<&str>,
186    task_id: &str,
187    msg_type: &str,
188) -> CuResult<Option<&'a LoggingCodecSpec>> {
189    let node = config.find_task_node(mission_id, task_id).ok_or_else(|| {
190        CuError::from(format!(
191            "Could not find task '{task_id}' while resolving log codec for '{msg_type}'."
192        ))
193    })?;
194
195    let codec_id = node
196        .get_logging()
197        .and_then(|logging| logging.codec_for_msg_type(msg_type));
198    let Some(codec_id) = codec_id else {
199        return Ok(None);
200    };
201
202    config
203        .find_logging_codec_spec(codec_id)
204        .map(Some)
205        .ok_or_else(|| {
206            CuError::from(format!(
207                "Task '{task_id}' binds output '{msg_type}' to unknown logging codec '{codec_id}'."
208            ))
209        })
210}
211
212pub fn instantiate_codec<C, P>(
213    effective_config_ron: &str,
214    mission_id: Option<&str>,
215    task_id: &str,
216    msg_type: &str,
217    expected_type_path: &str,
218) -> CuResult<C>
219where
220    C: CuLogCodec<P>,
221    P: CuMsgPayload,
222{
223    let config = CuConfig::deserialize_ron(effective_config_ron)?;
224    let spec = resolve_task_output_codec(&config, mission_id, task_id, msg_type)?.ok_or_else(
225        || {
226            CuError::from(format!(
227                "Task '{task_id}' output '{msg_type}' has no configured logging codec in the effective config."
228            ))
229        },
230    )?;
231
232    if spec.type_ != expected_type_path {
233        return Err(CuError::from(format!(
234            "Task '{task_id}' output '{msg_type}' resolved logging codec type '{}' but '{}' was compiled for this slot.",
235            spec.type_, expected_type_path
236        )));
237    }
238
239    let codec_config = deserialize_codec_config::<C, P>(spec.config.as_ref())?;
240    C::new(codec_config)
241}
242
243pub fn deserialize_codec_config<C, P>(config: Option<&ComponentConfig>) -> CuResult<C::Config>
244where
245    C: CuLogCodec<P>,
246    P: CuMsgPayload,
247{
248    match config {
249        Some(config) => config.deserialize_into::<C::Config>().map_err(|err| {
250            CuError::from(format!(
251                "Failed to deserialize logging codec config for payload '{}': {err}",
252                core::any::type_name::<P>()
253            ))
254        }),
255        None => Ok(C::Config::default()),
256    }
257}
258
259pub fn encode_msg_with_codec<T, C, E>(
260    msg: &CuMsg<T>,
261    codec: &mut C,
262    encoder: &mut E,
263) -> Result<(), EncodeError>
264where
265    T: CuMsgPayload,
266    C: CuLogCodec<T>,
267    E: Encoder,
268{
269    match msg.payload() {
270        None => {
271            0u8.encode(encoder)?;
272        }
273        Some(payload) => {
274            1u8.encode(encoder)?;
275            let encoded_start = observed_encode_bytes();
276            let handle_start = crate::monitoring::current_payload_handle_bytes();
277            let source_handle_bytes = codec.source_payload_handle_bytes(payload);
278            if source_handle_bytes > 0 {
279                crate::monitoring::record_payload_handle_bytes(source_handle_bytes);
280            }
281            codec.encode_payload(payload, encoder)?;
282            let encoded_bytes = observed_encode_bytes().saturating_sub(encoded_start);
283            let handle_bytes =
284                crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
285            crate::monitoring::record_current_slot_payload_io_stats(
286                core::mem::size_of::<T>(),
287                encoded_bytes,
288                handle_bytes,
289            );
290        }
291    }
292    msg.tov.encode(encoder)?;
293    msg.metadata.encode(encoder)?;
294    Ok(())
295}
296
297pub fn decode_msg_with_codec<T, C, D>(
298    decoder: &mut D,
299    codec: &mut C,
300) -> Result<CuMsg<T>, DecodeError>
301where
302    T: CuMsgPayload,
303    C: CuLogCodec<T>,
304    D: Decoder<Context = ()>,
305{
306    let present: u8 = Decode::decode(decoder)?;
307    let payload = match present {
308        0 => None,
309        1 => Some(codec.decode_payload(decoder)?),
310        value => {
311            return Err(DecodeError::OtherString(format!(
312                "Invalid CuMsg presence tag {value} for payload '{}'",
313                core::any::type_name::<T>()
314            )));
315        }
316    };
317    let tov: Tov = Decode::decode(decoder)?;
318    let metadata: CuMsgMetadata = Decode::decode(decoder)?;
319    Ok(CuMsg::from_parts(payload, tov, metadata))
320}
321
322#[cfg(feature = "std")]
323fn read_next_entry<T: Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
324    match decode_from_std_read::<T, _, _>(src, standard()) {
325        Ok(entry) => Ok(Some(entry)),
326        Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
327        Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
328            Ok(None)
329        }
330        Err(err) => Err(CuError::new_with_cause(
331            "Failed to decode runtime lifecycle entry while loading effective log config",
332            err,
333        )),
334    }
335}
336
337#[cfg(feature = "std")]
338pub fn read_effective_config_ron_from_log(log_base: &Path) -> CuResult<Option<String>> {
339    let logger = UnifiedLoggerBuilder::new()
340        .file_base_name(log_base)
341        .build()
342        .map_err(|err| {
343            CuError::new_with_cause(
344                &format!(
345                    "Failed to open Copper log '{}' while loading effective log config",
346                    log_base.display()
347                ),
348                err,
349            )
350        })?;
351    let UnifiedLogger::Read(read_logger) = logger else {
352        return Err(CuError::from(
353            "Expected readable unified logger while loading effective log config",
354        ));
355    };
356
357    let mut reader =
358        UnifiedLoggerIOReader::new(read_logger, cu29_traits::UnifiedLogType::RuntimeLifecycle);
359    while let Some(record) = read_next_entry::<RuntimeLifecycleRecord>(&mut reader)? {
360        if let RuntimeLifecycleEvent::Instantiated {
361            effective_config_ron,
362            ..
363        } = record.event
364        {
365            return Ok(Some(effective_config_ron));
366        }
367    }
368
369    Ok(None)
370}
371
372#[cfg(feature = "std")]
373pub fn seed_effective_config_from_log<T: 'static>(log_base: &Path) -> CuResult<Option<String>> {
374    let effective_config_ron = read_effective_config_ron_from_log(log_base)?;
375    if let Some(ref ron) = effective_config_ron {
376        set_effective_config_ron::<T>(ron);
377    }
378    Ok(effective_config_ron)
379}