cu29_export/
lib.rs

1mod fsck;
2
3use bincode::config::standard;
4use bincode::decode_from_std_read;
5use bincode::error::DecodeError;
6use clap::{Parser, Subcommand, ValueEnum};
7use cu29::prelude::*;
8use cu29::UnifiedLogType;
9use cu29_intern_strs::read_interned_strings;
10use fsck::check;
11use std::fmt::{Display, Formatter};
12use std::io::Read;
13use std::path::{Path, PathBuf};
14
15#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
16pub enum ExportFormat {
17    Json,
18    Csv,
19}
20
21impl Display for ExportFormat {
22    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23        match self {
24            ExportFormat::Json => write!(f, "json"),
25            ExportFormat::Csv => write!(f, "csv"),
26        }
27    }
28}
29
30/// This is a generator for a main function to build a log extractor.
31#[derive(Parser)]
32#[command(author, version, about)]
33pub struct LogReaderCli {
34    /// The base path is the name with no _0 _1 et the end.
35    /// for example for toto_0.copper, toto_1.copper ... the base name is toto.copper
36    pub unifiedlog_base: PathBuf,
37
38    #[command(subcommand)]
39    pub command: Command,
40}
41
42#[derive(Subcommand)]
43pub enum Command {
44    /// Extract logs
45    ExtractTextLog { log_index: PathBuf },
46    /// Extract copperlists
47    ExtractCopperlists {
48        #[arg(short, long, default_value_t = ExportFormat::Json)]
49        export_format: ExportFormat,
50    },
51    /// Check the log and dump info about it.
52    Fsck {
53        #[arg(short, long, action = clap::ArgAction::Count)]
54        verbose: u8,
55    },
56}
57
58/// This is a generator for a main function to build a log extractor.
59/// It depends on the specific type of the CopperList payload that is determined at compile time from the configuration.
60pub fn run_cli<P>() -> CuResult<()>
61where
62    P: CopperListTuple,
63{
64    let args = LogReaderCli::parse();
65    let unifiedlog_base = args.unifiedlog_base;
66
67    let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
68        .file_base_name(&unifiedlog_base)
69        .build()
70        .expect("Failed to create logger")
71    else {
72        panic!("Failed to create logger");
73    };
74
75    match args.command {
76        Command::ExtractTextLog { log_index } => {
77            let reader = UnifiedLoggerIOReader::new(dl, UnifiedLogType::StructuredLogLine);
78            textlog_dump(reader, &log_index)?;
79        }
80        Command::ExtractCopperlists { export_format } => {
81            println!("Extracting copperlists with format: {export_format}");
82            let mut reader = UnifiedLoggerIOReader::new(dl, UnifiedLogType::CopperList);
83            let iter = copperlists_reader::<P>(&mut reader);
84
85            match export_format {
86                ExportFormat::Json => {
87                    for entry in iter {
88                        serde_json::to_writer_pretty(std::io::stdout(), &entry).unwrap();
89                    }
90                }
91                ExportFormat::Csv => {
92                    let mut first = true;
93                    for origin in P::get_all_task_ids() {
94                        if !first {
95                            print!(", ");
96                        } else {
97                            print!("id, ");
98                        }
99                        print!("{origin}_time, {origin}_tov, {origin},");
100                        first = false;
101                    }
102                    println!();
103                    for entry in iter {
104                        let mut first = true;
105                        for msg in entry.cumsgs() {
106                            if let Some(payload) = msg.payload() {
107                                if !first {
108                                    print!(", ");
109                                } else {
110                                    print!("{}, ", entry.id);
111                                }
112                                let metadata = msg.metadata();
113                                print!("{}, {}, ", metadata.process_time(), msg.tov());
114                                serde_json::to_writer(std::io::stdout(), payload).unwrap(); // TODO: escape for CSV
115                                first = false;
116                            }
117                        }
118                        println!();
119                    }
120                }
121            }
122        }
123        Command::Fsck { verbose } => {
124            if let Some(value) = check::<P>(&mut dl, verbose) {
125                return value;
126            }
127        }
128    }
129
130    Ok(())
131}
132/// Extracts the copper lists from a binary representation.
133/// P is the Payload determined by the configuration of the application.
134pub fn copperlists_reader<P: CopperListTuple>(
135    mut src: impl Read,
136) -> impl Iterator<Item = CopperList<P>> {
137    std::iter::from_fn(move || {
138        let entry = decode_from_std_read::<CopperList<P>, _, _>(&mut src, standard());
139        match entry {
140            Ok(entry) => Some(entry),
141            Err(e) => match e {
142                DecodeError::UnexpectedEnd { .. } => None,
143                DecodeError::Io { inner, additional } => {
144                    if inner.kind() == std::io::ErrorKind::UnexpectedEof {
145                        None
146                    } else {
147                        println!("Error {inner:?} additional:{additional}");
148                        None
149                    }
150                }
151                _ => {
152                    println!("Error {e:?}");
153                    None
154                }
155            },
156        }
157    })
158}
159
160/// Extracts the keyframes from the log.
161pub fn keyframes_reader(mut src: impl Read) -> impl Iterator<Item = KeyFrame> {
162    std::iter::from_fn(move || {
163        let entry = decode_from_std_read::<KeyFrame, _, _>(&mut src, standard());
164        match entry {
165            Ok(entry) => Some(entry),
166            Err(e) => match e {
167                DecodeError::UnexpectedEnd { .. } => None,
168                DecodeError::Io { inner, additional } => {
169                    if inner.kind() == std::io::ErrorKind::UnexpectedEof {
170                        None
171                    } else {
172                        println!("Error {inner:?} additional:{additional}");
173                        None
174                    }
175                }
176                _ => {
177                    println!("Error {e:?}");
178                    None
179                }
180            },
181        }
182    })
183}
184
185pub fn structlog_reader(mut src: impl Read) -> impl Iterator<Item = CuResult<CuLogEntry>> {
186    std::iter::from_fn(move || {
187        let entry = decode_from_std_read::<CuLogEntry, _, _>(&mut src, standard());
188
189        match entry {
190            Err(DecodeError::UnexpectedEnd { .. }) => None,
191            Err(DecodeError::Io {
192                inner,
193                additional: _,
194            }) => {
195                if inner.kind() == std::io::ErrorKind::UnexpectedEof {
196                    None
197                } else {
198                    Some(Err(CuError::new_with_cause("Error reading log", inner)))
199                }
200            }
201            Err(e) => Some(Err(CuError::new_with_cause("Error reading log", e))),
202            Ok(entry) => {
203                if entry.msg_index == 0 {
204                    None
205                } else {
206                    Some(Ok(entry))
207                }
208            }
209        }
210    })
211}
212
213/// Full dump of the copper structured log from its binary representation.
214/// This rebuilds a textual log.
215/// src: the source of the log data
216/// index: the path to the index file (containing the interned strings constructed at build time)
217pub fn textlog_dump(src: impl Read, index: &Path) -> CuResult<()> {
218    let all_strings = read_interned_strings(index).map_err(|e| {
219        CuError::new_with_cause(
220            "Failed to read interned strings from index",
221            std::io::Error::other(e),
222        )
223    })?;
224
225    for result in structlog_reader(src) {
226        match result {
227            Ok(entry) => match rebuild_logline(&all_strings, &entry) {
228                Ok(line) => println!("{}: {}", entry.time, line),
229                Err(e) => println!("Failed to rebuild log line: {e:?}"),
230            },
231            Err(e) => return Err(e),
232        }
233    }
234
235    Ok(())
236}
237
238// only for users opting into python interface, not supported on macOS at the moment
239#[cfg(all(feature = "python", not(target_os = "macos")))]
240mod python {
241    use bincode::config::standard;
242    use bincode::decode_from_std_read;
243    use bincode::error::DecodeError;
244    use cu29::prelude::*;
245    use cu29_intern_strs::read_interned_strings;
246    use pyo3::exceptions::PyIOError;
247    use pyo3::prelude::*;
248    use pyo3::types::{PyDelta, PyDict, PyList};
249    use std::io::Read;
250    use std::path::Path;
251
252    #[pyclass]
253    pub struct PyLogIterator {
254        reader: Box<dyn Read + Send + Sync>,
255    }
256
257    #[pymethods]
258    impl PyLogIterator {
259        fn __iter__(slf: PyRefMut<Self>) -> PyRefMut<Self> {
260            slf
261        }
262
263        fn __next__(mut slf: PyRefMut<Self>) -> Option<PyResult<PyCuLogEntry>> {
264            match decode_from_std_read::<CuLogEntry, _, _>(&mut slf.reader, standard()) {
265                Ok(entry) => {
266                    if entry.msg_index == 0 {
267                        None
268                    } else {
269                        Some(Ok(PyCuLogEntry { inner: entry }))
270                    }
271                }
272                Err(DecodeError::UnexpectedEnd { .. }) => None,
273                Err(DecodeError::Io { inner, .. })
274                    if inner.kind() == std::io::ErrorKind::UnexpectedEof =>
275                {
276                    None
277                }
278                Err(e) => Some(Err(PyIOError::new_err(e.to_string()))),
279            }
280        }
281    }
282
283    /// Creates an iterator of CuLogEntries from a bare binary structured log file (ie. not within a unified log).
284    /// This is mainly used for using the structured logging out of the Copper framework.
285    /// it returns a tuple with the iterator of log entries and the list of interned strings.
286    #[pyfunction]
287    pub fn struct_log_iterator_bare(
288        bare_struct_src_path: &str,
289        index_path: &str,
290    ) -> PyResult<(PyLogIterator, Vec<String>)> {
291        let file = std::fs::File::open(bare_struct_src_path)
292            .map_err(|e| PyIOError::new_err(e.to_string()))?;
293        let all_strings = read_interned_strings(Path::new(index_path))
294            .map_err(|e| PyIOError::new_err(e.to_string()))?;
295        Ok((
296            PyLogIterator {
297                reader: Box::new(file),
298            },
299            all_strings,
300        ))
301    }
302    /// Creates an iterator of CuLogEntries from a unified log file.
303    /// This function allows you to easily use python to datamind Copper's structured text logs.
304    /// it returns a tuple with the iterator of log entries and the list of interned strings.
305    #[pyfunction]
306    pub fn struct_log_iterator_unified(
307        unified_src_path: &str,
308        index_path: &str,
309    ) -> PyResult<(PyLogIterator, Vec<String>)> {
310        let all_strings = read_interned_strings(Path::new(index_path))
311            .map_err(|e| PyIOError::new_err(e.to_string()))?;
312
313        let UnifiedLogger::Read(dl) = UnifiedLoggerBuilder::new()
314            .file_base_name(Path::new(unified_src_path))
315            .build()
316            .expect("Failed to create logger")
317        else {
318            panic!("Failed to create logger");
319        };
320
321        let reader = UnifiedLoggerIOReader::new(dl, UnifiedLogType::StructuredLogLine);
322        Ok((
323            PyLogIterator {
324                reader: Box::new(reader),
325            },
326            all_strings,
327        ))
328    }
329
330    /// This is a python wrapper for CuLogEntries.
331    #[pyclass]
332    pub struct PyCuLogEntry {
333        pub inner: CuLogEntry,
334    }
335
336    #[pymethods]
337    impl PyCuLogEntry {
338        /// Returns the timestamp of the log entry.
339        pub fn ts<'a>(&self, py: Python<'a>) -> Bound<'a, PyDelta> {
340            let nanoseconds: u64 = self.inner.time.into();
341
342            // Convert nanoseconds to seconds and microseconds
343            let days = (nanoseconds / 86_400_000_000_000) as i32;
344            let seconds = (nanoseconds / 1_000_000_000) as i32;
345            let microseconds = ((nanoseconds % 1_000_000_000) / 1_000) as i32;
346
347            PyDelta::new(py, days, seconds, microseconds, false).unwrap()
348        }
349
350        /// Returns the index of the message in the vector of interned strings.
351        pub fn msg_index(&self) -> u32 {
352            self.inner.msg_index
353        }
354
355        /// Returns the index of the parameter names in the vector of interned strings.
356        pub fn paramname_indexes(&self) -> Vec<u32> {
357            self.inner.paramname_indexes.iter().copied().collect()
358        }
359
360        /// Returns the parameters of this log line
361        pub fn params(&self) -> Vec<Py<PyAny>> {
362            self.inner.params.iter().map(value_to_py).collect()
363        }
364    }
365
366    /// This needs to match the name of the generated '.so'
367    #[pymodule(name = "libcu29_export")]
368    fn cu29_export(m: &Bound<'_, PyModule>) -> PyResult<()> {
369        m.add_class::<PyCuLogEntry>()?;
370        m.add_class::<PyLogIterator>()?;
371        m.add_function(wrap_pyfunction!(struct_log_iterator_bare, m)?)?;
372        m.add_function(wrap_pyfunction!(struct_log_iterator_unified, m)?)?;
373        Ok(())
374    }
375
376    fn value_to_py(value: &cu29::prelude::Value) -> Py<PyAny> {
377        match value {
378            Value::String(s) => Python::attach(|py| s.into_pyobject(py).unwrap().into()),
379            Value::U64(u) => Python::attach(|py| u.into_pyobject(py).unwrap().into()),
380            Value::I64(i) => Python::attach(|py| i.into_pyobject(py).unwrap().into()),
381            Value::F64(f) => Python::attach(|py| f.into_pyobject(py).unwrap().into()),
382            Value::Bool(b) => Python::attach(|py| b.into_pyobject(py).unwrap().to_owned().into()),
383            Value::CuTime(t) => Python::attach(|py| t.0.into_pyobject(py).unwrap().into()),
384            Value::Bytes(b) => Python::attach(|py| b.into_pyobject(py).unwrap().into()),
385            Value::Char(c) => Python::attach(|py| c.into_pyobject(py).unwrap().into()),
386            Value::I8(i) => Python::attach(|py| i.into_pyobject(py).unwrap().into()),
387            Value::U8(u) => Python::attach(|py| u.into_pyobject(py).unwrap().into()),
388            Value::I16(i) => Python::attach(|py| i.into_pyobject(py).unwrap().into()),
389            Value::U16(u) => Python::attach(|py| u.into_pyobject(py).unwrap().into()),
390            Value::I32(i) => Python::attach(|py| i.into_pyobject(py).unwrap().into()),
391            Value::U32(u) => Python::attach(|py| u.into_pyobject(py).unwrap().into()),
392            Value::Map(m) => Python::attach(|py| {
393                let dict = PyDict::new(py);
394                for (k, v) in m.iter() {
395                    dict.set_item(value_to_py(k), value_to_py(v)).unwrap();
396                }
397                dict.into_pyobject(py).unwrap().into()
398            }),
399            Value::F32(f) => Python::attach(|py| f.into_pyobject(py).unwrap().into()),
400            Value::Option(o) => Python::attach(|py| {
401                if o.is_none() {
402                    py.None()
403                } else {
404                    o.clone().map(|v| value_to_py(&v)).unwrap()
405                }
406            }),
407            Value::Unit => Python::attach(|py| py.None()),
408            Value::Newtype(v) => value_to_py(v),
409            Value::Seq(s) => Python::attach(|py| {
410                let list = PyList::new(py, s.iter().map(value_to_py)).unwrap();
411                list.into_pyobject(py).unwrap().into()
412            }),
413        }
414    }
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use bincode::{encode_into_slice, Decode, Encode};
421    use fs_extra::dir::{copy, CopyOptions};
422    use std::io::Cursor;
423    use std::sync::{Arc, Mutex};
424    use tempfile::{tempdir, TempDir};
425
426    fn copy_stringindex_to_temp(tmpdir: &TempDir) -> PathBuf {
427        // for some reason using the index in real only locks it and generates a change in the file.
428        let temp_path = tmpdir.path();
429
430        let mut copy_options = CopyOptions::new();
431        copy_options.copy_inside = true;
432
433        copy("test/cu29_log_index", temp_path, &copy_options).unwrap();
434        temp_path.join("cu29_log_index")
435    }
436
437    #[test]
438    fn test_extract_low_level_cu29_log() {
439        let temp_dir = TempDir::new().unwrap();
440        let temp_path = copy_stringindex_to_temp(&temp_dir);
441        let entry = CuLogEntry::new(3, CuLogLevel::Info);
442        let bytes = bincode::encode_to_vec(&entry, standard()).unwrap();
443        let reader = Cursor::new(bytes.as_slice());
444        textlog_dump(reader, temp_path.as_path()).unwrap();
445    }
446
447    #[test]
448    fn end_to_end_datalogger_and_structlog_test() {
449        let dir = tempdir().expect("Failed to create temp dir");
450        let path = dir
451            .path()
452            .join("end_to_end_datalogger_and_structlog_test.copper");
453        {
454            // Write a couple log entries
455            let UnifiedLogger::Write(logger) = UnifiedLoggerBuilder::new()
456                .write(true)
457                .create(true)
458                .file_base_name(&path)
459                .preallocated_size(100000)
460                .build()
461                .expect("Failed to create logger")
462            else {
463                panic!("Failed to create logger")
464            };
465            let data_logger = Arc::new(Mutex::new(logger));
466            let stream = stream_write(data_logger.clone(), UnifiedLogType::StructuredLogLine, 1024)
467                .expect("Failed to create stream");
468            let rt = LoggerRuntime::init(RobotClock::default(), stream, None::<NullLog>);
469
470            let mut entry = CuLogEntry::new(4, CuLogLevel::Info); // this is a "Just a String {}" log line
471            entry.add_param(0, Value::String("Parameter for the log line".into()));
472            log(&mut entry).expect("Failed to log");
473            let mut entry = CuLogEntry::new(2, CuLogLevel::Info); // this is a "Just a String {}" log line
474            entry.add_param(0, Value::String("Parameter for the log line".into()));
475            log(&mut entry).expect("Failed to log");
476
477            // everything is dropped here
478            drop(rt);
479        }
480        // Read back the log
481        let UnifiedLogger::Read(logger) = UnifiedLoggerBuilder::new()
482            .file_base_name(
483                &dir.path()
484                    .join("end_to_end_datalogger_and_structlog_test.copper"),
485            )
486            .build()
487            .expect("Failed to create logger")
488        else {
489            panic!("Failed to create logger")
490        };
491        let reader = UnifiedLoggerIOReader::new(logger, UnifiedLogType::StructuredLogLine);
492        let temp_dir = TempDir::new().unwrap();
493        textlog_dump(
494            reader,
495            Path::new(copy_stringindex_to_temp(&temp_dir).as_path()),
496        )
497        .expect("Failed to dump log");
498    }
499
500    // This is normally generated at compile time in CuPayload.
501    #[derive(Debug, PartialEq, Clone, Copy, Serialize, Encode, Decode, Default)]
502    struct MyMsgs((u8, i32, f32));
503
504    impl ErasedCuStampedDataSet for MyMsgs {
505        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
506            Vec::new()
507        }
508    }
509
510    impl MatchingTasks for MyMsgs {
511        fn get_all_task_ids() -> &'static [&'static str] {
512            &[]
513        }
514    }
515
516    /// Checks if we can recover the copper lists from a binary representation.
517    #[test]
518    fn test_copperlists_dump() {
519        let mut data = vec![0u8; 10000];
520        let mypls: [MyMsgs; 4] = [
521            MyMsgs((1, 2, 3.0)),
522            MyMsgs((2, 3, 4.0)),
523            MyMsgs((3, 4, 5.0)),
524            MyMsgs((4, 5, 6.0)),
525        ];
526
527        let mut offset: usize = 0;
528        for pl in mypls.iter() {
529            let cl = CopperList::<MyMsgs>::new(1, *pl);
530            offset +=
531                encode_into_slice(&cl, &mut data.as_mut_slice()[offset..], standard()).unwrap();
532        }
533
534        let reader = Cursor::new(data);
535
536        let mut iter = copperlists_reader::<MyMsgs>(reader);
537        assert_eq!(iter.next().unwrap().msgs, MyMsgs((1, 2, 3.0)));
538        assert_eq!(iter.next().unwrap().msgs, MyMsgs((2, 3, 4.0)));
539        assert_eq!(iter.next().unwrap().msgs, MyMsgs((3, 4, 5.0)));
540        assert_eq!(iter.next().unwrap().msgs, MyMsgs((4, 5, 6.0)));
541    }
542}