// cu29_unifiedlog/memmap.rs

1//! This is the memory map file implementation for the unified logger for Copper.
2//! It is std only.
3
4use crate::{
5    AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6    SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
/// Section storage backed by a window of a memory-mapped slab.
///
/// The `'static` slice aliases the slab's mmap; the logger (not the type
/// system) guarantees the mapping outlives the handle.
pub struct MmapSectionStorage {
    /// Window of the slab's mmap reserved for this section (header + payload).
    buffer: &'static mut [u8],
    /// Write cursor, relative to the start of `buffer`.
    offset: usize,
    /// Bytes reserved at the start of `buffer` for the section header.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Wraps `buffer` as a fresh section storage; the first `block_size` bytes
    /// are reserved for the section header.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Base address of the section's buffer; the slabs use it to determine
    /// which mapping a section belongs to.
    pub fn buffer_ptr(&self) -> *const u8 {
        // `as_ptr` avoids the panic `&self.buffer[0]` would raise on an empty slice.
        self.buffer.as_ptr()
    }
}
43
44impl SectionStorage for MmapSectionStorage {
45    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
46        self.post_update_header(header)?;
47        self.offset = self.block_size;
48        Ok(self.offset)
49    }
50
51    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
52        encode_into_slice(header, &mut self.buffer[0..], standard())
53    }
54
55    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
56        let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
57        self.offset += size;
58        Ok(size)
59    }
60
61    fn flush(&mut self) -> CuResult<usize> {
62        // Flushing is handled at the slab level for mmap-backed storage.
63        Ok(self.offset)
64    }
65}
66
/// Holds the read or write side of the datalogger.
pub enum MmapUnifiedLogger {
    /// Read-only side, for replaying an existing log.
    Read(MmapUnifiedLoggerRead),
    /// Write side, for producing a new log.
    Write(MmapUnifiedLoggerWrite),
}
73
/// Use this builder to create a new DataLogger.
pub struct MmapUnifiedLoggerBuilder {
    /// Base path used to derive per-slab file names (see `file_base_name`).
    file_base_name: Option<PathBuf>,
    /// Size in bytes each slab file is preallocated to; required for write mode.
    preallocated_size: Option<usize>,
    /// Open for writing when true (combined with `create`).
    write: bool,
    /// Create the backing files when true; otherwise an existing log is opened.
    create: bool,
}
81
impl Default for MmapUnifiedLoggerBuilder {
    /// Same as [`MmapUnifiedLoggerBuilder::new`]: read mode, nothing configured.
    fn default() -> Self {
        Self::new()
    }
}
87
88impl MmapUnifiedLoggerBuilder {
89    pub fn new() -> Self {
90        Self {
91            file_base_name: None,
92            preallocated_size: None,
93            write: false,
94            create: false, // This is the safest default
95        }
96    }
97
98    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
99    pub fn file_base_name(mut self, file_path: &Path) -> Self {
100        self.file_base_name = Some(file_path.to_path_buf());
101        self
102    }
103
104    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105        self.preallocated_size = Some(preallocated_size);
106        self
107    }
108
109    pub fn write(mut self, write: bool) -> Self {
110        self.write = write;
111        self
112    }
113
114    pub fn create(mut self, create: bool) -> Self {
115        self.create = create;
116        self
117    }
118
119    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120        let page_size = page_size::get();
121
122        if self.write && self.create {
123            let file_path = self.file_base_name.ok_or_else(|| {
124                io::Error::new(
125                    io::ErrorKind::InvalidInput,
126                    "File path is required for write mode",
127                )
128            })?;
129            let preallocated_size = self.preallocated_size.ok_or_else(|| {
130                io::Error::new(
131                    io::ErrorKind::InvalidInput,
132                    "Preallocated size is required for write mode",
133                )
134            })?;
135            let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
136            Ok(MmapUnifiedLogger::Write(ulw))
137        } else {
138            let file_path = self.file_base_name.ok_or_else(|| {
139                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
140            })?;
141            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
142            Ok(MmapUnifiedLogger::Read(ulr))
143        }
144    }
145}
146
/// One memory-mapped backing file of the unified log.
struct SlabEntry {
    /// Backing file, kept open for the lifetime of the mapping.
    file: File,
    /// Writable mapping over the whole file; dropped manually before the file is trimmed.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Next free byte offset within the mapping.
    current_global_position: usize,
    /// Offsets of sections handed out but not yet flushed/closed.
    sections_offsets_in_flight: Vec<usize>,
    /// Everything before this offset has been flushed to disk.
    flushed_until_offset: usize,
    /// OS page size used for section alignment.
    page_size: usize,
    /// Start offset of a provisional end-of-log marker, if one was written.
    temporary_end_marker: Option<usize>,
}
156
157impl Drop for SlabEntry {
158    fn drop(&mut self) {
159        self.flush_until(self.current_global_position);
160        // SAFETY: We own the mapping and must drop it before trimming the file.
161        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
162        if let Err(error) = self.file.set_len(self.current_global_position as u64) {
163            eprintln!("Failed to trim datalogger file: {}", error);
164        }
165
166        if !self.sections_offsets_in_flight.is_empty() {
167            eprintln!("Error: Slab not full flushed.");
168        }
169    }
170}
171
impl SlabEntry {
    /// Maps `file` into memory and wraps it as a fresh, empty slab.
    /// `page_size` is kept for section alignment.
    fn new(file: File, page_size: usize) -> io::Result<Self> {
        let mmap_buffer = ManuallyDrop::new(
            // SAFETY: The file descriptor is valid and mapping is confined to this struct.
            unsafe { MmapMut::map_mut(&file) }
                .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
        );
        Ok(Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
            temporary_end_marker: None,
        })
    }

    /// Ensures the underlying mmap is flushed to disk up to the given position.
    fn flush_until(&mut self, until_position: usize) {
        // A zero-length flush is tolerated under linux, but crashes on macos,
        // hence the early return for empty or already-flushed ranges.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Rolls back a previously written temporary end marker so the space can
    /// be reused by the next section.
    fn clear_temporary_end_marker(&mut self) {
        if let Some(marker_start) = self.temporary_end_marker.take() {
            self.current_global_position = marker_start;
            if self.flushed_until_offset > marker_start {
                self.flushed_until_offset = marker_start;
            }
        }
    }

    /// Writes a `LastEntry` marker (header only) at the next page boundary.
    ///
    /// `temporary == true` flags the marker as open so it can be rolled back
    /// by `clear_temporary_end_marker`; `false` terminates the log for good.
    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
        let marker_start = self.align_to_next_page(self.current_global_position);
        let total_marker_size = block_size; // header only
        let marker_end = marker_start + total_marker_size;
        if marker_end > self.mmap_buffer.len() {
            return Err("Not enough space to write end-of-log marker".into());
        }

        let header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size: SECTION_HEADER_COMPACT_SIZE,
            entry_type: UnifiedLogType::LastEntry,
            offset_to_next_section: total_marker_size as u32,
            used: 0,
            is_open: temporary,
        };

        encode_into_slice(
            &header,
            &mut self.mmap_buffer
                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
            standard(),
        )
        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;

        self.temporary_end_marker = Some(marker_start);
        self.current_global_position = marker_end;
        Ok(())
    }

    /// Returns true when the section's buffer lies inside this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();
        (ptr >= self.mmap_buffer.as_ptr())
            && (ptr as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        section
            .post_update_header()
            .expect("Failed to update section header");

        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();

        // Defensive check: the handle must belong to this slab's mapping.
        if ptr < self.mmap_buffer.as_ptr()
            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        let base = self.mmap_buffer.as_ptr() as usize;
        // This section is no longer in flight.
        self.sections_offsets_in_flight
            .retain(|&x| x != ptr as usize - base);

        // Only flush up to the first still-open section; everything before it
        // is complete and safe to persist.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next multiple of the page size.
    /// Assumes the page size is a power of two.
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection<MmapSectionStorage> {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        #[cfg(feature = "compact")]
        let block_size = SECTION_HEADER_COMPACT_SIZE;

        #[cfg(not(feature = "compact"))]
        let block_size = self.page_size as u16;

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size,
            entry_type,
            offset_to_next_section: section_size,
            used: 0u32,
            is_open: true,
        };

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: We have exclusive access to user_buffer for the handle's lifetime.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);

        self.current_global_position = end_of_section;

        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
    }

    /// Number of bytes consumed in this slab (test helper).
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
338
/// A write side of the datalogger.
pub struct MmapUnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slabs are the previous slabs that are still being flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
352
/// Builds the path of slab number `slab_index` from `base_file_path` by
/// inserting `_<slab_index>` between the file stem and its extension.
///
/// Errors if the base path has no file name, no extension, an empty stem, or
/// a non-UTF-8 name.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let invalid_input = |msg: &'static str| io::Error::new(io::ErrorKind::InvalidInput, msg);

    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid_input("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid_input("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(invalid_input("Base file name is empty"));
    }
    Ok(base_file_path.with_file_name(format!("{stem}_{slab_index}.{extension}")))
}
389
390fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
391    let file_path = build_slab_path(base_file_path, slab_suffix)?;
392    let file = OpenOptions::new()
393        .read(true)
394        .write(true)
395        .create(true)
396        .truncate(true)
397        .open(&file_path)
398        .map_err(|e| {
399            io::Error::new(
400                e.kind(),
401                format!("Failed to open file {}: {e}", file_path.display()),
402            )
403        })?;
404    file.set_len(slab_size as u64).map_err(|e| {
405        io::Error::new(
406            e.kind(),
407            format!("Failed to set file length for {}: {e}", file_path.display()),
408        )
409    })?;
410    Ok(file)
411}
412
impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
    /// The returned slice is section_size or greater.
    ///
    /// Rolls over to a fresh slab when the front slab has no room left.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
        // Any provisional end marker is rolled back so the new section can reuse its space.
        self.front_slab.clear_temporary_end_marker();
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = self.create_slab()?;
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
                    Section(section) => {
                        // Place a provisional end marker after the new section so a
                        // crash still leaves a terminated, readable log.
                        self.place_end_marker(true)?;
                        Ok(section)
                    }
                }
            }
            Section(section) => {
                self.place_end_marker(true)?;
                Ok(section)
            }
        }
    }

    /// Flushes a finished section, looking it up in the back slabs first and
    /// falling back to the front slab.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        section.mark_closed();
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Reports used and allocated space for the log.
    ///
    /// NOTE(review): `total_allocated_space` is `slab_size * front_slab_suffix`,
    /// which reports 0 while the first slab (suffix 0) is active — confirm
    /// whether `front_slab_suffix + 1` was intended.
    fn status(&self) -> UnifiedLogStatus {
        UnifiedLogStatus {
            total_used_space: self.front_slab.current_global_position,
            total_allocated_space: self.slab_size * self.front_slab_suffix,
        }
    }
}
469
impl MmapUnifiedLoggerWrite {
    /// Creates the backing file for the next slab and bumps the suffix counter.
    fn next_slab(&mut self) -> io::Result<File> {
        let next_suffix = self.front_slab_suffix + 1;
        let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
        self.front_slab_suffix = next_suffix;
        Ok(file)
    }

    /// Creates slab 0 and writes the main header into its first page.
    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
        let file = make_slab_file(base_file_path, slab_size, 0)?;
        let mut front_slab = SlabEntry::new(file, page_size)?;

        // This is the first slab so add the main header.
        let main_header = MainHeader {
            magic: MAIN_MAGIC,
            first_section_offset: page_size as u16,
            page_size: page_size as u16,
        };
        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
            .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
        // The encoded header must fit in the prolog page that precedes the first section.
        assert!(nb_bytes < page_size);
        front_slab.current_global_position = page_size; // align to the next page

        Ok(Self {
            front_slab,
            back_slabs: Vec::new(),
            base_file_path: base_file_path.to_path_buf(),
            slab_size,
            front_slab_suffix: 0,
        })
    }

    /// Drops back slabs whose sections have all been flushed.
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }

    /// Writes an end-of-log marker, rolling over to a fresh slab if the front
    /// slab has no room left for it.
    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        match self.front_slab.write_end_marker(temporary) {
            Ok(_) => Ok(()),
            Err(_) => {
                // Not enough space in the current slab, roll to a new one.
                let new_slab = self.create_slab()?;
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                self.front_slab.write_end_marker(temporary)
            }
        }
    }

    /// Returns (front-slab used bytes, in-flight section offsets, back-slab count).
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }

    /// Allocates and maps the next slab file, reusing the front slab's page size.
    fn create_slab(&mut self) -> CuResult<SlabEntry> {
        let file = self
            .next_slab()
            .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
        SlabEntry::new(file, self.front_slab.page_size)
            .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
    }
}
536
impl Drop for MmapUnifiedLoggerWrite {
    /// Finalizes the log: writes a permanent end marker and flushes everything.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.

        // Replace any provisional end marker with a permanent one.
        self.front_slab.clear_temporary_end_marker();
        // NOTE(review): panicking inside Drop aborts the process when already
        // unwinding — confirm whether logging-and-continuing would be preferable.
        if let Err(e) = self.place_end_marker(false) {
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
    }
}
553
554fn open_slab_index(
555    base_file_path: &Path,
556    slab_index: usize,
557) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
558    let mut options = OpenOptions::new();
559    let options = options.read(true);
560
561    let file_path = build_slab_path(base_file_path, slab_index)?;
562    let file = options.open(&file_path).map_err(|e| {
563        io::Error::new(
564            e.kind(),
565            format!("Failed to open slab file {}: {e}", file_path.display()),
566        )
567    })?;
568    // SAFETY: The file is kept open for the lifetime of the mapping.
569    let mmap = unsafe { Mmap::map(&file) }
570        .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
571    let mut prolog = 0u16;
572    let mut maybe_main_header: Option<MainHeader> = None;
573    if slab_index == 0 {
574        let main_header: MainHeader;
575        let _read: usize;
576        (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
577            io::Error::new(
578                io::ErrorKind::InvalidData,
579                format!("Failed to decode main header: {e}"),
580            )
581        })?;
582        if main_header.magic != MAIN_MAGIC {
583            return Err(io::Error::new(
584                io::ErrorKind::InvalidData,
585                "Invalid magic number in main header",
586            ));
587        }
588        prolog = main_header.first_section_offset;
589        maybe_main_header = Some(main_header);
590    }
591    Ok((file, mmap, prolog, maybe_main_header))
592}
593
/// A read side of the memory map based unified logger.
pub struct MmapUnifiedLoggerRead {
    /// Base path the per-slab file names are derived from.
    base_file_path: PathBuf,
    /// Main header decoded from slab 0.
    main_header: MainHeader,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// Backing file of the current mapping (kept open for the mapping's lifetime).
    current_file: File,
    /// Index of the slab currently mapped.
    current_slab_index: usize,
    /// Byte offset of the next section header within the current slab.
    current_reading_position: usize,
}
603
/// Absolute position inside a unified log (slab index + byte offset).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    /// Index of the slab file the position refers to.
    pub slab_index: usize,
    /// Byte offset within that slab (start of a section header).
    pub offset: usize,
}
610
impl UnifiedLogRead for MmapUnifiedLoggerRead {
    /// Skips forward until a section of `datalogtype` is found and returns a
    /// copy of its payload, or `None` once the end-of-log marker is reached.
    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
        // TODO: eventually implement a 0 copy of this too.
        loop {
            // Past the end of the current mapping: move on to the next slab file.
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header_result = self.read_section_header();
            let header = header_result.map_err(|error| {
                CuError::new_with_cause(
                    &format!(
                        "Could not read a sections header: {}/{}:{}",
                        self.base_file_path.as_os_str().to_string_lossy(),
                        self.current_slab_index,
                        self.current_reading_position,
                    ),
                    error,
                )
            })?;

            // Reached the end of file
            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(None);
            }

            // Found a section of the requested type
            if header.entry_type == datalogtype {
                let result = Some(self.read_section_content(&header)?);
                self.current_reading_position += header.offset_to_next_section as usize;
                return Ok(result);
            }

            // Keep reading until we find the requested type
            self.current_reading_position += header.offset_to_next_section as usize;
        }
    }

    /// Reads the section from the section header pos.
    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }

        let read_result = self.read_section_header();

        match read_result {
            Err(error) => Err(CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                error,
            )),
            Ok(header) => {
                let data = self.read_section_content(&header)?;
                // Advance the cursor to the start of the next section header.
                self.current_reading_position += header.offset_to_next_section as usize;
                Ok((header, data))
            }
        }
    }
}
679
impl MmapUnifiedLoggerRead {
    /// Opens slab 0 of the log and validates its main header.
    pub fn new(base_file_path: &Path) -> io::Result<Self> {
        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
        let main_header = header.ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
        })?;

        Ok(Self {
            base_file_path: base_file_path.to_path_buf(),
            main_header,
            current_file: file,
            current_mmap_buffer: mmap,
            current_slab_index: 0,
            // Reading starts right after the main-header prolog.
            current_reading_position: prolog as usize,
        })
    }

    /// Current cursor position (start of next section header).
    pub fn position(&self) -> LogPosition {
        LogPosition {
            slab_index: self.current_slab_index,
            offset: self.current_reading_position,
        }
    }

    /// Seek to an absolute position (start of a section header).
    ///
    /// Remaps the target slab when it differs from the current one.
    pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
        if pos.slab_index != self.current_slab_index {
            let (file, mmap, _prolog, _header) =
                open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
                    CuError::new_with_cause(
                        &format!("Failed to open slab {} for seek", pos.slab_index),
                        e,
                    )
                })?;
            self.current_file = file;
            self.current_mmap_buffer = mmap;
            self.current_slab_index = pos.slab_index;
        }
        self.current_reading_position = pos.offset;
        Ok(())
    }

    /// Maps the next slab file and resets the cursor to its first section.
    fn next_slab(&mut self) -> io::Result<()> {
        self.current_slab_index += 1;
        let (file, mmap, prolog, _) =
            open_slab_index(&self.base_file_path, self.current_slab_index)?;
        self.current_file = file;
        self.current_mmap_buffer = mmap;
        self.current_reading_position = prolog as usize;
        Ok(())
    }

    /// Raw access to the main header decoded from slab 0.
    pub fn raw_main_header(&self) -> &MainHeader {
        &self.main_header
    }

    /// Sums the `used` byte counts of all sections of `datalogtype`, advancing
    /// the cursor until the end-of-log marker.
    pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
        let mut total = 0u64;

        loop {
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header = self.read_section_header()?;

            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(total);
            }

            if header.entry_type == datalogtype {
                total = total.saturating_add(header.used as u64);
            }

            self.current_reading_position += header.offset_to_next_section as usize;
        }
    }

    /// Reads the section content from the section header pos.
    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
        // TODO: we could optimize by asking the buffer to fill
        // The payload starts one header block past the section header.
        let mut section_data = vec![0; header.used as usize];
        let start_of_data = self.current_reading_position + header.block_size as usize;
        section_data.copy_from_slice(
            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
        );

        Ok(section_data)
    }

    /// Decodes and validates the section header at the current cursor position.
    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
        let section_header: SectionHeader;
        (section_header, _) = decode_from_slice(
            &self.current_mmap_buffer[self.current_reading_position..],
            standard(),
        )
        .map_err(|e| {
            CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                e,
            )
        })?;
        if section_header.magic != SECTION_MAGIC {
            return Err("Invalid magic number in section header".into());
        }

        Ok(section_header)
    }
}
797
/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    /// Underlying section reader.
    logger: MmapUnifiedLoggerRead,
    /// Only sections of this type are surfaced through `read`.
    log_type: UnifiedLogType,
    /// Payload of the current section.
    buffer: Vec<u8>,
    /// Read cursor within `buffer`.
    buffer_pos: usize,
}
805
806impl UnifiedLoggerIOReader {
807    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
808        Self {
809            logger,
810            log_type,
811            buffer: Vec::new(),
812            buffer_pos: 0,
813        }
814    }
815
816    /// returns true if there is more data to read.
817    fn fill_buffer(&mut self) -> io::Result<bool> {
818        match self.logger.read_next_section_type(self.log_type) {
819            Ok(Some(section)) => {
820                self.buffer = section;
821                self.buffer_pos = 0;
822                Ok(true)
823            }
824            Ok(None) => Ok(false), // No more sections of this type
825            Err(e) => Err(io::Error::other(e.to_string())),
826        }
827    }
828}
829
830impl Read for UnifiedLoggerIOReader {
831    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
832        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
833            // This means we hit the last section.
834            return Ok(0);
835        }
836
837        // If we still have no data after trying to fill the buffer, we're at EOF
838        if self.buffer_pos >= self.buffer.len() {
839            return Ok(0);
840        }
841
842        // Copy as much as we can from the buffer to `buf`
843        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
844        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
845        self.buffer_pos += len;
846        Ok(len)
847    }
848}
849
#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream_write;
    use bincode::de::read::SliceReader;
    use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
    use cu29_traits::WriteStream;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use tempfile::TempDir;

    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    // Two pages on macOS (16KB is the page size on MacOS for example), i.e. 32KB total.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Builds a write-mode logger backed by a file in `tmp_dir` with the given slab size,
    /// wrapped for shared use by the `stream_write` helpers.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Creating sections should only consume a handful of pages, and dropping the
    /// logger should leave a reopenable (truncated) slab file on disk.
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger
                .add_section(UnifiedLogType::StructuredLogLine, 1024)
                .unwrap();
            logger
                .add_section(UnifiedLogType::CopperList, 2048)
                .unwrap();
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get()); // i.e. 3 headers, at most 1 page each
            // logger drops at the end of this scope, finalizing the file

            used
        };

        // The first slab is written as "<base>_0.bin"; reopening proves it exists on disk.
        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }

    /// Dropping a stream should retire its in-flight section and let the slab
    /// flush everything up to the current global position.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream = stream_write::<(), MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            );
            // While the stream is alive its section is tracked as in flight.
            assert_eq!(
                logger
                    .lock()
                    .unwrap()
                    .front_slab
                    .sections_offsets_in_flight
                    .len(),
                1
            );
        }
        // Stream dropped: the section must have been cleaned up.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            0
        );
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }

    /// While the log is still open, a temporary end-of-log marker (an "open"
    /// LastEntry section header with no payload) must be present in the slab.
    #[test]
    fn test_temporary_end_marker_is_created() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write::<u32, MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            )
            .unwrap();
            stream.log(&42u32).unwrap();
        }

        let logger_guard = logger.lock().unwrap();
        let slab = &logger_guard.front_slab;
        let marker_start = slab
            .temporary_end_marker
            .expect("temporary end-of-log marker missing");
        // Decode the marker straight out of the mmap buffer and check its shape.
        let (eof_header, _) =
            decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
                .expect("Could not decode end-of-log marker header");
        assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
        assert!(eof_header.is_open);
        assert_eq!(eof_header.used, 0);
    }

    /// After the writer is dropped, the final LastEntry marker must be closed
    /// (`is_open == false`), i.e. no longer the temporary variant.
    #[test]
    fn test_final_end_marker_is_not_temporary() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write::<u32, MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::CopperList,
                1024,
            )
            .unwrap();
            stream.log(&1u32).unwrap();
        }
        drop(logger); // finalizes the log and closes the end marker

        let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build reader")
        else {
            panic!("Failed to create reader");
        };

        // Walk the sections until we hit the end-of-log marker.
        loop {
            let (header, _data) = reader
                .raw_read_section()
                .expect("Failed to read section while searching for EOF");
            if header.entry_type == UnifiedLogType::LastEntry {
                assert!(!header.is_open);
                break;
            }
        }
    }

    /// Two streams dropped in creation order: in-flight count must go 1 -> 2 -> 1 -> 0
    /// and the slab must end up fully flushed.
    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s2);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    /// Same as the in-order test, but the OLDER stream is dropped first; cleanup
    /// must still converge to zero in-flight sections and a fully flushed slab.
    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s1); // out-of-order: first-created stream goes away first
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s2);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    /// End-to-end: write three u32 entries, reopen for read, and decode them back
    /// in order from the section payload.
    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);
        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();
        let mut reader = SliceReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Mimic a basic CopperList implementation.

    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }

    /// Round-trips two structured CopperList entries through a single section.
    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = SliceReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    /// Writes enough entries to span several small slabs, then reads everything
    /// back across slab boundaries and checks no entry was lost.
    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // large enough so we are sure to create a few slabs
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let mut total_readback = 0;
        // Outer loop: one iteration per section; stops on error or exhaustion.
        loop {
            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
            if section.is_err() {
                break;
            }
            let section = section.unwrap();
            if section.is_none() {
                break;
            }
            let section = section.unwrap();

            // Inner loop: decode entries until the section payload runs out.
            let mut reader = SliceReader::new(&section[..]);
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_ok() {
                    total_readback += 1;
                } else {
                    break;
                }
            }
        }
        assert_eq!(total_readback, 10000);
    }
}