1use crate::copperlists_reader;
2use cu29::clock::{CuDuration, OptionCuTime};
3use cu29::config::{CuConfig, CuGraph, Flavor};
4use cu29::curuntime::{CuExecutionLoop, CuExecutionUnit, compute_runtime_plan};
5use cu29::monitoring::CuDurationStatistics;
6use cu29::prelude::{CopperListTuple, CuMsgMetadataTrait, CuPayloadRawBytes};
7use cu29::{CuError, CuResult};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::fs::File;
11use std::io::Read;
12use std::path::Path;
13
/// Version number written to the `schema_version` field of every `LogStats` report.
const LOGSTATS_SCHEMA_VERSION: u32 = 1;
/// Bound (10 s expressed in nanoseconds) handed to `CuDurationStatistics::new` when the
/// latency accumulator is created.
/// NOTE(review): presumably the largest latency the tracker is expected to record —
/// confirm against the `cu29::monitoring` documentation.
const MAX_LATENCY_NS: u64 = 10_000_000_000;
16
/// Top-level report produced by `compute_logstats` and serialized by `write_logstats`.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogStats {
    /// Layout version of this report; filled from `LOGSTATS_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Fingerprint of the graph the statistics were computed against, formatted as
    /// `fnv1a64:` followed by 16 hexadecimal digits.
    pub config_signature: String,
    /// Mission name that was passed to `compute_logstats`, if any.
    pub mission: Option<String>,
    /// One entry per connection of the graph.
    pub edges: Vec<EdgeLogStats>,
    /// Latency statistics aggregated over all processed copper lists.
    pub perf: PerfStats,
}
25
/// Finalized statistics for a single connection (edge) of the graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeLogStats {
    /// Identifier of the source node of the connection.
    pub src: String,
    /// Source channel, when the connection names one; left out of the serialized output
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub src_channel: Option<String>,
    /// Identifier of the destination node of the connection.
    pub dst: String,
    /// Destination channel, when the connection names one; left out of the serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dst_channel: Option<String>,
    /// Message identifier copied from the connection's `msg` field.
    pub msg: String,
    /// Total number of samples recorded for this edge.
    pub samples: u64,
    /// Number of samples for which no payload size was available.
    pub none_samples: u64,
    /// Number of samples that carried an end timestamp.
    pub valid_time_samples: u64,
    /// Saturating sum of the payload sizes of all samples that reported one.
    pub total_raw_bytes: u64,
    /// `total_raw_bytes` divided by the number of samples that reported a size;
    /// `None` when no sample did.
    pub avg_raw_bytes: Option<f64>,
    /// Timestamp intervals per second over the span between the earliest and latest end
    /// timestamps; `None` with fewer than two timestamped samples or a zero-length span.
    pub rate_hz: Option<f64>,
    /// `total_raw_bytes` divided by the same span, in seconds; `None` under the same
    /// conditions as `rate_hz`.
    pub throughput_bytes_per_sec: Option<f64>,
}
43
/// Latency statistics aggregated over every copper list read from the log.
#[derive(Debug, Serialize, Deserialize)]
pub struct PerfStats {
    /// Number of copper lists that were processed.
    pub samples: u64,
    /// Number of copper lists for which an end-to-end latency could be computed.
    pub valid_time_samples: u64,
    /// Distribution of the end-to-end latency values.
    pub end_to_end: DurationStats,
    /// Distribution reported by the jitter accessors of the underlying
    /// `CuDurationStatistics`; all fields are `None` with fewer than two recorded values.
    pub jitter: DurationStats,
}
51
/// Summary of a set of durations, in nanoseconds. The `Default` value (every field
/// `None`) is used when there is not enough data to summarize.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DurationStats {
    /// Smallest value, in nanoseconds.
    pub min_ns: Option<u64>,
    /// Largest value, in nanoseconds.
    pub max_ns: Option<u64>,
    /// Mean value, in nanoseconds.
    pub mean_ns: Option<f64>,
    /// Standard deviation, in nanoseconds.
    pub stddev_ns: Option<f64>,
}
59
/// Identity of one graph connection; used as the hash-map key under which the
/// per-edge accumulators are stored.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct EdgeKey {
    /// Identifier of the source node.
    src: String,
    /// Optional channel on the source side.
    src_channel: Option<String>,
    /// Identifier of the destination node.
    dst: String,
    /// Optional channel on the destination side.
    dst_channel: Option<String>,
    /// Message identifier of the connection.
    msg: String,
}
68
/// Lookup key (source node, message identifier) used to find every edge that is fed
/// by a given output of a node.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct SrcMsgKey {
    /// Identifier of the source node.
    src: String,
    /// Message identifier produced by that node.
    msg: String,
}
74
/// One output position of a copper list. The position of a slot in the slot vector is
/// the index used to read the matching payload size and message in `compute_logstats`.
#[derive(Clone, Debug)]
struct OutputSlot {
    /// Edges whose statistics are updated by this slot; may be empty.
    edges: Vec<EdgeKey>,
}
79
/// Running counters for one edge, updated sample by sample and turned into an
/// `EdgeLogStats` by `finalize`.
#[derive(Debug, Default, Clone)]
struct EdgeAccumulator {
    /// Number of samples recorded so far.
    samples: u64,
    /// Number of samples that had no payload size.
    none_samples: u64,
    /// Number of samples that had an end timestamp.
    valid_time_samples: u64,
    /// Saturating sum of the reported payload sizes.
    total_raw_bytes: u64,
    /// Earliest end timestamp seen so far, in nanoseconds.
    min_end_ns: Option<u64>,
    /// Latest end timestamp seen so far, in nanoseconds.
    max_end_ns: Option<u64>,
}
89
90impl EdgeAccumulator {
91 fn record_sample(&mut self, payload_bytes: Option<u64>, end_time_ns: Option<u64>) {
92 self.samples = self.samples.saturating_add(1);
93 if let Some(bytes) = payload_bytes {
94 self.total_raw_bytes = self.total_raw_bytes.saturating_add(bytes);
95 } else {
96 self.none_samples = self.none_samples.saturating_add(1);
97 }
98
99 if let Some(end_ns) = end_time_ns {
100 self.valid_time_samples = self.valid_time_samples.saturating_add(1);
101 self.min_end_ns = Some(self.min_end_ns.map_or(end_ns, |min| min.min(end_ns)));
102 self.max_end_ns = Some(self.max_end_ns.map_or(end_ns, |max| max.max(end_ns)));
103 }
104 }
105
106 fn finalize(self, key: EdgeKey) -> EdgeLogStats {
107 let payload_samples = self.samples.saturating_sub(self.none_samples);
108 let avg_raw_bytes = if payload_samples > 0 {
109 Some(self.total_raw_bytes as f64 / payload_samples as f64)
110 } else {
111 None
112 };
113
114 let (rate_hz, throughput_bytes_per_sec) = if self.valid_time_samples >= 2 {
115 match (self.min_end_ns, self.max_end_ns) {
116 (Some(min_ns), Some(max_ns)) if max_ns > min_ns => {
117 let duration_ns = max_ns - min_ns;
118 let duration_secs = duration_ns as f64 / 1_000_000_000.0;
119 let intervals = (self.valid_time_samples - 1) as f64;
120 (
121 Some(intervals / duration_secs),
122 Some(self.total_raw_bytes as f64 / duration_secs),
123 )
124 }
125 _ => (None, None),
126 }
127 } else {
128 (None, None)
129 };
130
131 EdgeLogStats {
132 src: key.src,
133 src_channel: key.src_channel,
134 dst: key.dst,
135 dst_channel: key.dst_channel,
136 msg: key.msg,
137 samples: self.samples,
138 none_samples: self.none_samples,
139 valid_time_samples: self.valid_time_samples,
140 total_raw_bytes: self.total_raw_bytes,
141 avg_raw_bytes,
142 rate_hz,
143 throughput_bytes_per_sec,
144 }
145 }
146}
147
/// Running state used to aggregate end-to-end latency over all copper lists.
#[derive(Debug)]
struct PerfAccumulator {
    /// Statistics tracker that receives every available latency value.
    stats: CuDurationStatistics,
    /// Number of copper lists seen, with or without a latency value.
    samples: u64,
    /// Number of copper lists that contributed a latency value.
    valid_time_samples: u64,
}
154
155impl PerfAccumulator {
156 fn new() -> Self {
157 Self {
158 stats: CuDurationStatistics::new(CuDuration(MAX_LATENCY_NS)),
159 samples: 0,
160 valid_time_samples: 0,
161 }
162 }
163
164 fn record_sample(&mut self, latency: Option<CuDuration>) {
165 self.samples = self.samples.saturating_add(1);
166 if let Some(latency) = latency {
167 self.stats.record(latency);
168 self.valid_time_samples = self.valid_time_samples.saturating_add(1);
169 }
170 }
171
172 fn finalize(&self) -> PerfStats {
173 let end_to_end = duration_stats_from(&self.stats);
174 let jitter = jitter_stats_from(&self.stats);
175
176 PerfStats {
177 samples: self.samples,
178 valid_time_samples: self.valid_time_samples,
179 end_to_end,
180 jitter,
181 }
182 }
183}
184
185pub fn compute_logstats<P>(
186 mut reader: impl Read,
187 config: &CuConfig,
188 mission: Option<&str>,
189) -> CuResult<LogStats>
190where
191 P: CopperListTuple + CuPayloadRawBytes,
192{
193 let graph = config.get_graph(mission)?;
194 let signature = build_graph_signature(graph, mission);
195 let output_slots = build_output_slots(graph)?;
196 let mut edge_accumulators = build_edge_accumulators(graph);
197 let mut perf = PerfAccumulator::new();
198 let mut warned_lengths = false;
199
200 for culist in copperlists_reader::<P>(&mut reader) {
201 let payload_sizes = culist.msgs.payload_raw_bytes();
202 let cumsgs = culist.msgs.cumsgs();
203
204 let payload_len = payload_sizes.len();
205 let msg_len = cumsgs.len();
206 let slot_len = output_slots.len();
207 if !warned_lengths && (payload_len != msg_len || payload_len != slot_len) {
208 eprintln!(
209 "Warning: output mapping length mismatch (sizes={}, msgs={}, slots={})",
210 payload_len, msg_len, slot_len
211 );
212 warned_lengths = true;
213 }
214
215 let count = payload_len.min(msg_len).min(slot_len);
216
217 for idx in 0..count {
218 let slot = &output_slots[idx];
219 if slot.edges.is_empty() {
220 continue;
221 }
222 let payload_bytes = payload_sizes[idx];
223 let end_time_ns = extract_end_time_ns(cumsgs[idx].metadata());
224 for edge in &slot.edges {
225 if let Some(acc) = edge_accumulators.get_mut(edge) {
226 acc.record_sample(payload_bytes, end_time_ns);
227 }
228 }
229 }
230
231 perf.record_sample(compute_end_to_end_latency(&cumsgs));
232 }
233
234 let edges = edge_accumulators
235 .into_iter()
236 .map(|(key, acc)| acc.finalize(key))
237 .collect();
238
239 Ok(LogStats {
240 schema_version: LOGSTATS_SCHEMA_VERSION,
241 config_signature: signature,
242 mission: mission.map(|value| value.to_string()),
243 edges,
244 perf: perf.finalize(),
245 })
246}
247
248pub fn write_logstats(stats: &LogStats, path: &Path) -> CuResult<()> {
249 let file = File::create(path)
250 .map_err(|e| CuError::new_with_cause("Failed to create logstats output", e))?;
251 serde_json::to_writer_pretty(file, stats)
252 .map_err(|e| CuError::new_with_cause("Failed to serialize logstats", e))?;
253 Ok(())
254}
255
256fn build_output_slots(graph: &CuGraph) -> CuResult<Vec<OutputSlot>> {
257 let packs = collect_output_packs(graph)?;
258 let edges_by_src = build_edges_by_src_msg(graph);
259 let total_msgs: usize = packs.iter().map(|pack| pack.msg_types.len()).sum();
260 let mut slots = Vec::with_capacity(total_msgs);
261
262 for pack in packs {
263 for msg in pack.msg_types {
264 let edges = edges_by_src
265 .get(&SrcMsgKey {
266 src: pack.src.clone(),
267 msg: msg.clone(),
268 })
269 .cloned()
270 .unwrap_or_default();
271 slots.push(OutputSlot { edges });
272 }
273 }
274
275 Ok(slots)
276}
277
278fn build_edge_accumulators(graph: &CuGraph) -> HashMap<EdgeKey, EdgeAccumulator> {
279 let mut acc = HashMap::new();
280 for cnx in graph.edges() {
281 let key = EdgeKey {
282 src: cnx.src.clone(),
283 src_channel: cnx.src_channel.clone(),
284 dst: cnx.dst.clone(),
285 dst_channel: cnx.dst_channel.clone(),
286 msg: cnx.msg.clone(),
287 };
288 acc.entry(key).or_default();
289 }
290 acc
291}
292
293fn build_edges_by_src_msg(graph: &CuGraph) -> HashMap<SrcMsgKey, Vec<EdgeKey>> {
294 let mut map: HashMap<SrcMsgKey, Vec<EdgeKey>> = HashMap::new();
295 for cnx in graph.edges() {
296 let key = SrcMsgKey {
297 src: cnx.src.clone(),
298 msg: cnx.msg.clone(),
299 };
300 let edge = EdgeKey {
301 src: cnx.src.clone(),
302 src_channel: cnx.src_channel.clone(),
303 dst: cnx.dst.clone(),
304 dst_channel: cnx.dst_channel.clone(),
305 msg: cnx.msg.clone(),
306 };
307 map.entry(key).or_default().push(edge);
308 }
309 map
310}
311
/// Description of one output pack found in the runtime plan.
#[derive(Debug)]
struct OutputPackInfo {
    /// Index copied from the plan's output pack; packs are sorted by it.
    culist_index: u32,
    /// Identifier of the node that produces the pack.
    src: String,
    /// Message types of the pack, in the order given by the plan.
    msg_types: Vec<String>,
}
318
319fn collect_output_packs(graph: &CuGraph) -> CuResult<Vec<OutputPackInfo>> {
320 let plan = compute_runtime_plan(graph)?;
321 let mut packs = Vec::new();
322 collect_output_packs_from_loop(&plan, graph, &mut packs)?;
323 packs.sort_by_key(|pack| pack.culist_index);
324 Ok(packs)
325}
326
327fn collect_output_packs_from_loop(
328 loop_unit: &CuExecutionLoop,
329 graph: &CuGraph,
330 packs: &mut Vec<OutputPackInfo>,
331) -> CuResult<()> {
332 for step in &loop_unit.steps {
333 match step {
334 CuExecutionUnit::Step(step) => {
335 if let Some(output_pack) = &step.output_msg_pack {
336 let node = graph
337 .get_node(step.node_id)
338 .ok_or_else(|| CuError::from("Missing node for output pack"))?;
339 packs.push(OutputPackInfo {
340 culist_index: output_pack.culist_index,
341 src: node.get_id(),
342 msg_types: output_pack.msg_types.clone(),
343 });
344 }
345 }
346 CuExecutionUnit::Loop(inner) => {
347 collect_output_packs_from_loop(inner, graph, packs)?;
348 }
349 }
350 }
351 Ok(())
352}
353
354fn compute_end_to_end_latency(
355 msgs: &[&dyn cu29::prelude::ErasedCuStampedData],
356) -> Option<CuDuration> {
357 let start = msgs
358 .first()
359 .and_then(|msg| extract_start_time_ns(msg.metadata()))?;
360 let end = msgs
361 .last()
362 .and_then(|msg| extract_end_time_ns(msg.metadata()))?;
363 end.checked_sub(start).map(CuDuration::from_nanos)
364}
365
366fn extract_start_time_ns(meta: &dyn CuMsgMetadataTrait) -> Option<u64> {
367 option_time_ns(meta.process_time().start)
368}
369
370fn extract_end_time_ns(meta: &dyn CuMsgMetadataTrait) -> Option<u64> {
371 option_time_ns(meta.process_time().end)
372}
373
374fn option_time_ns(value: OptionCuTime) -> Option<u64> {
375 Option::<CuDuration>::from(value).map(|t| t.as_nanos())
376}
377
378fn duration_stats_from(stats: &CuDurationStatistics) -> DurationStats {
379 if stats.is_empty() {
380 return DurationStats::default();
381 }
382 DurationStats {
383 min_ns: Some(stats.min().as_nanos()),
384 max_ns: Some(stats.max().as_nanos()),
385 mean_ns: Some(stats.mean().as_nanos() as f64),
386 stddev_ns: Some(stats.stddev().as_nanos() as f64),
387 }
388}
389
390fn jitter_stats_from(stats: &CuDurationStatistics) -> DurationStats {
391 if stats.len() < 2 {
392 return DurationStats::default();
393 }
394 DurationStats {
395 min_ns: Some(stats.jitter_min().as_nanos()),
396 max_ns: Some(stats.jitter_max().as_nanos()),
397 mean_ns: Some(stats.jitter_mean().as_nanos() as f64),
398 stddev_ns: Some(stats.jitter_stddev().as_nanos() as f64),
399 }
400}
401
402fn build_graph_signature(graph: &CuGraph, mission: Option<&str>) -> String {
403 let mut parts = Vec::new();
404 parts.push(format!("mission={}", mission.unwrap_or("default")));
405
406 let mut nodes: Vec<_> = graph.get_all_nodes();
407 nodes.sort_by(|a, b| a.1.get_id().cmp(&b.1.get_id()));
408 for (_, node) in nodes {
409 parts.push(format!(
410 "node|{}|{}|{}",
411 node.get_id(),
412 node.get_type(),
413 flavor_label(node.get_flavor())
414 ));
415 }
416
417 let mut edges: Vec<String> = graph
418 .edges()
419 .map(|cnx| {
420 format!(
421 "edge|{}|{}|{}",
422 format_endpoint(cnx.src.as_str(), cnx.src_channel.as_deref()),
423 format_endpoint(cnx.dst.as_str(), cnx.dst_channel.as_deref()),
424 cnx.msg
425 )
426 })
427 .collect();
428 edges.sort();
429 parts.extend(edges);
430
431 let joined = parts.join("\n");
432 format!("fnv1a64:{:016x}", fnv1a64(joined.as_bytes()))
433}
434
435fn flavor_label(flavor: Flavor) -> &'static str {
436 match flavor {
437 Flavor::Task => "task",
438 Flavor::Bridge => "bridge",
439 }
440}
441
/// Renders a connection endpoint as `node/channel`, or just `node` when the endpoint
/// has no channel.
fn format_endpoint(node: &str, channel: Option<&str>) -> String {
    channel.map_or_else(|| node.to_string(), |ch| format!("{node}/{ch}"))
}
448
/// Computes the 64-bit FNV-1a hash of `data`: starting from the offset basis, each
/// byte is XOR-ed into the hash, which is then multiplied by the FNV prime with
/// wrapping arithmetic.
fn fnv1a64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;

    data.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing floating-point statistics.
    const EPSILON: f64 = 1e-6;

    /// Edge identity shared by the accumulator tests.
    fn edge_key() -> EdgeKey {
        EdgeKey {
            src: String::from("src"),
            src_channel: None,
            dst: String::from("dst"),
            dst_channel: None,
            msg: String::from("Msg"),
        }
    }

    /// Asserts that `actual` is present and within `EPSILON` of `expected`.
    fn assert_close(actual: Option<f64>, expected: f64) {
        assert!((actual.unwrap() - expected).abs() < EPSILON);
    }

    /// Two sized samples one second apart: 200 B average, 1 Hz, 400 B/s.
    #[test]
    fn edge_stats_average_and_rate() {
        let mut accumulator = EdgeAccumulator::default();
        for (bytes, end_ns) in [(100, 1_000_000_000), (300, 2_000_000_000)] {
            accumulator.record_sample(Some(bytes), Some(end_ns));
        }
        let report = accumulator.finalize(edge_key());

        assert_eq!(report.samples, 2);
        assert_eq!(report.none_samples, 0);
        assert_eq!(report.total_raw_bytes, 400);
        assert_close(report.avg_raw_bytes, 200.0);
        assert_close(report.rate_hz, 1.0);
        assert_close(report.throughput_bytes_per_sec, 400.0);
    }

    /// A sample without a timestamp must not produce rate or throughput.
    #[test]
    fn edge_stats_handles_missing_times() {
        let mut accumulator = EdgeAccumulator::default();
        accumulator.record_sample(Some(64), None);
        let report = accumulator.finalize(edge_key());

        assert_eq!(report.samples, 1);
        assert_eq!(report.valid_time_samples, 0);
        assert!(report.rate_hz.is_none());
        assert!(report.throughput_bytes_per_sec.is_none());
    }

    /// Missing latencies are counted as samples but not fed to the statistics.
    #[test]
    fn perf_stats_skip_missing_latency() {
        let mut accumulator = PerfAccumulator::new();
        accumulator.record_sample(Some(CuDuration::from_nanos(1_000)));
        accumulator.record_sample(None);
        let report = accumulator.finalize();

        assert_eq!(report.samples, 2);
        assert_eq!(report.valid_time_samples, 1);
        assert_eq!(report.end_to_end.min_ns, Some(1_000));
        assert_eq!(report.end_to_end.max_ns, Some(1_000));
        assert_eq!(report.jitter.min_ns, None);
    }
}