cu29_unifiedlog/
memmap.rs

1//! This is the memory map file implementation for the unified logger for Copper.
2//! It is std only.
3
4use crate::{
5    AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6    SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
/// Section storage backed by a raw slice carved out of a slab's memory map.
///
/// The `'static` lifetime is an invariant upheld by the owning `SlabEntry`:
/// the slice points into the slab's mmap and must not outlive it.
pub struct MmapSectionStorage {
    // Exclusive view into the slab's mmap for this section (header + payload).
    buffer: &'static mut [u8],
    // Write cursor relative to the start of `buffer`.
    offset: usize,
    // Bytes reserved at the start of `buffer` for the section header.
    block_size: usize,
}
29
30impl MmapSectionStorage {
31    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
32        Self {
33            buffer,
34            offset: 0,
35            block_size,
36        }
37    }
38
39    pub fn buffer_ptr(&self) -> *const u8 {
40        &self.buffer[0] as *const u8
41    }
42}
43
impl SectionStorage for MmapSectionStorage {
    /// Writes the section header at the start of the buffer and positions the
    /// write cursor just past the reserved header block.
    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        self.post_update_header(header)?;
        self.offset = self.block_size;
        Ok(self.offset)
    }

    /// Re-encodes the header in place at offset 0 (e.g. to refresh `used` /
    /// `is_open` after writes). Does not move the write cursor.
    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        encode_into_slice(header, &mut self.buffer[0..], standard())
    }

    /// Appends one entry at the current cursor and advances the cursor by the
    /// encoded size; bincode errors out if the remaining slice is too small.
    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
        let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
        self.offset += size;
        Ok(size)
    }

    /// Not implemented here: in this backend flushing is performed by the
    /// owning slab (see `SlabEntry::flush_section`), not by the storage.
    fn flush(&mut self) -> CuResult<usize> {
        todo!()
    }
}
65
/// Holds the read or write side of the datalogger.
pub enum MmapUnifiedLogger {
    /// Read-only side of the log.
    Read(MmapUnifiedLoggerRead),
    /// Write side of the log.
    Write(MmapUnifiedLoggerWrite),
}
72
73/// Use this builder to create a new DataLogger.
/// Use this builder to create a new DataLogger.
pub struct MmapUnifiedLoggerBuilder {
    // Base path; slab files are derived from it as "<stem>_<n>.<ext>".
    file_base_name: Option<PathBuf>,
    // Bytes preallocated per slab file (write mode only).
    preallocated_size: Option<usize>,
    // Open the log for writing.
    write: bool,
    // Allow creating the backing files.
    create: bool,
}
80
impl Default for MmapUnifiedLoggerBuilder {
    /// Same as [`MmapUnifiedLoggerBuilder::new`]: read-only, no creation.
    fn default() -> Self {
        Self::new()
    }
}
86
87impl MmapUnifiedLoggerBuilder {
88    pub fn new() -> Self {
89        Self {
90            file_base_name: None,
91            preallocated_size: None,
92            write: false,
93            create: false, // This is the safest default
94        }
95    }
96
97    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
98    pub fn file_base_name(mut self, file_path: &Path) -> Self {
99        self.file_base_name = Some(file_path.to_path_buf());
100        self
101    }
102
103    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
104        self.preallocated_size = Some(preallocated_size);
105        self
106    }
107
108    pub fn write(mut self, write: bool) -> Self {
109        self.write = write;
110        self
111    }
112
113    pub fn create(mut self, create: bool) -> Self {
114        self.create = create;
115        self
116    }
117
118    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
119        let page_size = page_size::get();
120
121        if self.write && self.create {
122            let ulw = MmapUnifiedLoggerWrite::new(
123                &self
124                    .file_base_name
125                    .expect("This unified logger has no filename."),
126                self.preallocated_size
127                    .expect("This unified logger has no preallocated size."),
128                page_size,
129            );
130
131            Ok(MmapUnifiedLogger::Write(ulw))
132        } else {
133            let file_path = self.file_base_name.ok_or_else(|| {
134                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
135            })?;
136            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
137            Ok(MmapUnifiedLogger::Read(ulr))
138        }
139    }
140}
141
/// One memory-mapped backing file ("slab") of the write-side logger.
struct SlabEntry {
    // Backing file, kept open for the lifetime of the mapping.
    file: File,
    // ManuallyDrop so `Drop` can unmap it before the file is truncated.
    mmap_buffer: ManuallyDrop<MmapMut>,
    // Allocation high-water mark within the slab.
    current_global_position: usize,
    // Start offsets of sections handed out but not yet flushed/closed.
    sections_offsets_in_flight: Vec<usize>,
    // Everything before this offset has been flushed to disk.
    flushed_until_offset: usize,
    // Page size used for section alignment.
    page_size: usize,
    // Offset of a provisional end-of-log marker, if one has been written.
    temporary_end_marker: Option<usize>,
}
151
152impl Drop for SlabEntry {
153    fn drop(&mut self) {
154        self.flush_until(self.current_global_position);
155        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
156        self.file
157            .set_len(self.current_global_position as u64)
158            .expect("Failed to trim datalogger file");
159
160        if !self.sections_offsets_in_flight.is_empty() {
161            eprintln!("Error: Slab not full flushed.");
162        }
163    }
164}
165
impl SlabEntry {
    /// Maps `file` (already sized to the slab length by the caller) and
    /// starts allocating from offset 0.
    fn new(file: File, page_size: usize) -> Self {
        let mmap_buffer =
            ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
        Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
            temporary_end_marker: None,
        }
    }

    /// Ensure the underlying mmap is flushed to disk until the given position.
    fn flush_until(&mut self, until_position: usize) {
        // Guard against a zero-length flush:
        // this is tolerated under linux, but crashes on macos
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Rewinds the allocation cursor over a provisional end-of-log marker so
    /// the next section overwrites it.
    fn clear_temporary_end_marker(&mut self) {
        if let Some(marker_start) = self.temporary_end_marker.take() {
            self.current_global_position = marker_start;
            // Anything flushed past the marker will be rewritten, so pull the
            // flush watermark back too.
            if self.flushed_until_offset > marker_start {
                self.flushed_until_offset = marker_start;
            }
        }
    }

    /// Writes a `LastEntry` section header at the next page boundary.
    /// A `temporary` marker (`is_open == true`) can later be rolled back by
    /// `clear_temporary_end_marker`; a permanent one ends the log.
    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        // NOTE(review): the marker always uses the compact header size, even
        // in non-"compact" builds where add_section uses a full page per
        // header — confirm readers only rely on offset_to_next_section here.
        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
        let marker_start = self.align_to_next_page(self.current_global_position);
        let total_marker_size = block_size; // header only
        let marker_end = marker_start + total_marker_size;
        if marker_end > self.mmap_buffer.len() {
            return Err("Not enough space to write end-of-log marker".into());
        }

        let header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size: SECTION_HEADER_COMPACT_SIZE,
            entry_type: UnifiedLogType::LastEntry,
            offset_to_next_section: total_marker_size as u32,
            used: 0,
            is_open: temporary,
        };

        encode_into_slice(
            &header,
            &mut self.mmap_buffer
                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
            standard(),
        )
        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;

        self.temporary_end_marker = Some(marker_start);
        self.current_global_position = marker_end;
        Ok(())
    }

    /// True when the section's buffer points inside this slab's mapping.
    // NOTE(review): the upper bound here is exclusive while flush_section's
    // check below is inclusive — confirm the intended boundary convention.
    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();
        (ptr >= self.mmap_buffer.as_ptr())
            && (ptr as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        // Persist the final header (used bytes, closed flag) before flushing.
        section
            .post_update_header()
            .expect("Failed to update section header");

        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();

        if ptr < self.mmap_buffer.as_ptr()
            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        // Forget this section, then flush only up to the first section still
        // in flight: flushing is ordered within the slab.
        let base = self.mmap_buffer.as_ptr() as usize;
        self.sections_offsets_in_flight
            .retain(|&x| x != ptr as usize - base);

        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next multiple of the page size. The bit trick
    /// assumes the page size is a power of two.
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection<MmapSectionStorage> {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        #[cfg(feature = "compact")]
        let block_size = SECTION_HEADER_COMPACT_SIZE;

        #[cfg(not(feature = "compact"))]
        let block_size = self.page_size as u16;

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size,
            entry_type,
            offset_to_next_section: section_size,
            used: 0u32,
            is_open: true,
        };

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: the handle gets exclusive access to this sub-slice; the slab
        // only hands out non-overlapping ranges and is kept alive until all of
        // its sections are flushed. The borrow checker cannot express this.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);

        self.current_global_position = end_of_section;

        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
    }

    // Test-only: bytes allocated so far in this slab.
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
329
/// A write side of the datalogger.
pub struct MmapUnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slabs are previous slabs still being flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
343
/// Derives the path of slab `slab_index` from the base path by inserting
/// `_{slab_index}` before the extension: "dir/toto.copper" -> "dir/toto_3.copper".
///
/// Only the last '.'-separated component is treated as the extension, so
/// "a.b.copper" becomes "a.b_3.copper". A base name without an extension gets
/// the suffix appended directly ("dir/toto" -> "dir/toto_3"), instead of the
/// previous mangled "_3.toto".
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let mut file_path = base_file_path.to_path_buf();
    let file_name = {
        let stem = file_path
            .file_stem()
            .expect("Invalid base file path")
            .to_str()
            .expect("Could not translate the filename OsStr to str");
        match file_path.extension().and_then(|ext| ext.to_str()) {
            Some(extension) => format!("{stem}_{slab_index}.{extension}"),
            None => format!("{stem}_{slab_index}"),
        }
    };
    file_path.set_file_name(file_name);
    file_path
}
358
359fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
360    let file_path = build_slab_path(base_file_path, slab_suffix);
361    let file = OpenOptions::new()
362        .read(true)
363        .write(true)
364        .create(true)
365        .truncate(true)
366        .open(&file_path)
367        .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
368    file.set_len(slab_size as u64)
369        .expect("Failed to set file length");
370    file
371}
372
impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
        // Any provisional end-of-log marker is overwritten by the new section.
        self.front_slab.clear_temporary_end_marker();
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
                    Section(section) => {
                        // Re-arm a temporary end marker after the section so a
                        // reader always finds a LastEntry.
                        self.place_end_marker(true)?;
                        Ok(section)
                    }
                }
            }
            Section(section) => {
                self.place_end_marker(true)?;
                Ok(section)
            }
        }
    }

    /// Closes the section and flushes the slab that owns it (a back slab if
    /// it predates a rollover, otherwise the front slab).
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        section.mark_closed();
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Space accounting snapshot for the log.
    // NOTE(review): front_slab_suffix starts at 0, so this reports 0 allocated
    // bytes until the first rollover even though slab 0 exists — confirm
    // whether (front_slab_suffix + 1) * slab_size was intended. Used space
    // only counts the front slab, not the back slabs.
    fn status(&self) -> UnifiedLogStatus {
        UnifiedLogStatus {
            total_used_space: self.front_slab.current_global_position,
            total_allocated_space: self.slab_size * self.front_slab_suffix,
        }
    }
}
429
430impl MmapUnifiedLoggerWrite {
431    fn next_slab(&mut self) -> File {
432        self.front_slab_suffix += 1;
433
434        make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
435    }
436
437    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
438        let file = make_slab_file(base_file_path, slab_size, 0);
439        let mut front_slab = SlabEntry::new(file, page_size);
440
441        // This is the first slab so add the main header.
442        let main_header = MainHeader {
443            magic: MAIN_MAGIC,
444            first_section_offset: page_size as u16,
445            page_size: page_size as u16,
446        };
447        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
448            .expect("Failed to encode main header");
449        assert!(nb_bytes < page_size);
450        front_slab.current_global_position = page_size; // align to the next page
451
452        Self {
453            front_slab,
454            back_slabs: Vec::new(),
455            base_file_path: base_file_path.to_path_buf(),
456            slab_size,
457            front_slab_suffix: 0,
458        }
459    }
460
461    fn garbage_collect_backslabs(&mut self) {
462        self.back_slabs
463            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
464    }
465
466    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
467        match self.front_slab.write_end_marker(temporary) {
468            Ok(_) => Ok(()),
469            Err(_) => {
470                // Not enough space in the current slab, roll to a new one.
471                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
472                self.back_slabs
473                    .push(mem::replace(&mut self.front_slab, new_slab));
474                self.front_slab.write_end_marker(temporary)
475            }
476        }
477    }
478
479    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
480        (
481            self.front_slab.current_global_position,
482            self.front_slab.sections_offsets_in_flight.clone(),
483            self.back_slabs.len(),
484        )
485    }
486}
487
impl Drop for MmapUnifiedLoggerWrite {
    /// Finalizes the log: replaces any temporary end marker with a permanent
    /// one, flushes the front slab, and reaps fully-flushed back slabs.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.

        self.front_slab.clear_temporary_end_marker();
        // NOTE(review): panicking inside Drop aborts the process if we are
        // already unwinding from another panic — consider logging the error
        // instead; confirm the intended failure policy here.
        if let Err(e) = self.place_end_marker(false) {
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
    }
}
504
505fn open_slab_index(
506    base_file_path: &Path,
507    slab_index: usize,
508) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
509    let mut options = OpenOptions::new();
510    let options = options.read(true);
511
512    let file_path = build_slab_path(base_file_path, slab_index);
513    let file = options.open(file_path)?;
514    let mmap = unsafe { Mmap::map(&file) }?;
515    let mut prolog = 0u16;
516    let mut maybe_main_header: Option<MainHeader> = None;
517    if slab_index == 0 {
518        let main_header: MainHeader;
519        let _read: usize;
520        (main_header, _read) =
521            decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
522        if main_header.magic != MAIN_MAGIC {
523            return Err(io::Error::new(
524                io::ErrorKind::InvalidData,
525                "Invalid magic number in main header",
526            ));
527        }
528        prolog = main_header.first_section_offset;
529        maybe_main_header = Some(main_header);
530    }
531    Ok((file, mmap, prolog, maybe_main_header))
532}
533
/// A read side of the memory map based unified logger.
pub struct MmapUnifiedLoggerRead {
    // Base path used to locate the numbered slab files.
    base_file_path: PathBuf,
    // Main header decoded from slab 0.
    main_header: MainHeader,
    // Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    // Keeps the currently mapped file open.
    current_file: File,
    // Index of the slab currently mapped.
    current_slab_index: usize,
    // Byte offset of the next section header within the current slab.
    current_reading_position: usize,
}
543
544impl UnifiedLogRead for MmapUnifiedLoggerRead {
545    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
546        // TODO: eventually implement a 0 copy of this too.
547        loop {
548            if self.current_reading_position >= self.current_mmap_buffer.len() {
549                self.next_slab().map_err(|e| {
550                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
551                })?;
552            }
553
554            let header_result = self.read_section_header();
555            let header = header_result.map_err(|error| {
556                CuError::new_with_cause(
557                    &format!(
558                        "Could not read a sections header: {}/{}:{}",
559                        self.base_file_path.as_os_str().to_string_lossy(),
560                        self.current_slab_index,
561                        self.current_reading_position,
562                    ),
563                    error,
564                )
565            })?;
566
567            // Reached the end of file
568            if header.entry_type == UnifiedLogType::LastEntry {
569                return Ok(None);
570            }
571
572            // Found a section of the requested type
573            if header.entry_type == datalogtype {
574                let result = Some(self.read_section_content(&header)?);
575                self.current_reading_position += header.offset_to_next_section as usize;
576                return Ok(result);
577            }
578
579            // Keep reading until we find the requested type
580            self.current_reading_position += header.offset_to_next_section as usize;
581        }
582    }
583
584    /// Reads the section from the section header pos.
585    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
586        if self.current_reading_position >= self.current_mmap_buffer.len() {
587            self.next_slab().map_err(|e| {
588                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
589            })?;
590        }
591
592        let read_result = self.read_section_header();
593
594        match read_result {
595            Err(error) => Err(CuError::new_with_cause(
596                &format!(
597                    "Could not read a sections header: {}/{}:{}",
598                    self.base_file_path.as_os_str().to_string_lossy(),
599                    self.current_slab_index,
600                    self.current_reading_position,
601                ),
602                error,
603            )),
604            Ok(header) => {
605                let data = self.read_section_content(&header)?;
606                self.current_reading_position += header.offset_to_next_section as usize;
607                Ok((header, data))
608            }
609        }
610    }
611}
612
613impl MmapUnifiedLoggerRead {
614    pub fn new(base_file_path: &Path) -> io::Result<Self> {
615        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
616
617        Ok(Self {
618            base_file_path: base_file_path.to_path_buf(),
619            main_header: header.expect("UnifiedLoggerRead needs a header"),
620            current_file: file,
621            current_mmap_buffer: mmap,
622            current_slab_index: 0,
623            current_reading_position: prolog as usize,
624        })
625    }
626
627    fn next_slab(&mut self) -> io::Result<()> {
628        self.current_slab_index += 1;
629        let (file, mmap, prolog, _) =
630            open_slab_index(&self.base_file_path, self.current_slab_index)?;
631        self.current_file = file;
632        self.current_mmap_buffer = mmap;
633        self.current_reading_position = prolog as usize;
634        Ok(())
635    }
636
637    pub fn raw_main_header(&self) -> &MainHeader {
638        &self.main_header
639    }
640
641    /// Reads the section content from the section header pos.
642    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
643        // TODO: we could optimize by asking the buffer to fill
644        let mut section_data = vec![0; header.used as usize];
645        let start_of_data = self.current_reading_position + header.block_size as usize;
646        section_data.copy_from_slice(
647            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
648        );
649
650        Ok(section_data)
651    }
652
653    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
654        let section_header: SectionHeader;
655        (section_header, _) = decode_from_slice(
656            &self.current_mmap_buffer[self.current_reading_position..],
657            standard(),
658        )
659        .map_err(|e| {
660            CuError::new_with_cause(
661                &format!(
662                    "Could not read a sections header: {}/{}:{}",
663                    self.base_file_path.as_os_str().to_string_lossy(),
664                    self.current_slab_index,
665                    self.current_reading_position,
666                ),
667                e,
668            )
669        })?;
670        if section_header.magic != SECTION_MAGIC {
671            return Err("Invalid magic number in section header".into());
672        }
673
674        Ok(section_header)
675    }
676}
677
/// This is a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    // Underlying section reader.
    logger: MmapUnifiedLoggerRead,
    // Only sections of this type are surfaced through `read`.
    log_type: UnifiedLogType,
    // Payload of the current section.
    buffer: Vec<u8>,
    // Read cursor within `buffer`.
    buffer_pos: usize,
}
685
686impl UnifiedLoggerIOReader {
687    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
688        Self {
689            logger,
690            log_type,
691            buffer: Vec::new(),
692            buffer_pos: 0,
693        }
694    }
695
696    /// returns true if there is more data to read.
697    fn fill_buffer(&mut self) -> io::Result<bool> {
698        match self.logger.read_next_section_type(self.log_type) {
699            Ok(Some(section)) => {
700                self.buffer = section;
701                self.buffer_pos = 0;
702                Ok(true)
703            }
704            Ok(None) => Ok(false), // No more sections of this type
705            Err(e) => Err(io::Error::other(e.to_string())),
706        }
707    }
708}
709
710impl Read for UnifiedLoggerIOReader {
711    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
712        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
713            // This means we hit the last section.
714            return Ok(0);
715        }
716
717        // If we still have no data after trying to fill the buffer, we're at EOF
718        if self.buffer_pos >= self.buffer.len() {
719            return Ok(0);
720        }
721
722        // Copy as much as we can from the buffer to `buf`
723        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
724        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
725        self.buffer_pos += len;
726        Ok(len)
727    }
728}
729
730#[cfg(feature = "std")]
731#[cfg(test)]
732mod tests {
733    use super::*;
734    use crate::stream_write;
735    use bincode::de::read::SliceReader;
736    use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
737    use cu29_traits::WriteStream;
738    use std::path::PathBuf;
739    use std::sync::{Arc, Mutex};
740    use tempfile::TempDir;
741
    // Slab sizes used by the tests below.
    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    const SMALL_SLAB: usize = 16 * 2 * 1024; // 32KB, i.e. 2x the 16KB MacOS page size
744
    /// Builds a write-side logger in `tmp_dir` and returns it (shared,
    /// lockable) together with the base file path it was created from.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }
763
    /// Creates two sections back to back and verifies the slab stays within
    /// a few pages; also checks the backing file can be reopened after the
    /// logger's Drop trimmed it.
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger
                .add_section(UnifiedLogType::StructuredLogLine, 1024)
                .unwrap();
            logger
                .add_section(UnifiedLogType::CopperList, 2048)
                .unwrap();
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
            // logger drops

            used
        };

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }
803
    /// A dropped stream must close its section: the in-flight count returns
    /// to 0 and the slab is flushed up to the current position.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream = stream_write::<(), MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            );
            // While the stream is alive, its section is in flight.
            assert_eq!(
                logger
                    .lock()
                    .unwrap()
                    .front_slab
                    .sections_offsets_in_flight
                    .len(),
                1
            );
        }
        // Dropping the stream must have closed and removed the section.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            0
        );
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }
839
    /// After writing and closing a section, a provisional (`is_open`)
    /// LastEntry marker must be present in the front slab.
    #[test]
    fn test_temporary_end_marker_is_created() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write::<u32, MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            )
            .unwrap();
            stream.log(&42u32).unwrap();
        }

        let logger_guard = logger.lock().unwrap();
        let slab = &logger_guard.front_slab;
        let marker_start = slab
            .temporary_end_marker
            .expect("temporary end-of-log marker missing");
        // Decode the marker straight from the mmap and check its shape.
        let (eof_header, _) =
            decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
                .expect("Could not decode end-of-log marker header");
        assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
        assert!(eof_header.is_open);
        assert_eq!(eof_header.used, 0);
    }
866
867    #[test]
868    fn test_final_end_marker_is_not_temporary() {
869        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
870        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
871        {
872            let mut stream = stream_write::<u32, MmapSectionStorage>(
873                logger.clone(),
874                UnifiedLogType::CopperList,
875                1024,
876            )
877            .unwrap();
878            stream.log(&1u32).unwrap();
879        }
880        drop(logger);
881
882        let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
883            .file_base_name(&f)
884            .build()
885            .expect("Failed to build reader")
886        else {
887            panic!("Failed to create reader");
888        };
889
890        loop {
891            let (header, _data) = reader
892                .raw_read_section()
893                .expect("Failed to read section while searching for EOF");
894            if header.entry_type == UnifiedLogType::LastEntry {
895                assert!(!header.is_open);
896                break;
897            }
898        }
899    }
900
901    #[test]
902    fn test_two_sections_self_cleaning_in_order() {
903        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
904        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
905        let s1 = stream_write::<(), MmapSectionStorage>(
906            logger.clone(),
907            UnifiedLogType::StructuredLogLine,
908            1024,
909        );
910        assert_eq!(
911            logger
912                .lock()
913                .unwrap()
914                .front_slab
915                .sections_offsets_in_flight
916                .len(),
917            1
918        );
919        let s2 = stream_write::<(), MmapSectionStorage>(
920            logger.clone(),
921            UnifiedLogType::StructuredLogLine,
922            1024,
923        );
924        assert_eq!(
925            logger
926                .lock()
927                .unwrap()
928                .front_slab
929                .sections_offsets_in_flight
930                .len(),
931            2
932        );
933        drop(s2);
934        assert_eq!(
935            logger
936                .lock()
937                .unwrap()
938                .front_slab
939                .sections_offsets_in_flight
940                .len(),
941            1
942        );
943        drop(s1);
944        let lg = logger.lock().unwrap();
945        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
946        assert_eq!(
947            lg.front_slab.flushed_until_offset,
948            lg.front_slab.current_global_position
949        );
950    }
951
952    #[test]
953    fn test_two_sections_self_cleaning_out_of_order() {
954        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
955        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
956        let s1 = stream_write::<(), MmapSectionStorage>(
957            logger.clone(),
958            UnifiedLogType::StructuredLogLine,
959            1024,
960        );
961        assert_eq!(
962            logger
963                .lock()
964                .unwrap()
965                .front_slab
966                .sections_offsets_in_flight
967                .len(),
968            1
969        );
970        let s2 = stream_write::<(), MmapSectionStorage>(
971            logger.clone(),
972            UnifiedLogType::StructuredLogLine,
973            1024,
974        );
975        assert_eq!(
976            logger
977                .lock()
978                .unwrap()
979                .front_slab
980                .sections_offsets_in_flight
981                .len(),
982            2
983        );
984        drop(s1);
985        assert_eq!(
986            logger
987                .lock()
988                .unwrap()
989                .front_slab
990                .sections_offsets_in_flight
991                .len(),
992            1
993        );
994        drop(s2);
995        let lg = logger.lock().unwrap();
996        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
997        assert_eq!(
998            lg.front_slab.flushed_until_offset,
999            lg.front_slab.current_global_position
1000        );
1001    }
1002
1003    #[test]
1004    fn test_write_then_read_one_section() {
1005        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1006        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1007        {
1008            let mut stream =
1009                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1010            stream.log(&1u32).unwrap();
1011            stream.log(&2u32).unwrap();
1012            stream.log(&3u32).unwrap();
1013        }
1014        drop(logger);
1015        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1016            .file_base_name(&f)
1017            .build()
1018            .expect("Failed to build logger")
1019        else {
1020            panic!("Failed to build logger");
1021        };
1022        let section = dl
1023            .read_next_section_type(UnifiedLogType::StructuredLogLine)
1024            .expect("Failed to read section");
1025        assert!(section.is_some());
1026        let section = section.unwrap();
1027        let mut reader = SliceReader::new(&section[..]);
1028        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1029        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1030        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1031        assert_eq!(v1, 1);
1032        assert_eq!(v2, 2);
1033        assert_eq!(v3, 3);
1034    }
1035
    /// Mimic a basic CopperList implementation.
    ///
    /// Lifecycle tag for the mock copper list below; in these tests it only
    /// serves as an encodable discriminant inside the logged payload.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }
1044
    /// Minimal stand-in for the runtime's copper list: a lifecycle state plus
    /// an arbitrary encodable payload, enough to exercise section logging.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }
1050
1051    #[test]
1052    fn test_copperlist_list_like_logging() {
1053        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1054        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1055        {
1056            let mut stream =
1057                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1058            let cl0 = CopperList {
1059                state: CopperListStateMock::Free,
1060                payload: (1u32, 2u32, 3u32),
1061            };
1062            let cl1 = CopperList {
1063                state: CopperListStateMock::ProcessingTasks,
1064                payload: (4u32, 5u32, 6u32),
1065            };
1066            stream.log(&cl0).unwrap();
1067            stream.log(&cl1).unwrap();
1068        }
1069        drop(logger);
1070
1071        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1072            .file_base_name(&f)
1073            .build()
1074            .expect("Failed to build logger")
1075        else {
1076            panic!("Failed to build logger");
1077        };
1078        let section = dl
1079            .read_next_section_type(UnifiedLogType::CopperList)
1080            .expect("Failed to read section");
1081        assert!(section.is_some());
1082        let section = section.unwrap();
1083
1084        let mut reader = SliceReader::new(&section[..]);
1085        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1086        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1087        assert_eq!(cl0.payload.1, 2);
1088        assert_eq!(cl1.payload.2, 6);
1089    }
1090
1091    #[test]
1092    fn test_multi_slab_end2end() {
1093        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1094        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1095        {
1096            let mut stream =
1097                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1098            let cl0 = CopperList {
1099                state: CopperListStateMock::Free,
1100                payload: (1u32, 2u32, 3u32),
1101            };
1102            // large enough so we are sure to create a few slabs
1103            for _ in 0..10000 {
1104                stream.log(&cl0).unwrap();
1105            }
1106        }
1107        drop(logger);
1108
1109        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1110            .file_base_name(&f)
1111            .build()
1112            .expect("Failed to build logger")
1113        else {
1114            panic!("Failed to build logger");
1115        };
1116        let mut total_readback = 0;
1117        loop {
1118            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1119            if section.is_err() {
1120                break;
1121            }
1122            let section = section.unwrap();
1123            if section.is_none() {
1124                break;
1125            }
1126            let section = section.unwrap();
1127
1128            let mut reader = SliceReader::new(&section[..]);
1129            loop {
1130                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1131                    decode_from_reader(&mut reader, standard());
1132                if maybe_cl.is_ok() {
1133                    total_readback += 1;
1134                } else {
1135                    break;
1136                }
1137            }
1138        }
1139        assert_eq!(total_readback, 10000);
1140    }
1141}