1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::config::{ComponentConfig, CuConfig, LoggingCodecSpec};
5use crate::cutask::{CuMsg, CuMsgMetadata, CuMsgPayload};
6use crate::sync_compat::{Mutex, OnceLock, lock as lock_mutex, once_get_or_init};
7use alloc::boxed::Box;
8use alloc::format;
9use alloc::string::{String, ToString};
10#[cfg(feature = "std")]
11use bincode::config::standard;
12use bincode::de::{Decode, Decoder};
13#[cfg(feature = "std")]
14use bincode::decode_from_std_read;
15use bincode::enc::{Encode, Encoder};
16use bincode::error::{DecodeError, EncodeError};
17use core::any::TypeId;
18use cu29_clock::Tov;
19use cu29_traits::{CuError, CuResult, observed_encode_bytes};
20use hashbrown::HashMap;
21use portable_atomic::{AtomicU64, Ordering};
22use serde::de::DeserializeOwned;
23#[cfg(feature = "std")]
24use std::io::Read;
25#[cfg(feature = "std")]
26use std::path::Path;
27
28#[cfg(feature = "std")]
29use crate::curuntime::{RuntimeLifecycleEvent, RuntimeLifecycleRecord};
30#[cfg(feature = "std")]
31use cu29_unifiedlog::{UnifiedLogger, UnifiedLoggerBuilder, UnifiedLoggerIOReader};
32
33pub trait CuLogCodec<P: CuMsgPayload>: 'static {
34 type Config: DeserializeOwned + Default;
35
36 fn new(config: Self::Config) -> CuResult<Self>
37 where
38 Self: Sized;
39
40 fn source_payload_handle_bytes(&self, payload: &P) -> usize;
47
48 fn encode_payload<E: Encoder>(
49 &mut self,
50 payload: &P,
51 encoder: &mut E,
52 ) -> Result<(), EncodeError>;
53
54 fn decode_payload<D: Decoder<Context = ()>>(
55 &mut self,
56 decoder: &mut D,
57 ) -> Result<P, DecodeError>;
58}
59
60pub struct CodecState<C> {
61 inner: Mutex<Option<(u64, C)>>,
62}
63
64impl<C> CodecState<C> {
65 pub const fn new() -> Self {
66 Self {
67 inner: Mutex::new(None),
68 }
69 }
70}
71
72impl<C> Default for CodecState<C> {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78pub struct EffectiveConfigEntry {
79 version: AtomicU64,
80 ron: Mutex<String>,
81}
82
83impl EffectiveConfigEntry {
84 fn new(ron: &str) -> Self {
85 Self {
86 version: AtomicU64::new(1),
87 ron: Mutex::new(ron.to_string()),
88 }
89 }
90
91 pub fn version(&self) -> u64 {
92 self.version.load(Ordering::Acquire)
93 }
94
95 pub fn ron(&self) -> String {
96 lock_mutex(&self.ron).clone()
97 }
98
99 fn set(&self, ron: &str) {
100 *lock_mutex(&self.ron) = ron.to_string();
101 self.version.fetch_add(1, Ordering::AcqRel);
102 }
103}
104
105type EffectiveConfigRegistry = HashMap<TypeId, &'static EffectiveConfigEntry>;
106
107static EFFECTIVE_CONFIGS: OnceLock<Mutex<EffectiveConfigRegistry>> = OnceLock::new();
108
109fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
110 once_get_or_init(&EFFECTIVE_CONFIGS, || Mutex::new(HashMap::new()))
111}
112
113pub fn effective_config_entry<T: 'static>(default_ron: &str) -> &'static EffectiveConfigEntry {
114 let registry = effective_config_registry();
115 let mut registry = lock_mutex(registry);
116 if let Some(entry) = registry.get(&TypeId::of::<T>()) {
117 return entry;
118 }
119
120 let entry = Box::leak(Box::new(EffectiveConfigEntry::new(default_ron)));
121 registry.insert(TypeId::of::<T>(), entry);
122 entry
123}
124
125pub fn set_effective_config_ron<T: 'static>(ron: &str) {
126 effective_config_entry::<T>(ron).set(ron);
127}
128
129pub fn with_codec_for_encode<C, R, B, F>(
130 state: &'static CodecState<C>,
131 config_entry: &EffectiveConfigEntry,
132 build: B,
133 f: F,
134) -> Result<R, EncodeError>
135where
136 B: FnOnce(&str) -> CuResult<C>,
137 F: FnOnce(&mut C) -> Result<R, EncodeError>,
138{
139 let version = config_entry.version();
140 let mut guard = lock_mutex(&state.inner);
141 if guard
142 .as_ref()
143 .is_none_or(|(cached_version, _)| *cached_version != version)
144 {
145 let effective_config_ron = config_entry.ron();
146 let codec = build(&effective_config_ron)
147 .map_err(|err| EncodeError::OtherString(err.to_string()))?;
148 *guard = Some((version, codec));
149 }
150 let (_, codec) = guard
151 .as_mut()
152 .expect("codec state must be initialized after build");
153 f(codec)
154}
155
156pub fn with_codec_for_decode<C, R, B, F>(
157 state: &'static CodecState<C>,
158 config_entry: &EffectiveConfigEntry,
159 build: B,
160 f: F,
161) -> Result<R, DecodeError>
162where
163 B: FnOnce(&str) -> CuResult<C>,
164 F: FnOnce(&mut C) -> Result<R, DecodeError>,
165{
166 let version = config_entry.version();
167 let mut guard = lock_mutex(&state.inner);
168 if guard
169 .as_ref()
170 .is_none_or(|(cached_version, _)| *cached_version != version)
171 {
172 let effective_config_ron = config_entry.ron();
173 let codec = build(&effective_config_ron)
174 .map_err(|err| DecodeError::OtherString(err.to_string()))?;
175 *guard = Some((version, codec));
176 }
177 let (_, codec) = guard
178 .as_mut()
179 .expect("codec state must be initialized after build");
180 f(codec)
181}
182
183pub fn resolve_task_output_codec<'a>(
184 config: &'a CuConfig,
185 mission_id: Option<&str>,
186 task_id: &str,
187 msg_type: &str,
188) -> CuResult<Option<&'a LoggingCodecSpec>> {
189 let node = config.find_task_node(mission_id, task_id).ok_or_else(|| {
190 CuError::from(format!(
191 "Could not find task '{task_id}' while resolving log codec for '{msg_type}'."
192 ))
193 })?;
194
195 let codec_id = node
196 .get_logging()
197 .and_then(|logging| logging.codec_for_msg_type(msg_type));
198 let Some(codec_id) = codec_id else {
199 return Ok(None);
200 };
201
202 config
203 .find_logging_codec_spec(codec_id)
204 .map(Some)
205 .ok_or_else(|| {
206 CuError::from(format!(
207 "Task '{task_id}' binds output '{msg_type}' to unknown logging codec '{codec_id}'."
208 ))
209 })
210}
211
212pub fn instantiate_codec<C, P>(
213 effective_config_ron: &str,
214 mission_id: Option<&str>,
215 task_id: &str,
216 msg_type: &str,
217 expected_type_path: &str,
218) -> CuResult<C>
219where
220 C: CuLogCodec<P>,
221 P: CuMsgPayload,
222{
223 let config = CuConfig::deserialize_ron(effective_config_ron)?;
224 let spec = resolve_task_output_codec(&config, mission_id, task_id, msg_type)?.ok_or_else(
225 || {
226 CuError::from(format!(
227 "Task '{task_id}' output '{msg_type}' has no configured logging codec in the effective config."
228 ))
229 },
230 )?;
231
232 if spec.type_ != expected_type_path {
233 return Err(CuError::from(format!(
234 "Task '{task_id}' output '{msg_type}' resolved logging codec type '{}' but '{}' was compiled for this slot.",
235 spec.type_, expected_type_path
236 )));
237 }
238
239 let codec_config = deserialize_codec_config::<C, P>(spec.config.as_ref())?;
240 C::new(codec_config)
241}
242
243pub fn deserialize_codec_config<C, P>(config: Option<&ComponentConfig>) -> CuResult<C::Config>
244where
245 C: CuLogCodec<P>,
246 P: CuMsgPayload,
247{
248 match config {
249 Some(config) => config.deserialize_into::<C::Config>().map_err(|err| {
250 CuError::from(format!(
251 "Failed to deserialize logging codec config for payload '{}': {err}",
252 core::any::type_name::<P>()
253 ))
254 }),
255 None => Ok(C::Config::default()),
256 }
257}
258
259pub fn encode_msg_with_codec<T, C, E>(
260 msg: &CuMsg<T>,
261 codec: &mut C,
262 encoder: &mut E,
263) -> Result<(), EncodeError>
264where
265 T: CuMsgPayload,
266 C: CuLogCodec<T>,
267 E: Encoder,
268{
269 match msg.payload() {
270 None => {
271 0u8.encode(encoder)?;
272 }
273 Some(payload) => {
274 1u8.encode(encoder)?;
275 let encoded_start = observed_encode_bytes();
276 let handle_start = crate::monitoring::current_payload_handle_bytes();
277 let source_handle_bytes = codec.source_payload_handle_bytes(payload);
278 if source_handle_bytes > 0 {
279 crate::monitoring::record_payload_handle_bytes(source_handle_bytes);
280 }
281 codec.encode_payload(payload, encoder)?;
282 let encoded_bytes = observed_encode_bytes().saturating_sub(encoded_start);
283 let handle_bytes =
284 crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
285 crate::monitoring::record_current_slot_payload_io_stats(
286 core::mem::size_of::<T>(),
287 encoded_bytes,
288 handle_bytes,
289 );
290 }
291 }
292 msg.tov.encode(encoder)?;
293 msg.metadata.encode(encoder)?;
294 Ok(())
295}
296
297pub fn decode_msg_with_codec<T, C, D>(
298 decoder: &mut D,
299 codec: &mut C,
300) -> Result<CuMsg<T>, DecodeError>
301where
302 T: CuMsgPayload,
303 C: CuLogCodec<T>,
304 D: Decoder<Context = ()>,
305{
306 let present: u8 = Decode::decode(decoder)?;
307 let payload = match present {
308 0 => None,
309 1 => Some(codec.decode_payload(decoder)?),
310 value => {
311 return Err(DecodeError::OtherString(format!(
312 "Invalid CuMsg presence tag {value} for payload '{}'",
313 core::any::type_name::<T>()
314 )));
315 }
316 };
317 let tov: Tov = Decode::decode(decoder)?;
318 let metadata: CuMsgMetadata = Decode::decode(decoder)?;
319 Ok(CuMsg::from_parts(payload, tov, metadata))
320}
321
322#[cfg(feature = "std")]
323fn read_next_entry<T: Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
324 match decode_from_std_read::<T, _, _>(src, standard()) {
325 Ok(entry) => Ok(Some(entry)),
326 Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
327 Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
328 Ok(None)
329 }
330 Err(err) => Err(CuError::new_with_cause(
331 "Failed to decode runtime lifecycle entry while loading effective log config",
332 err,
333 )),
334 }
335}
336
337#[cfg(feature = "std")]
338pub fn read_effective_config_ron_from_log(log_base: &Path) -> CuResult<Option<String>> {
339 let logger = UnifiedLoggerBuilder::new()
340 .file_base_name(log_base)
341 .build()
342 .map_err(|err| {
343 CuError::new_with_cause(
344 &format!(
345 "Failed to open Copper log '{}' while loading effective log config",
346 log_base.display()
347 ),
348 err,
349 )
350 })?;
351 let UnifiedLogger::Read(read_logger) = logger else {
352 return Err(CuError::from(
353 "Expected readable unified logger while loading effective log config",
354 ));
355 };
356
357 let mut reader =
358 UnifiedLoggerIOReader::new(read_logger, cu29_traits::UnifiedLogType::RuntimeLifecycle);
359 while let Some(record) = read_next_entry::<RuntimeLifecycleRecord>(&mut reader)? {
360 if let RuntimeLifecycleEvent::Instantiated {
361 effective_config_ron,
362 ..
363 } = record.event
364 {
365 return Ok(Some(effective_config_ron));
366 }
367 }
368
369 Ok(None)
370}
371
372#[cfg(feature = "std")]
373pub fn seed_effective_config_from_log<T: 'static>(log_base: &Path) -> CuResult<Option<String>> {
374 let effective_config_ron = read_effective_config_ron_from_log(log_base)?;
375 if let Some(ref ron) = effective_config_ron {
376 set_effective_config_ron::<T>(ron);
377 }
378 Ok(effective_config_ron)
379}