Skip to main content

cu29_export/
logstats.rs

1use crate::copperlists_reader;
2use cu29::clock::{CuDuration, OptionCuTime};
3use cu29::config::{CuConfig, CuGraph, Flavor};
4use cu29::curuntime::{CuExecutionLoop, CuExecutionUnit, compute_runtime_plan};
5use cu29::monitoring::CuDurationStatistics;
6use cu29::prelude::{CopperListTuple, CuMsgMetadataTrait, CuPayloadRawBytes};
7use cu29::{CuError, CuResult};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::fs::File;
11use std::io::Read;
12use std::path::Path;
13
/// Version number stored in the `schema_version` field of every `LogStats` produced here.
const LOGSTATS_SCHEMA_VERSION: u32 = 1;
/// Value, in nanoseconds, wrapped in a `CuDuration` and passed to `CuDurationStatistics::new`
/// when the latency accumulator is created.
// NOTE(review): presumably the largest latency the statistics are sized for — confirm against
// the `CuDurationStatistics` documentation.
const MAX_LATENCY_NS: u64 = 10 * 1_000_000_000;
16
17#[derive(Debug, Serialize, Deserialize)]
18pub struct LogStats {
19    pub schema_version: u32,
20    pub config_signature: String,
21    pub mission: Option<String>,
22    pub edges: Vec<EdgeLogStats>,
23    pub perf: PerfStats,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct EdgeLogStats {
28    pub src: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub src_channel: Option<String>,
31    pub dst: String,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub dst_channel: Option<String>,
34    pub msg: String,
35    pub samples: u64,
36    pub none_samples: u64,
37    pub valid_time_samples: u64,
38    pub total_raw_bytes: u64,
39    pub avg_raw_bytes: Option<f64>,
40    pub rate_hz: Option<f64>,
41    pub throughput_bytes_per_sec: Option<f64>,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
45pub struct PerfStats {
46    pub samples: u64,
47    pub valid_time_samples: u64,
48    pub end_to_end: DurationStats,
49    pub jitter: DurationStats,
50}
51
52#[derive(Debug, Default, Serialize, Deserialize)]
53pub struct DurationStats {
54    pub min_ns: Option<u64>,
55    pub max_ns: Option<u64>,
56    pub mean_ns: Option<f64>,
57    pub stddev_ns: Option<f64>,
58}
59
/// Identity of one graph edge, usable as a hash-map key.
///
/// Two keys are equal only when source, destination, both optional channels and the message
/// type all match.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct EdgeKey {
    /// Source endpoint.
    src: String,
    /// Optional channel on the source side.
    src_channel: Option<String>,
    /// Destination endpoint.
    dst: String,
    /// Optional channel on the destination side.
    dst_channel: Option<String>,
    /// Message type carried by the edge.
    msg: String,
}
68
/// Hash-map key pairing a source endpoint with a message type.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct SrcMsgKey {
    /// Source endpoint.
    src: String,
    /// Message type emitted by that source.
    msg: String,
}
74
75#[derive(Clone, Debug)]
76struct OutputSlot {
77    edges: Vec<EdgeKey>,
78}
79
/// Running totals for one edge.
///
/// The `Default` value is the empty accumulator: all counters zero, no timestamps seen.
#[derive(Debug, Default, Clone)]
struct EdgeAccumulator {
    /// Number of samples recorded.
    samples: u64,
    /// Number of samples recorded without a payload size.
    none_samples: u64,
    /// Number of samples recorded with an end timestamp.
    valid_time_samples: u64,
    /// Sum of all payload sizes seen.
    total_raw_bytes: u64,
    /// Smallest end timestamp seen, in nanoseconds.
    min_end_ns: Option<u64>,
    /// Largest end timestamp seen, in nanoseconds.
    max_end_ns: Option<u64>,
}
89
90impl EdgeAccumulator {
91    fn record_sample(&mut self, payload_bytes: Option<u64>, end_time_ns: Option<u64>) {
92        self.samples = self.samples.saturating_add(1);
93        if let Some(bytes) = payload_bytes {
94            self.total_raw_bytes = self.total_raw_bytes.saturating_add(bytes);
95        } else {
96            self.none_samples = self.none_samples.saturating_add(1);
97        }
98
99        if let Some(end_ns) = end_time_ns {
100            self.valid_time_samples = self.valid_time_samples.saturating_add(1);
101            self.min_end_ns = Some(self.min_end_ns.map_or(end_ns, |min| min.min(end_ns)));
102            self.max_end_ns = Some(self.max_end_ns.map_or(end_ns, |max| max.max(end_ns)));
103        }
104    }
105
106    fn finalize(self, key: EdgeKey) -> EdgeLogStats {
107        let payload_samples = self.samples.saturating_sub(self.none_samples);
108        let avg_raw_bytes = if payload_samples > 0 {
109            Some(self.total_raw_bytes as f64 / payload_samples as f64)
110        } else {
111            None
112        };
113
114        let (rate_hz, throughput_bytes_per_sec) = if self.valid_time_samples >= 2 {
115            match (self.min_end_ns, self.max_end_ns) {
116                (Some(min_ns), Some(max_ns)) if max_ns > min_ns => {
117                    let duration_ns = max_ns - min_ns;
118                    let duration_secs = duration_ns as f64 / 1_000_000_000.0;
119                    let intervals = (self.valid_time_samples - 1) as f64;
120                    (
121                        Some(intervals / duration_secs),
122                        Some(self.total_raw_bytes as f64 / duration_secs),
123                    )
124                }
125                _ => (None, None),
126            }
127        } else {
128            (None, None)
129        };
130
131        EdgeLogStats {
132            src: key.src,
133            src_channel: key.src_channel,
134            dst: key.dst,
135            dst_channel: key.dst_channel,
136            msg: key.msg,
137            samples: self.samples,
138            none_samples: self.none_samples,
139            valid_time_samples: self.valid_time_samples,
140            total_raw_bytes: self.total_raw_bytes,
141            avg_raw_bytes,
142            rate_hz,
143            throughput_bytes_per_sec,
144        }
145    }
146}
147
148#[derive(Debug)]
149struct PerfAccumulator {
150    stats: CuDurationStatistics,
151    samples: u64,
152    valid_time_samples: u64,
153}
154
155impl PerfAccumulator {
156    fn new() -> Self {
157        Self {
158            stats: CuDurationStatistics::new(CuDuration(MAX_LATENCY_NS)),
159            samples: 0,
160            valid_time_samples: 0,
161        }
162    }
163
164    fn record_sample(&mut self, latency: Option<CuDuration>) {
165        self.samples = self.samples.saturating_add(1);
166        if let Some(latency) = latency {
167            self.stats.record(latency);
168            self.valid_time_samples = self.valid_time_samples.saturating_add(1);
169        }
170    }
171
172    fn finalize(&self) -> PerfStats {
173        let end_to_end = duration_stats_from(&self.stats);
174        let jitter = jitter_stats_from(&self.stats);
175
176        PerfStats {
177            samples: self.samples,
178            valid_time_samples: self.valid_time_samples,
179            end_to_end,
180            jitter,
181        }
182    }
183}
184
185pub fn compute_logstats<P>(
186    mut reader: impl Read,
187    config: &CuConfig,
188    mission: Option<&str>,
189) -> CuResult<LogStats>
190where
191    P: CopperListTuple + CuPayloadRawBytes,
192{
193    let graph = config.get_graph(mission)?;
194    let signature = build_graph_signature(graph, mission);
195    let output_slots = build_output_slots(graph)?;
196    let mut edge_accumulators = build_edge_accumulators(graph);
197    let mut perf = PerfAccumulator::new();
198    let mut warned_lengths = false;
199
200    for culist in copperlists_reader::<P>(&mut reader) {
201        let payload_sizes = culist.msgs.payload_raw_bytes();
202        let cumsgs = culist.msgs.cumsgs();
203
204        let payload_len = payload_sizes.len();
205        let msg_len = cumsgs.len();
206        let slot_len = output_slots.len();
207        if !warned_lengths && (payload_len != msg_len || payload_len != slot_len) {
208            eprintln!(
209                "Warning: output mapping length mismatch (sizes={}, msgs={}, slots={})",
210                payload_len, msg_len, slot_len
211            );
212            warned_lengths = true;
213        }
214
215        let count = payload_len.min(msg_len).min(slot_len);
216
217        for idx in 0..count {
218            let slot = &output_slots[idx];
219            if slot.edges.is_empty() {
220                continue;
221            }
222            let payload_bytes = payload_sizes[idx];
223            let end_time_ns = extract_end_time_ns(cumsgs[idx].metadata());
224            for edge in &slot.edges {
225                if let Some(acc) = edge_accumulators.get_mut(edge) {
226                    acc.record_sample(payload_bytes, end_time_ns);
227                }
228            }
229        }
230
231        perf.record_sample(compute_end_to_end_latency(&cumsgs));
232    }
233
234    let edges = edge_accumulators
235        .into_iter()
236        .map(|(key, acc)| acc.finalize(key))
237        .collect();
238
239    Ok(LogStats {
240        schema_version: LOGSTATS_SCHEMA_VERSION,
241        config_signature: signature,
242        mission: mission.map(|value| value.to_string()),
243        edges,
244        perf: perf.finalize(),
245    })
246}
247
248pub fn write_logstats(stats: &LogStats, path: &Path) -> CuResult<()> {
249    let file = File::create(path)
250        .map_err(|e| CuError::new_with_cause("Failed to create logstats output", e))?;
251    serde_json::to_writer_pretty(file, stats)
252        .map_err(|e| CuError::new_with_cause("Failed to serialize logstats", e))?;
253    Ok(())
254}
255
256fn build_output_slots(graph: &CuGraph) -> CuResult<Vec<OutputSlot>> {
257    let packs = collect_output_packs(graph)?;
258    let edges_by_src = build_edges_by_src_msg(graph);
259    let total_msgs: usize = packs.iter().map(|pack| pack.msg_types.len()).sum();
260    let mut slots = Vec::with_capacity(total_msgs);
261
262    for pack in packs {
263        for msg in pack.msg_types {
264            let edges = edges_by_src
265                .get(&SrcMsgKey {
266                    src: pack.src.clone(),
267                    msg: msg.clone(),
268                })
269                .cloned()
270                .unwrap_or_default();
271            slots.push(OutputSlot { edges });
272        }
273    }
274
275    Ok(slots)
276}
277
278fn build_edge_accumulators(graph: &CuGraph) -> HashMap<EdgeKey, EdgeAccumulator> {
279    let mut acc = HashMap::new();
280    for cnx in graph.edges() {
281        let key = EdgeKey {
282            src: cnx.src.clone(),
283            src_channel: cnx.src_channel.clone(),
284            dst: cnx.dst.clone(),
285            dst_channel: cnx.dst_channel.clone(),
286            msg: cnx.msg.clone(),
287        };
288        acc.entry(key).or_default();
289    }
290    acc
291}
292
293fn build_edges_by_src_msg(graph: &CuGraph) -> HashMap<SrcMsgKey, Vec<EdgeKey>> {
294    let mut map: HashMap<SrcMsgKey, Vec<EdgeKey>> = HashMap::new();
295    for cnx in graph.edges() {
296        let key = SrcMsgKey {
297            src: cnx.src.clone(),
298            msg: cnx.msg.clone(),
299        };
300        let edge = EdgeKey {
301            src: cnx.src.clone(),
302            src_channel: cnx.src_channel.clone(),
303            dst: cnx.dst.clone(),
304            dst_channel: cnx.dst_channel.clone(),
305            msg: cnx.msg.clone(),
306        };
307        map.entry(key).or_default().push(edge);
308    }
309    map
310}
311
/// Description of one output pack found in the runtime plan.
#[derive(Debug)]
struct OutputPackInfo {
    /// The pack's copper-list index; packs are ordered by this value.
    culist_index: u32,
    /// Id of the node that produces the pack.
    src: String,
    /// Message types the pack contains, in order.
    msg_types: Vec<String>,
}
318
319fn collect_output_packs(graph: &CuGraph) -> CuResult<Vec<OutputPackInfo>> {
320    let plan = compute_runtime_plan(graph)?;
321    let mut packs = Vec::new();
322    collect_output_packs_from_loop(&plan, graph, &mut packs)?;
323    packs.sort_by_key(|pack| pack.culist_index);
324    Ok(packs)
325}
326
327fn collect_output_packs_from_loop(
328    loop_unit: &CuExecutionLoop,
329    graph: &CuGraph,
330    packs: &mut Vec<OutputPackInfo>,
331) -> CuResult<()> {
332    for step in &loop_unit.steps {
333        match step {
334            CuExecutionUnit::Step(step) => {
335                if let Some(output_pack) = &step.output_msg_pack {
336                    let node = graph
337                        .get_node(step.node_id)
338                        .ok_or_else(|| CuError::from("Missing node for output pack"))?;
339                    packs.push(OutputPackInfo {
340                        culist_index: output_pack.culist_index,
341                        src: node.get_id(),
342                        msg_types: output_pack.msg_types.clone(),
343                    });
344                }
345            }
346            CuExecutionUnit::Loop(inner) => {
347                collect_output_packs_from_loop(inner, graph, packs)?;
348            }
349        }
350    }
351    Ok(())
352}
353
354fn compute_end_to_end_latency(
355    msgs: &[&dyn cu29::prelude::ErasedCuStampedData],
356) -> Option<CuDuration> {
357    let start = msgs
358        .first()
359        .and_then(|msg| extract_start_time_ns(msg.metadata()))?;
360    let end = msgs
361        .last()
362        .and_then(|msg| extract_end_time_ns(msg.metadata()))?;
363    end.checked_sub(start).map(CuDuration::from_nanos)
364}
365
366fn extract_start_time_ns(meta: &dyn CuMsgMetadataTrait) -> Option<u64> {
367    option_time_ns(meta.process_time().start)
368}
369
370fn extract_end_time_ns(meta: &dyn CuMsgMetadataTrait) -> Option<u64> {
371    option_time_ns(meta.process_time().end)
372}
373
374fn option_time_ns(value: OptionCuTime) -> Option<u64> {
375    Option::<CuDuration>::from(value).map(|t| t.as_nanos())
376}
377
378fn duration_stats_from(stats: &CuDurationStatistics) -> DurationStats {
379    if stats.is_empty() {
380        return DurationStats::default();
381    }
382    DurationStats {
383        min_ns: Some(stats.min().as_nanos()),
384        max_ns: Some(stats.max().as_nanos()),
385        mean_ns: Some(stats.mean().as_nanos() as f64),
386        stddev_ns: Some(stats.stddev().as_nanos() as f64),
387    }
388}
389
390fn jitter_stats_from(stats: &CuDurationStatistics) -> DurationStats {
391    if stats.len() < 2 {
392        return DurationStats::default();
393    }
394    DurationStats {
395        min_ns: Some(stats.jitter_min().as_nanos()),
396        max_ns: Some(stats.jitter_max().as_nanos()),
397        mean_ns: Some(stats.jitter_mean().as_nanos() as f64),
398        stddev_ns: Some(stats.jitter_stddev().as_nanos() as f64),
399    }
400}
401
402fn build_graph_signature(graph: &CuGraph, mission: Option<&str>) -> String {
403    let mut parts = Vec::new();
404    parts.push(format!("mission={}", mission.unwrap_or("default")));
405
406    let mut nodes: Vec<_> = graph.get_all_nodes();
407    nodes.sort_by(|a, b| a.1.get_id().cmp(&b.1.get_id()));
408    for (_, node) in nodes {
409        parts.push(format!(
410            "node|{}|{}|{}",
411            node.get_id(),
412            node.get_type(),
413            flavor_label(node.get_flavor())
414        ));
415    }
416
417    let mut edges: Vec<String> = graph
418        .edges()
419        .map(|cnx| {
420            format!(
421                "edge|{}|{}|{}",
422                format_endpoint(cnx.src.as_str(), cnx.src_channel.as_deref()),
423                format_endpoint(cnx.dst.as_str(), cnx.dst_channel.as_deref()),
424                cnx.msg
425            )
426        })
427        .collect();
428    edges.sort();
429    parts.extend(edges);
430
431    let joined = parts.join("\n");
432    format!("fnv1a64:{:016x}", fnv1a64(joined.as_bytes()))
433}
434
435fn flavor_label(flavor: Flavor) -> &'static str {
436    match flavor {
437        Flavor::Task => "task",
438        Flavor::Bridge => "bridge",
439    }
440}
441
/// Renders an endpoint as `node/channel`, or just `node` when there is no channel.
fn format_endpoint(node: &str, channel: Option<&str>) -> String {
    channel.map_or_else(|| node.to_string(), |ch| format!("{node}/{ch}"))
}
448
/// Computes the 64-bit FNV-1a hash of `data`.
///
/// Each byte is XORed into the state, which is then multiplied by the FNV prime with wrapping
/// arithmetic. Empty input returns the offset basis unchanged.
fn fnv1a64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    data.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing floating-point statistics.
    const TOLERANCE: f64 = 1e-6;

    /// Returns a fixed channel-less edge key for accumulator tests.
    fn edge_key() -> EdgeKey {
        EdgeKey {
            src: String::from("src"),
            src_channel: None,
            dst: String::from("dst"),
            dst_channel: None,
            msg: String::from("Msg"),
        }
    }

    /// Unwraps `actual` and checks that it lies within `TOLERANCE` of `expected`.
    fn is_close(actual: Option<f64>, expected: f64) -> bool {
        (actual.unwrap() - expected).abs() < TOLERANCE
    }

    #[test]
    fn edge_stats_average_and_rate() {
        // Two samples one second apart: 400 bytes total, one interval.
        let mut acc = EdgeAccumulator::default();
        for (bytes, end_ns) in [(100, 1_000_000_000), (300, 2_000_000_000)] {
            acc.record_sample(Some(bytes), Some(end_ns));
        }
        let stats = acc.finalize(edge_key());

        assert_eq!(stats.samples, 2);
        assert_eq!(stats.none_samples, 0);
        assert_eq!(stats.total_raw_bytes, 400);
        assert!(is_close(stats.avg_raw_bytes, 200.0));
        assert!(is_close(stats.rate_hz, 1.0));
        assert!(is_close(stats.throughput_bytes_per_sec, 400.0));
    }

    #[test]
    fn edge_stats_handles_missing_times() {
        // A sample without a timestamp still counts, but yields no rate or throughput.
        let mut acc = EdgeAccumulator::default();
        acc.record_sample(Some(64), None);
        let stats = acc.finalize(edge_key());

        assert_eq!(stats.samples, 1);
        assert_eq!(stats.valid_time_samples, 0);
        assert!(stats.rate_hz.is_none());
        assert!(stats.throughput_bytes_per_sec.is_none());
    }

    #[test]
    fn perf_stats_skip_missing_latency() {
        // Only the sample that carries a latency reaches the statistics.
        let mut perf = PerfAccumulator::new();
        perf.record_sample(Some(CuDuration::from_nanos(1_000)));
        perf.record_sample(None);
        let stats = perf.finalize();

        assert_eq!(stats.samples, 2);
        assert_eq!(stats.valid_time_samples, 1);
        assert_eq!(stats.end_to_end.min_ns, Some(1_000));
        assert_eq!(stats.end_to_end.max_ns, Some(1_000));
        assert_eq!(stats.jitter.min_ns, None);
    }
}