cu29_unifiedlog/
lib.rs

1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Display, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic bytes identifying a unified-log main file (written once at the start of slab 0).
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes prefixing every section header within a slab.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// The main file header of the datalogger.
#[derive(Encode, Decode, Debug)]
pub struct MainHeader {
    pub magic: [u8; 4],            // Magic number to identify the file.
    pub first_section_offset: u16, // This is to align with a page at write time.
    pub page_size: u16,            // Page size of the writing system; sections are page-aligned.
}
29
30impl Display for MainHeader {
31    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
32        writeln!(
33            f,
34            "  Magic -> {:2x}{:2x}{:2x}{:2x}",
35            self.magic[0], self.magic[1], self.magic[2], self.magic[3]
36        )?;
37        writeln!(f, "  first_section_offset -> {}", self.first_section_offset)?;
38        writeln!(f, "  page_size -> {}", self.page_size)
39    }
40}
41
/// Each concurrent sublogger is tracked through a section header.
/// They form a linked list of sections.
/// The entry type is used to identify the type of data in the section.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    pub magic: [u8; 2], // Magic number to identify the section.
    pub entry_type: UnifiedLogType, // Kind of payload stored in this section.
    pub section_size: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
    pub filled_size: u32,  // how much of the section is filled.
}
52
/// Upper bound, in bytes, reserved at the start of a section for the bincode-encoded header.
/// NOTE(review): this uses the in-memory struct size as a proxy for the encoded size and adds
/// 3 bytes of headroom for varint growth of the 3 integer fields — assumes the encoded form
/// never exceeds this bound; TODO confirm against the bincode `standard()` config.
const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize; // 3 == additional worse case scenario for the 3 int variable encoding
54
55impl Display for SectionHeader {
56    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57        writeln!(f, "    Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
58        writeln!(f, "    type -> {:?}", self.entry_type)?;
59        write!(
60            f,
61            "    use  -> {} / {}",
62            self.filled_size, self.section_size
63        )
64    }
65}
66
67impl Default for SectionHeader {
68    fn default() -> Self {
69        Self {
70            magic: SECTION_MAGIC,
71            entry_type: UnifiedLogType::Empty,
72            section_size: 0,
73            filled_size: 0,
74        }
75    }
76}
77
/// A wrapper around a memory mapped file to write to.
struct MmapStream {
    entry_type: UnifiedLogType, // payload type tag for every section this stream allocates
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>, // owner of the slabs; used to allocate and flush sections
    current_section: SectionHandle, // section currently being filled
    current_position: usize, // running total of bytes encoded by this stream (never reset on rollover)
    minimum_allocation_amount: usize, // minimum size requested each time a new section is needed
}
86
87impl MmapStream {
88    fn new(
89        entry_type: UnifiedLogType,
90        parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
91        minimum_allocation_amount: usize,
92    ) -> Self {
93        let section = parent_logger
94            .lock()
95            .expect("Could not lock a section at MmapStream creation")
96            .add_section(entry_type, minimum_allocation_amount);
97        Self {
98            entry_type,
99            parent_logger,
100            current_section: section,
101            current_position: 0,
102            minimum_allocation_amount,
103        }
104    }
105}
106
107impl Debug for MmapStream {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
110    }
111}
112
impl<E: Encode> WriteStream<E> for MmapStream {
    /// Encodes `obj` at the end of the current section, rolling over to a freshly
    /// allocated section when the current one runs out of space.
    ///
    /// Returns an error if encoding fails for any reason other than running out of
    /// section space, or if the parent logger mutex is poisoned.
    fn log(&mut self, obj: &E) -> CuResult<()> {
        let dst = self.current_section.get_user_buffer();
        let result = encode_into_slice(obj, dst, standard());
        match result {
            Ok(nb_bytes) => {
                // Track both the stream-wide running total and the section fill level.
                self.current_position += nb_bytes;
                self.current_section.used += nb_bytes as u32;
                Ok(())
            }
            Err(e) => match e {
                EncodeError::UnexpectedEnd => {
                    // Section is full: close it out and retry exactly once in a new section.
                    if let Ok(mut logger_guard) = self.parent_logger.lock() {
                        logger_guard.flush_section(&mut self.current_section);
                        self.current_section = logger_guard
                            .add_section(self.entry_type, self.minimum_allocation_amount);

                        // NOTE(review): an object whose encoding exceeds the new section's
                        // capacity will hit this expect and panic — TODO confirm acceptable.
                        let result = encode_into_slice(
                            obj,
                            self.current_section.get_user_buffer(),
                            standard(),
                        )
                            .expect(
                                "Failed to encode object in a newly minted section. Unrecoverable failure.",
                            ); // If we fail just after creating a section, there is not much we can do, we need to bail.
                        self.current_position += result;
                        self.current_section.used += result as u32;
                        Ok(())
                    } else {
                        // It will retry but at least not completely crash.
                        Err(
                            "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
                                .into(),
                        )
                    }
                }
                _ => {
                    // Any other encode failure is surfaced to the caller with its cause attached.
                    let err =
                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
                            .add_cause(e.to_string().as_str());
                    Err(err)
                }
            },
        }
    }
}
159
160impl Drop for MmapStream {
161    fn drop(&mut self) {
162        if let Ok(mut logger_guard) = self.parent_logger.lock() {
163            logger_guard.flush_section(&mut self.current_section);
164        } else if !std::thread::panicking() {
165            eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
166        }
167    }
168}
169
170/// Create a new stream to write to the unifiedlogger.
171pub fn stream_write<E: Encode>(
172    logger: Arc<Mutex<UnifiedLoggerWrite>>,
173    entry_type: UnifiedLogType,
174    minimum_allocation_amount: usize,
175) -> impl WriteStream<E> {
176    MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
177}
178
/// Holder of the read or write side of the datalogger.
pub enum UnifiedLogger {
    /// Read-only access to an existing log.
    Read(UnifiedLoggerRead),
    /// Write access producing a new log.
    Write(UnifiedLoggerWrite),
}
184
/// Use this builder to create a new DataLogger.
pub struct UnifiedLoggerBuilder {
    file_base_name: Option<PathBuf>, // base path; slab files are derived as "<stem>_<n>.<ext>"
    preallocated_size: Option<usize>, // size (bytes) each slab file is preallocated to (write side)
    write: bool,  // open the write side
    create: bool, // create the backing files; only honored together with `write`
}
192
impl Default for UnifiedLoggerBuilder {
    /// Same as [`UnifiedLoggerBuilder::new`]: read-only, no file, no size.
    fn default() -> Self {
        Self::new()
    }
}
198
199impl UnifiedLoggerBuilder {
200    pub fn new() -> Self {
201        Self {
202            file_base_name: None,
203            preallocated_size: None,
204            write: false,
205            create: false, // This is the safest default
206        }
207    }
208
209    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
210    pub fn file_base_name(mut self, file_path: &Path) -> Self {
211        self.file_base_name = Some(file_path.to_path_buf());
212        self
213    }
214
215    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
216        self.preallocated_size = Some(preallocated_size);
217        self
218    }
219
220    pub fn write(mut self, write: bool) -> Self {
221        self.write = write;
222        self
223    }
224
225    pub fn create(mut self, create: bool) -> Self {
226        self.create = create;
227        self
228    }
229
230    pub fn build(self) -> io::Result<UnifiedLogger> {
231        let page_size = page_size::get();
232
233        if self.write && self.create {
234            let ulw = UnifiedLoggerWrite::new(
235                &self
236                    .file_base_name
237                    .expect("This unified logger has no filename."),
238                self.preallocated_size
239                    .expect("This unified logger has no preallocated size."),
240                page_size,
241            );
242
243            Ok(UnifiedLogger::Write(ulw))
244        } else {
245            let file_path = self.file_base_name.ok_or_else(|| {
246                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
247            })?;
248            let ulr = UnifiedLoggerRead::new(&file_path)?;
249            Ok(UnifiedLogger::Read(ulr))
250        }
251    }
252}
253
/// A read side of the datalogger.
pub struct UnifiedLoggerRead {
    base_file_path: PathBuf, // base path used to derive the per-slab file names
    main_header: MainHeader, // header decoded from slab 0
    current_mmap_buffer: Mmap, // read-only mapping of the slab currently being read
    current_file: File, // file backing the current mapping
    current_slab_index: usize, // index of the slab currently mapped
    current_reading_position: usize, // byte offset of the next section header in the mapping
}
263
/// One memory-mapped backing file of the write side.
struct SlabEntry {
    file: File, // backing file; truncated to the used size on drop
    mmap_buffer: ManuallyDrop<MmapMut>, // mapping; dropped manually before the file is truncated
    current_global_position: usize, // high-water mark of bytes allocated in this slab
    sections_offsets_in_flight: Vec<usize>, // offsets of sections handed out but not yet flushed
    flushed_until_offset: usize, // everything before this offset has been flushed to disk
    page_size: usize, // alignment unit for section placement and sizing
}
272
273impl Drop for SlabEntry {
274    fn drop(&mut self) {
275        self.flush_until(self.current_global_position);
276        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
277        self.file
278            .set_len(self.current_global_position as u64)
279            .expect("Failed to trim datalogger file");
280
281        if !self.sections_offsets_in_flight.is_empty() {
282            eprintln!("Error: Slab not full flushed.");
283        }
284    }
285}
286
/// Result of asking a slab for a new section.
pub enum AllocatedSection {
    /// The slab cannot fit the requested section; a new slab is needed.
    NoMoreSpace,
    /// The section was successfully carved out of the slab.
    Section(SectionHandle),
}
291
impl SlabEntry {
    /// Maps `file` entirely and starts allocating sections from offset 0.
    fn new(file: File, page_size: usize) -> Self {
        let mmap_buffer =
            ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
        Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
        }
    }

    /// Ensure the underlying mmap is flushed to disk up to the given position.
    fn flush_until(&mut self, until_position: usize) {
        // Flushing an empty range is tolerated under linux, but crashes on macos,
        // hence the early return.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Returns true if `section`'s buffer lies within this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle) -> bool {
        (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
            && (section.buffer.as_ptr() as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    ///
    /// # Panics
    /// Panics if `section`'s buffer does not belong to this slab's mapping.
    fn flush_section(&mut self, section: &mut SectionHandle) {
        if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
            || section.buffer.as_ptr() as usize
                > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        // Be sure that the header reflects the actual size of the section.
        section.update_header();

        // Re-encode the (now up-to-date) header at the front of the section buffer.
        let _sz = encode_into_slice(&section.section_header, section.buffer, standard())
            .expect("Failed to encode section header");

        // This section is no longer in flight.
        let base = self.mmap_buffer.as_ptr() as usize;
        let section_buffer_addr = section.buffer.as_ptr() as usize;
        self.sections_offsets_in_flight
            .retain(|&x| x != section_buffer_addr - base);

        // Only flush up to the first still-in-flight section: bytes past that point
        // may still be written to through other handles.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next multiple of `page_size`.
    /// NOTE(review): assumes `page_size` is a power of two — TODO confirm (true for `page_size::get()`).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            entry_type,
            section_size,
            filled_size: 0u32,
        };

        // Write a provisional header; it is re-encoded with the real filled_size at flush time.
        let nb_bytes = encode_into_slice(
            &section_header,
            &mut self.mmap_buffer[self.current_global_position..],
            standard(),
        )
        .expect("Failed to encode section header");
        assert!(nb_bytes < self.page_size);

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // here we have the guarantee for exclusive access to that memory for the lifetime of the handle, the borrow checker cannot understand that ever.
        // SAFETY: sections never overlap (allocation only moves current_global_position forward),
        // and the mapping outlives the handle as long as the slab is kept alive until flush.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };

        self.current_global_position = end_of_section;

        AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
    }

    /// Bytes allocated so far in this slab (test helper).
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
412
/// A SectionHandle is a handle to a section in the datalogger.
/// It allows to track the lifecycle of a section of the datalogger.
#[derive(Default)]
pub struct SectionHandle {
    section_header: SectionHeader, // decoded header; re-encoded into `buffer` at flush time
    buffer: &'static mut [u8], // This includes the encoded header for end of section patching.
    used: u32,                 // this is the size of the used part of the buffer.
}
421
// This is a placeholder to ensure an orderly cleanup as we dodge the borrow checker.
423
impl SectionHandle {
    // The buffer is considered static as it is a dedicated piece for the section.
    /// Wraps an already header-encoded slice of a slab into a handle.
    ///
    /// # Panics
    /// Panics if the buffer does not start with the section magic, or is too small
    /// to hold a worst-case encoded header.
    pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
        // here we assume we are passed a valid section.
        if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
            panic!("Invalid section buffer, magic number not found");
        }

        if buffer.len() < MAX_HEADER_SIZE {
            panic!(
                "Invalid section buffer, too small: {}, it needs to be > {}",
                buffer.len(),
                MAX_HEADER_SIZE
            );
        }

        Self {
            section_header,
            buffer,
            used: 0,
        }
    }
    /// Returns the still-unused part of the section payload: past the reserved
    /// header area and past the bytes already written.
    pub fn get_user_buffer(&mut self) -> &mut [u8] {
        &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
    }

    /// Copies the used byte count into the header so it can be re-encoded at flush time.
    pub fn update_header(&mut self) {
        // no need to do anything if we never used the section.
        if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
            return;
        }
        self.section_header.filled_size = self.used;
    }
}
458
/// A write side of the datalogger.
pub struct UnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slabs are previous slabs kept alive while their sections finish flushing.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size (bytes) for each backing file.
    slab_size: usize,
    /// current numeric suffix used to name the front slab's backing file.
    front_slab_suffix: usize,
}
472
/// Derives the backing file path for a slab index from the base path:
/// "something/toto.copper" with index 2 becomes "something/toto_2.copper".
///
/// # Panics
/// Panics if the base path has no file name or the name is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let stem = base_file_path
        .file_stem()
        .expect("Invalid base file path")
        .to_str()
        .expect("Could not translate the filename OsStr to str");
    // Using Path::file_stem/extension instead of hand-rolled '.' splitting: a base
    // name without an extension now yields "name_0" instead of the mangled "_0.name".
    let file_name = match base_file_path.extension().and_then(|ext| ext.to_str()) {
        Some(extension) => format!("{stem}_{slab_index}.{extension}"),
        None => format!("{stem}_{slab_index}"),
    };
    base_file_path.with_file_name(file_name)
}
487
488fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
489    let file_path = build_slab_path(base_file_path, slab_suffix);
490    let file = OpenOptions::new()
491        .read(true)
492        .write(true)
493        .create(true)
494        .truncate(true)
495        .open(&file_path)
496        .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
497    file.set_len(slab_size as u64)
498        .expect("Failed to set file length");
499    file
500}
501
impl UnifiedLoggerWrite {
    /// Creates the backing file for the next slab and bumps the file-name suffix.
    fn next_slab(&mut self) -> File {
        self.front_slab_suffix += 1;

        make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
    }

    /// Creates and maps slab 0, writing the main header into its first page.
    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
        let file = make_slab_file(base_file_path, slab_size, 0);
        let mut front_slab = SlabEntry::new(file, page_size);

        // This is the first slab so add the main header.
        let main_header = MainHeader {
            magic: MAIN_MAGIC,
            first_section_offset: page_size as u16,
            page_size: page_size as u16,
        };
        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
            .expect("Failed to encode main header");
        assert!(nb_bytes < page_size);
        front_slab.current_global_position = page_size; // align to the next page

        Self {
            front_slab,
            back_slabs: Vec::new(),
            base_file_path: base_file_path.to_path_buf(),
            slab_size,
            front_slab_suffix: 0,
        }
    }

    /// Flushes `section` in whichever slab owns it: a retired back slab first,
    /// otherwise the front slab.
    pub fn flush_section(&mut self, section: &mut SectionHandle) {
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Drops (and thereby flushes and trims) back slabs with no section still in flight.
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }

    /// The returned slice is section_size or greater.
    /// When the front slab is full, it is retired to the back slabs and a fresh
    /// slab becomes the front.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> SectionHandle {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => {
                        panic!("Failed to allocate a section in a new slab");
                    }
                    AllocatedSection::Section(section) => section,
                }
            }
            AllocatedSection::Section(section) => section,
        }
    }

    /// Returns (front slab used bytes, offsets of in-flight sections, number of back slabs).
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }
}
588
impl Drop for UnifiedLoggerWrite {
    /// Appends the `LastEntry` terminator section and flushes everything before
    /// the slabs themselves drop (which trims the backing files).
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.

        let mut section = self.add_section(UnifiedLogType::LastEntry, 80); // TODO: determine that exactly
        self.front_slab.flush_section(&mut section);
        self.garbage_collect_backslabs();

        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
    }
}
602
603fn open_slab_index(
604    base_file_path: &Path,
605    slab_index: usize,
606) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
607    let mut options = OpenOptions::new();
608    let options = options.read(true);
609
610    let file_path = build_slab_path(base_file_path, slab_index);
611    let file = options.open(file_path)?;
612    let mmap = unsafe { Mmap::map(&file) }?;
613    let mut prolog = 0u16;
614    let mut maybe_main_header: Option<MainHeader> = None;
615    if slab_index == 0 {
616        let main_header: MainHeader;
617        let _read: usize;
618        (main_header, _read) =
619            decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
620        if main_header.magic != MAIN_MAGIC {
621            return Err(io::Error::new(
622                io::ErrorKind::InvalidData,
623                "Invalid magic number in main header",
624            ));
625        }
626        prolog = main_header.first_section_offset;
627        maybe_main_header = Some(main_header);
628    }
629    Ok((file, mmap, prolog, maybe_main_header))
630}
631
impl UnifiedLoggerRead {
    /// Opens slab 0 of the log at `base_file_path` and positions the reader just
    /// after the main header.
    pub fn new(base_file_path: &Path) -> io::Result<Self> {
        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;

        Ok(Self {
            base_file_path: base_file_path.to_path_buf(),
            main_header: header.expect("UnifiedLoggerRead needs a header"),
            current_file: file,
            current_mmap_buffer: mmap,
            current_slab_index: 0,
            current_reading_position: prolog as usize,
        })
    }

    /// Maps the next slab file and resets the reading position to its first section.
    fn next_slab(&mut self) -> io::Result<()> {
        self.current_slab_index += 1;
        let (file, mmap, prolog, _) =
            open_slab_index(&self.base_file_path, self.current_slab_index)?;
        self.current_file = file;
        self.current_mmap_buffer = mmap;
        self.current_reading_position = prolog as usize;
        Ok(())
    }

    /// Skips forward until a section of `datalogtype` is found and returns a copy
    /// of its payload, crossing slab boundaries as needed.
    /// Returns `Ok(None)` when the `LastEntry` terminator is reached.
    pub fn read_next_section_type(
        &mut self,
        datalogtype: UnifiedLogType,
    ) -> CuResult<Option<Vec<u8>>> {
        // TODO: eventually implement a 0 copy of this too.
        loop {
            // Past the end of the current mapping: move on to the next slab file.
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header_result = self.read_section_header();
            let header = header_result.map_err(|error| {
                CuError::new_with_cause(
                    &format!(
                        "Could not read a sections header: {}/{}:{}",
                        self.base_file_path.as_os_str().to_string_lossy(),
                        self.current_slab_index,
                        self.current_reading_position,
                    ),
                    error,
                )
            })?;

            // Reached the end of file
            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(None);
            }

            // Found a section of the requested type
            if header.entry_type == datalogtype {
                let result = Some(self.read_section_content(&header)?);
                self.current_reading_position += header.section_size as usize;
                return Ok(result);
            }

            // Keep reading until we find the requested type
            self.current_reading_position += header.section_size as usize;
        }
    }

    /// Returns the main header decoded from slab 0.
    pub fn raw_main_header(&self) -> &MainHeader {
        &self.main_header
    }

    /// Reads the section from the section header pos.
    /// Unlike [`Self::read_next_section_type`], returns the very next section
    /// regardless of its type, along with its decoded header.
    pub fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }

        let read_result = self.read_section_header();

        match read_result {
            Err(error) => Err(CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                error,
            )),
            Ok(header) => {
                let data = self.read_section_content(&header)?;
                self.current_reading_position += header.section_size as usize;
                Ok((header, data))
            }
        }
    }

    /// Reads the section content from the section header pos.
    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
        // TODO: we could optimize by asking the buffer to fill
        // Payload starts a fixed MAX_HEADER_SIZE after the header position and
        // spans `filled_size` bytes.
        let mut section = vec![0; header.filled_size as usize];
        let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
        section.copy_from_slice(
            &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
        );

        Ok(section)
    }

    /// Decodes the section header at the current reading position without advancing it.
    /// NOTE(review): an undecodable header panics here (`expect`) while a bad magic
    /// returns Err — TODO confirm this asymmetry is intended.
    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
        let section_header: SectionHeader;
        (section_header, _) = decode_from_slice(
            &self.current_mmap_buffer[self.current_reading_position..],
            standard(),
        )
        .expect("Failed to decode section header");
        if section_header.magic != SECTION_MAGIC {
            return Err("Invalid magic number in section header".into());
        }
        Ok(section_header)
    }
}
755
/// This is a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    logger: UnifiedLoggerRead, // underlying section reader
    log_type: UnifiedLogType,  // only sections of this type are surfaced through `read`
    buffer: Vec<u8>,           // payload of the current section
    buffer_pos: usize,         // read cursor within `buffer`
}
763
764impl UnifiedLoggerIOReader {
765    pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
766        Self {
767            logger,
768            log_type,
769            buffer: Vec::new(),
770            buffer_pos: 0,
771        }
772    }
773
774    /// returns true if there is more data to read.
775    fn fill_buffer(&mut self) -> io::Result<bool> {
776        match self.logger.read_next_section_type(self.log_type) {
777            Ok(Some(section)) => {
778                self.buffer = section;
779                self.buffer_pos = 0;
780                Ok(true)
781            }
782            Ok(None) => Ok(false), // No more sections of this type
783            Err(e) => Err(io::Error::other(e.to_string())),
784        }
785    }
786}
787
788impl Read for UnifiedLoggerIOReader {
789    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
790        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
791            // This means we hit the last section.
792            return Ok(0);
793        }
794
795        // If we still have no data after trying to fill the buffer, we're at EOF
796        if self.buffer_pos >= self.buffer.len() {
797            return Ok(0);
798        }
799
800        // Copy as much as we can from the buffer to `buf`
801        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
802        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
803        self.buffer_pos += len;
804        Ok(len)
805    }
806}
807
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::decode_from_reader;
    use std::io::BufReader;
    use std::path::PathBuf;
    use tempfile::TempDir;

    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    // 32KB: two 16KB pages (16KB is the page size on MacOS, for example).
    // NOTE(review): a previous comment said "16KB" but 16 * 2 * 1024 == 32768 bytes.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Creates a write-mode unified logger backed by `test.bin` inside `tmp_dir`,
    /// preallocated to `slab_size` bytes.
    ///
    /// Returns the logger wrapped in `Arc<Mutex<_>>` (the shape `stream_write`
    /// expects) and the base file path, so tests can reopen the log for reading.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Test helper: number of sections currently in flight on the front slab.
    fn in_flight(logger: &Arc<Mutex<UnifiedLoggerWrite>>) -> usize {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    }

    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            // ie. 3 headers, 1 page max per
            assert!(used < 4 * page_size::get());
            // logger drops at the end of this scope, which should truncate the file
            used
        };

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }

    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(in_flight(&logger), 1);
        }
        // Dropping the stream must retire its section.
        assert_eq!(in_flight(&logger), 0);
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }

    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight(&logger), 2);
        // Drop in reverse creation order: s2 first, then s1.
        drop(s2);
        assert_eq!(in_flight(&logger), 1);
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight(&logger), 2);
        // Drop in creation order: s1 first, while s2 is still open.
        drop(s1);
        assert_eq!(in_flight(&logger), 1);
        drop(s2);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);
        // Reopen the same file in read mode and check the round trip.
        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Mimics the state tracking of a basic CopperList implementation.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    /// Mimics a basic CopperList implementation.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }

    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // large enough so we are sure to create a few slabs
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let mut total_readback = 0;
        // Walk every CopperList section across all slabs; stop on error or
        // when no section of that type is left (Err(_) or Ok(None)).
        while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
            let mut reader = BufReader::new(&section[..]);
            // Count decodable entries; a decode error marks the end of the
            // filled part of the section.
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_err() {
                    break;
                }
                total_readback += 1;
            }
        }
        assert_eq!(total_readback, 10000);
    }
}