// cu29_unifiedlog/memmap.rs

//! This is the memory map file implementation for the unified logger for Copper.
//! It is std only.

use crate::{
    AllocatedSection, MainHeader, SectionHandle, SectionHeader, SectionStorage, UnifiedLogRead,
    UnifiedLogStatus, UnifiedLogWrite, MAIN_MAGIC, SECTION_MAGIC,
};

#[cfg(feature = "compact")]
use crate::SECTION_HEADER_COMPACT_SIZE;

use bincode::config::standard;
use bincode::error::EncodeError;
use bincode::{decode_from_slice, encode_into_slice, Encode};
use core::slice::from_raw_parts_mut;
use cu29_traits::{CuError, CuResult, UnifiedLogType};
use memmap2::{Mmap, MmapMut};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::mem::ManuallyDrop;
use std::path::{Path, PathBuf};
use std::{io, mem};
use AllocatedSection::Section;
/// Section storage backed by a slice of a memory-mapped slab.
pub struct MmapSectionStorage {
    /// Backing memory for the section, carved out of the owning slab's mmap.
    buffer: &'static mut [u8],
    /// Current write position within `buffer`.
    offset: usize,
    /// Size in bytes of the header prologue reserved at the start of `buffer`.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Creates a storage over `buffer`; the first `block_size` bytes are
    /// reserved for the section header.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Base address of the backing buffer, used to match a section back to
    /// the slab that owns it.
    pub fn buffer_ptr(&self) -> *const u8 {
        // `as_ptr` avoids the bounds check (and panic) that `&self.buffer[0]`
        // would incur on an empty buffer, and is the idiomatic form.
        self.buffer.as_ptr()
    }
}
44
45impl SectionStorage for MmapSectionStorage {
46    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
47        self.post_update_header(header)?;
48        self.offset = self.block_size;
49        Ok(self.offset)
50    }
51
52    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
53        encode_into_slice(header, &mut self.buffer[0..], standard())
54    }
55
56    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
57        let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
58        self.offset += size;
59        Ok(size)
60    }
61
62    fn flush(&mut self) -> CuResult<usize> {
63        todo!()
64    }
65}
66
///
/// Holds the read or write side of the datalogger.
pub enum MmapUnifiedLogger {
    /// Read-only side, produced by a builder with `write == false`.
    Read(MmapUnifiedLoggerRead),
    /// Write side, produced by a builder with `write(true)` and `create(true)`.
    Write(MmapUnifiedLoggerWrite),
}
73
/// Use this builder to create a new DataLogger.
pub struct MmapUnifiedLoggerBuilder {
    /// Base path; actual slab files get `_0`, `_1`, ... suffixes (see `file_base_name`).
    file_base_name: Option<PathBuf>,
    /// Size in bytes preallocated for each backing slab file (write mode).
    preallocated_size: Option<usize>,
    /// Build the write side when true; otherwise the read side.
    write: bool,
    /// Allow creating the backing files (write side only).
    create: bool,
}
81
impl Default for MmapUnifiedLoggerBuilder {
    /// Equivalent to [`MmapUnifiedLoggerBuilder::new`]: read-only defaults
    /// with no file name or preallocated size set.
    fn default() -> Self {
        Self::new()
    }
}
87
88impl MmapUnifiedLoggerBuilder {
89    pub fn new() -> Self {
90        Self {
91            file_base_name: None,
92            preallocated_size: None,
93            write: false,
94            create: false, // This is the safest default
95        }
96    }
97
98    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
99    pub fn file_base_name(mut self, file_path: &Path) -> Self {
100        self.file_base_name = Some(file_path.to_path_buf());
101        self
102    }
103
104    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105        self.preallocated_size = Some(preallocated_size);
106        self
107    }
108
109    pub fn write(mut self, write: bool) -> Self {
110        self.write = write;
111        self
112    }
113
114    pub fn create(mut self, create: bool) -> Self {
115        self.create = create;
116        self
117    }
118
119    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120        let page_size = page_size::get();
121
122        if self.write && self.create {
123            let ulw = MmapUnifiedLoggerWrite::new(
124                &self
125                    .file_base_name
126                    .expect("This unified logger has no filename."),
127                self.preallocated_size
128                    .expect("This unified logger has no preallocated size."),
129                page_size,
130            );
131
132            Ok(MmapUnifiedLogger::Write(ulw))
133        } else {
134            let file_path = self.file_base_name.ok_or_else(|| {
135                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
136            })?;
137            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
138            Ok(MmapUnifiedLogger::Read(ulr))
139        }
140    }
141}
142
/// One memory-mapped backing file of the write side.
struct SlabEntry {
    /// The backing file, kept open for the lifetime of the mapping.
    file: File,
    /// ManuallyDrop so `Drop` can unmap it *before* truncating the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Current write position, in bytes, within this slab.
    current_global_position: usize,
    /// Start offsets of sections handed out but not yet flushed back.
    sections_offsets_in_flight: Vec<usize>,
    /// Everything before this offset has already been submitted for flushing.
    flushed_until_offset: usize,
    /// OS page size, used for section alignment.
    page_size: usize,
}
151
152impl Drop for SlabEntry {
153    fn drop(&mut self) {
154        self.flush_until(self.current_global_position);
155        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
156        self.file
157            .set_len(self.current_global_position as u64)
158            .expect("Failed to trim datalogger file");
159
160        if !self.sections_offsets_in_flight.is_empty() {
161            eprintln!("Error: Slab not full flushed.");
162        }
163    }
164}
165
impl SlabEntry {
    /// Maps `file` into memory. The mapping is wrapped in `ManuallyDrop` so
    /// that `Drop for SlabEntry` can unmap it before truncating the file.
    fn new(file: File, page_size: usize) -> Self {
        let mmap_buffer =
            ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
        Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
        }
    }

    /// Ensure the underlying mmap is flushed to disk until the given position.
    /// Only the not-yet-flushed range is submitted, asynchronously.
    fn flush_until(&mut self, until_position: usize) {
        // This is tolerated under linux, but crashes on macos
        // (presumably a zero-length flush range — hence the early return).
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Returns true if the section's buffer points inside this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();
        (ptr >= self.mmap_buffer.as_ptr())
            && (ptr as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        // Persist the final header (e.g. the `used` counter) before flushing.
        section
            .post_update_header()
            .expect("Failed to update section header");

        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();

        if ptr < self.mmap_buffer.as_ptr()
            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        // Mark this section as no longer in flight.
        let base = self.mmap_buffer.as_ptr() as usize;
        self.sections_offsets_in_flight
            .retain(|&x| x != ptr as usize - base);

        // Flush at most up to the oldest still-open section, so no bytes are
        // persisted ahead of a header that may still change.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next multiple of the page size. The bit trick
    /// assumes `page_size` is a power of two (true for OS pages).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection<MmapSectionStorage> {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        // Header prologue size: a compact constant, or a full page otherwise.
        #[cfg(feature = "compact")]
        let block_size = SECTION_HEADER_COMPACT_SIZE;

        #[cfg(not(feature = "compact"))]
        let block_size = self.page_size as u16;

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size,
            entry_type,
            offset_to_next_section: section_size,
            used: 0u32,
        };

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        // NOTE(review): the cursor advances by the *requested* (unaligned)
        // size; the next call realigns it — confirm this asymmetry with
        // `offset_to_next_section` (which is page-aligned) is intended.
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: here we have the guarantee for exclusive access to that memory for the lifetime of the handle, the borrow checker cannot understand that ever.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);

        self.current_global_position = end_of_section;

        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
    }

    /// Bytes consumed in this slab so far (test-only helper).
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
287
/// A write side of the datalogger.
pub struct MmapUnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slabs are previous slabs kept alive until fully flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
301
/// Derives the path of slab `slab_index` from the base path by inserting a
/// `_{slab_index}` suffix before the extension:
/// `something/toto.copper` -> `something/toto_3.copper`.
///
/// Uses `Path::file_stem`/`extension` instead of manual '.'-splitting so a
/// base name without an extension yields `name_{i}` rather than the broken
/// `_{i}.name` the split/pop approach produced.
///
/// # Panics
/// Panics if the path has no file name or it is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let stem = base_file_path
        .file_stem()
        .expect("Invalid base file path")
        .to_str()
        .expect("Could not translate the filename OsStr to str");
    let file_name = match base_file_path.extension().and_then(|ext| ext.to_str()) {
        Some(extension) => format!("{stem}_{slab_index}.{extension}"),
        None => format!("{stem}_{slab_index}"),
    };
    base_file_path.with_file_name(file_name)
}
316
317fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
318    let file_path = build_slab_path(base_file_path, slab_suffix);
319    let file = OpenOptions::new()
320        .read(true)
321        .write(true)
322        .create(true)
323        .truncate(true)
324        .open(&file_path)
325        .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
326    file.set_len(slab_size as u64)
327        .expect("Failed to set file length");
328    file
329}
330
impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
    /// The returned slice is section_size or greater.
    /// When the front slab is full, it is retired to `back_slabs` (kept until
    /// all of its sections are flushed) and a fresh slab file takes its place.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                // Even a fresh slab can refuse a request larger than the slab.
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
                    Section(section) => Ok(section),
                }
            }
            Section(section) => Ok(section),
        }
    }

    /// Routes the flush to whichever slab (back or front) owns the section.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Reports space usage.
    ///
    /// NOTE(review): `total_used_space` only counts the front slab, and
    /// `total_allocated_space` is 0 while the first slab (suffix 0) is the
    /// only one — confirm whether the first slab / back slabs should count.
    fn status(&self) -> UnifiedLogStatus {
        UnifiedLogStatus {
            total_used_space: self.front_slab.current_global_position,
            total_allocated_space: self.slab_size * self.front_slab_suffix,
        }
    }
}
379
380impl MmapUnifiedLoggerWrite {
381    fn next_slab(&mut self) -> File {
382        self.front_slab_suffix += 1;
383
384        make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
385    }
386
387    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
388        let file = make_slab_file(base_file_path, slab_size, 0);
389        let mut front_slab = SlabEntry::new(file, page_size);
390
391        // This is the first slab so add the main header.
392        let main_header = MainHeader {
393            magic: MAIN_MAGIC,
394            first_section_offset: page_size as u16,
395            page_size: page_size as u16,
396        };
397        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
398            .expect("Failed to encode main header");
399        assert!(nb_bytes < page_size);
400        front_slab.current_global_position = page_size; // align to the next page
401
402        Self {
403            front_slab,
404            back_slabs: Vec::new(),
405            base_file_path: base_file_path.to_path_buf(),
406            slab_size,
407            front_slab_suffix: 0,
408        }
409    }
410
411    fn garbage_collect_backslabs(&mut self) {
412        self.back_slabs
413            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
414    }
415
416    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
417        (
418            self.front_slab.current_global_position,
419            self.front_slab.sections_offsets_in_flight.clone(),
420            self.back_slabs.len(),
421        )
422    }
423}
424
425impl Drop for MmapUnifiedLoggerWrite {
426    fn drop(&mut self) {
427        #[cfg(debug_assertions)]
428        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.
429
430        let section = self.add_section(
431            UnifiedLogType::LastEntry,
432            SECTION_HEADER_COMPACT_SIZE as usize, // this is the absolute minimum size of a section.
433        );
434        match section {
435            Ok(mut section) => {
436                self.front_slab.flush_section(&mut section);
437                self.garbage_collect_backslabs();
438                #[cfg(debug_assertions)]
439                eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
440            }
441            Err(e) => {
442                panic!("Failed to flush the unified logger: {}", e);
443            }
444        }
445    }
446}
447
448fn open_slab_index(
449    base_file_path: &Path,
450    slab_index: usize,
451) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
452    let mut options = OpenOptions::new();
453    let options = options.read(true);
454
455    let file_path = build_slab_path(base_file_path, slab_index);
456    let file = options.open(file_path)?;
457    let mmap = unsafe { Mmap::map(&file) }?;
458    let mut prolog = 0u16;
459    let mut maybe_main_header: Option<MainHeader> = None;
460    if slab_index == 0 {
461        let main_header: MainHeader;
462        let _read: usize;
463        (main_header, _read) =
464            decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
465        if main_header.magic != MAIN_MAGIC {
466            return Err(io::Error::new(
467                io::ErrorKind::InvalidData,
468                "Invalid magic number in main header",
469            ));
470        }
471        prolog = main_header.first_section_offset;
472        maybe_main_header = Some(main_header);
473    }
474    Ok((file, mmap, prolog, maybe_main_header))
475}
476
/// A read side of the memory map based unified logger.
pub struct MmapUnifiedLoggerRead {
    /// Base path; slab files are derived from it with `_0`, `_1`, ... suffixes.
    base_file_path: PathBuf,
    /// Main header decoded from the first slab.
    main_header: MainHeader,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// Keeps the mapped file open while reading.
    current_file: File,
    /// Index (suffix) of the slab currently being read.
    current_slab_index: usize,
    /// Byte offset of the next section header within the current slab.
    current_reading_position: usize,
}
486
impl UnifiedLogRead for MmapUnifiedLoggerRead {
    /// Skips forward to the next section of `datalogtype` and returns a copy
    /// of its content, or `None` once a `LastEntry` section is reached.
    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
        // TODO: eventually implement a 0 copy of this too.
        loop {
            // Past the end of this slab: move on to the next slab file.
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            // NOTE(review): read_section_header already wraps decode errors
            // in this same file/slab/position message, so context is doubled.
            let header_result = self.read_section_header();
            let header = header_result.map_err(|error| {
                CuError::new_with_cause(
                    &format!(
                        "Could not read a sections header: {}/{}:{}",
                        self.base_file_path.as_os_str().to_string_lossy(),
                        self.current_slab_index,
                        self.current_reading_position,
                    ),
                    error,
                )
            })?;

            // Reached the end of file
            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(None);
            }

            // Found a section of the requested type
            if header.entry_type == datalogtype {
                let result = Some(self.read_section_content(&header)?);
                self.current_reading_position += header.offset_to_next_section as usize;
                return Ok(result);
            }

            // Keep reading until we find the requested type
            self.current_reading_position += header.offset_to_next_section as usize;
        }
    }

    /// Reads the section from the section header pos.
    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
        // Past the end of this slab: move on to the next slab file.
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }

        let read_result = self.read_section_header();

        match read_result {
            Err(error) => Err(CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                error,
            )),
            Ok(header) => {
                let data = self.read_section_content(&header)?;
                // Advance to the next section header (offset is header-relative).
                self.current_reading_position += header.offset_to_next_section as usize;
                Ok((header, data))
            }
        }
    }
}
555
556impl MmapUnifiedLoggerRead {
557    pub fn new(base_file_path: &Path) -> io::Result<Self> {
558        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
559
560        Ok(Self {
561            base_file_path: base_file_path.to_path_buf(),
562            main_header: header.expect("UnifiedLoggerRead needs a header"),
563            current_file: file,
564            current_mmap_buffer: mmap,
565            current_slab_index: 0,
566            current_reading_position: prolog as usize,
567        })
568    }
569
570    fn next_slab(&mut self) -> io::Result<()> {
571        self.current_slab_index += 1;
572        let (file, mmap, prolog, _) =
573            open_slab_index(&self.base_file_path, self.current_slab_index)?;
574        self.current_file = file;
575        self.current_mmap_buffer = mmap;
576        self.current_reading_position = prolog as usize;
577        Ok(())
578    }
579
580    pub fn raw_main_header(&self) -> &MainHeader {
581        &self.main_header
582    }
583
584    /// Reads the section content from the section header pos.
585    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
586        // TODO: we could optimize by asking the buffer to fill
587        let mut section_data = vec![0; header.used as usize];
588        let start_of_data = self.current_reading_position + header.block_size as usize;
589        section_data.copy_from_slice(
590            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
591        );
592
593        Ok(section_data)
594    }
595
596    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
597        let section_header: SectionHeader;
598        (section_header, _) = decode_from_slice(
599            &self.current_mmap_buffer[self.current_reading_position..],
600            standard(),
601        )
602        .map_err(|e| {
603            CuError::new_with_cause(
604                &format!(
605                    "Could not read a sections header: {}/{}:{}",
606                    self.base_file_path.as_os_str().to_string_lossy(),
607                    self.current_slab_index,
608                    self.current_reading_position,
609                ),
610                e,
611            )
612        })?;
613        if section_header.magic != SECTION_MAGIC {
614            return Err("Invalid magic number in section header".into());
615        }
616
617        Ok(section_header)
618    }
619}
620
/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    /// Underlying section reader.
    logger: MmapUnifiedLoggerRead,
    /// Only sections of this type are surfaced through `Read`.
    log_type: UnifiedLogType,
    /// Content of the section currently being streamed out.
    buffer: Vec<u8>,
    /// Read position within `buffer`.
    buffer_pos: usize,
}
628
629impl UnifiedLoggerIOReader {
630    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
631        Self {
632            logger,
633            log_type,
634            buffer: Vec::new(),
635            buffer_pos: 0,
636        }
637    }
638
639    /// returns true if there is more data to read.
640    fn fill_buffer(&mut self) -> io::Result<bool> {
641        match self.logger.read_next_section_type(self.log_type) {
642            Ok(Some(section)) => {
643                self.buffer = section;
644                self.buffer_pos = 0;
645                Ok(true)
646            }
647            Ok(None) => Ok(false), // No more sections of this type
648            Err(e) => Err(io::Error::other(e.to_string())),
649        }
650    }
651}
652
653impl Read for UnifiedLoggerIOReader {
654    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
655        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
656            // This means we hit the last section.
657            return Ok(0);
658        }
659
660        // If we still have no data after trying to fill the buffer, we're at EOF
661        if self.buffer_pos >= self.buffer.len() {
662            return Ok(0);
663        }
664
665        // Copy as much as we can from the buffer to `buf`
666        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
667        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
668        self.buffer_pos += len;
669        Ok(len)
670    }
671}
672
673#[cfg(feature = "std")]
674#[cfg(test)]
675mod tests {
676    use super::*;
677    use crate::stream_write;
678    use bincode::de::read::SliceReader;
679    use bincode::{decode_from_reader, Decode, Encode};
680    use cu29_traits::WriteStream;
681    use std::path::PathBuf;
682    use std::sync::{Arc, Mutex};
683    use tempfile::TempDir;
684
685    const LARGE_SLAB: usize = 100 * 1024; // 100KB
686    const SMALL_SLAB: usize = 16 * 2 * 1024; // 16KB is the page size on MacOS for example
687
688    fn make_a_logger(
689        tmp_dir: &TempDir,
690        slab_size: usize,
691    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
692        let file_path = tmp_dir.path().join("test.bin");
693        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
694            .write(true)
695            .create(true)
696            .file_base_name(&file_path)
697            .preallocated_size(slab_size)
698            .build()
699            .expect("Failed to create logger")
700        else {
701            panic!("Failed to create logger")
702        };
703
704        (Arc::new(Mutex::new(data_logger)), file_path)
705    }
706
707    #[test]
708    fn test_truncation_and_sections_creations() {
709        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
710        let file_path = tmp_dir.path().join("test.bin");
711        let _used = {
712            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
713                .write(true)
714                .create(true)
715                .file_base_name(&file_path)
716                .preallocated_size(100000)
717                .build()
718                .expect("Failed to create logger")
719            else {
720                panic!("Failed to create logger")
721            };
722            logger
723                .add_section(UnifiedLogType::StructuredLogLine, 1024)
724                .unwrap();
725            logger
726                .add_section(UnifiedLogType::CopperList, 2048)
727                .unwrap();
728            let used = logger.front_slab.used();
729            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
730                                                  // logger drops
731
732            used
733        };
734
735        let _file = OpenOptions::new()
736            .read(true)
737            .open(tmp_dir.path().join("test_0.bin"))
738            .expect("Could not reopen the file");
739        // Check if we have correctly truncated the file
740        // TODO: recompute this math
741        //assert_eq!(
742        //    file.metadata().unwrap().len(),
743        //    (used + size_of::<SectionHeader>()) as u64
744        //);
745    }
746
747    #[test]
748    fn test_one_section_self_cleaning() {
749        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
750        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
751        {
752            let _stream = stream_write::<(), MmapSectionStorage>(
753                logger.clone(),
754                UnifiedLogType::StructuredLogLine,
755                1024,
756            );
757            assert_eq!(
758                logger
759                    .lock()
760                    .unwrap()
761                    .front_slab
762                    .sections_offsets_in_flight
763                    .len(),
764                1
765            );
766        }
767        assert_eq!(
768            logger
769                .lock()
770                .unwrap()
771                .front_slab
772                .sections_offsets_in_flight
773                .len(),
774            0
775        );
776        let logger = logger.lock().unwrap();
777        assert_eq!(
778            logger.front_slab.flushed_until_offset,
779            logger.front_slab.current_global_position
780        );
781    }
782
783    #[test]
784    fn test_two_sections_self_cleaning_in_order() {
785        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
786        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
787        let s1 = stream_write::<(), MmapSectionStorage>(
788            logger.clone(),
789            UnifiedLogType::StructuredLogLine,
790            1024,
791        );
792        assert_eq!(
793            logger
794                .lock()
795                .unwrap()
796                .front_slab
797                .sections_offsets_in_flight
798                .len(),
799            1
800        );
801        let s2 = stream_write::<(), MmapSectionStorage>(
802            logger.clone(),
803            UnifiedLogType::StructuredLogLine,
804            1024,
805        );
806        assert_eq!(
807            logger
808                .lock()
809                .unwrap()
810                .front_slab
811                .sections_offsets_in_flight
812                .len(),
813            2
814        );
815        drop(s2);
816        assert_eq!(
817            logger
818                .lock()
819                .unwrap()
820                .front_slab
821                .sections_offsets_in_flight
822                .len(),
823            1
824        );
825        drop(s1);
826        let lg = logger.lock().unwrap();
827        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
828        assert_eq!(
829            lg.front_slab.flushed_until_offset,
830            lg.front_slab.current_global_position
831        );
832    }
833
834    #[test]
835    fn test_two_sections_self_cleaning_out_of_order() {
836        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
837        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
838        let s1 = stream_write::<(), MmapSectionStorage>(
839            logger.clone(),
840            UnifiedLogType::StructuredLogLine,
841            1024,
842        );
843        assert_eq!(
844            logger
845                .lock()
846                .unwrap()
847                .front_slab
848                .sections_offsets_in_flight
849                .len(),
850            1
851        );
852        let s2 = stream_write::<(), MmapSectionStorage>(
853            logger.clone(),
854            UnifiedLogType::StructuredLogLine,
855            1024,
856        );
857        assert_eq!(
858            logger
859                .lock()
860                .unwrap()
861                .front_slab
862                .sections_offsets_in_flight
863                .len(),
864            2
865        );
866        drop(s1);
867        assert_eq!(
868            logger
869                .lock()
870                .unwrap()
871                .front_slab
872                .sections_offsets_in_flight
873                .len(),
874            1
875        );
876        drop(s2);
877        let lg = logger.lock().unwrap();
878        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
879        assert_eq!(
880            lg.front_slab.flushed_until_offset,
881            lg.front_slab.current_global_position
882        );
883    }
884
885    #[test]
886    fn test_write_then_read_one_section() {
887        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
888        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
889        {
890            let mut stream =
891                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
892            stream.log(&1u32).unwrap();
893            stream.log(&2u32).unwrap();
894            stream.log(&3u32).unwrap();
895        }
896        drop(logger);
897        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
898            .file_base_name(&f)
899            .build()
900            .expect("Failed to build logger")
901        else {
902            panic!("Failed to build logger");
903        };
904        let section = dl
905            .read_next_section_type(UnifiedLogType::StructuredLogLine)
906            .expect("Failed to read section");
907        assert!(section.is_some());
908        let section = section.unwrap();
909        let mut reader = SliceReader::new(&section[..]);
910        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
911        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
912        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
913        assert_eq!(v1, 1);
914        assert_eq!(v2, 2);
915        assert_eq!(v3, 3);
916    }
917
    // Mimic a basic CopperList implementation.
    // (NOTE(review): this was a `///` doc comment, but it describes the
    // CopperList struct below rather than this enum, so it is kept as a
    // plain section comment.)

    /// Lifecycle states a mock CopperList can be in.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }
926
    /// Minimal stand-in for the runtime's CopperList: a lifecycle state tag
    /// plus a bincode-encodable payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }
932
933    #[test]
934    fn test_copperlist_list_like_logging() {
935        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
936        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
937        {
938            let mut stream =
939                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
940            let cl0 = CopperList {
941                state: CopperListStateMock::Free,
942                payload: (1u32, 2u32, 3u32),
943            };
944            let cl1 = CopperList {
945                state: CopperListStateMock::ProcessingTasks,
946                payload: (4u32, 5u32, 6u32),
947            };
948            stream.log(&cl0).unwrap();
949            stream.log(&cl1).unwrap();
950        }
951        drop(logger);
952
953        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
954            .file_base_name(&f)
955            .build()
956            .expect("Failed to build logger")
957        else {
958            panic!("Failed to build logger");
959        };
960        let section = dl
961            .read_next_section_type(UnifiedLogType::CopperList)
962            .expect("Failed to read section");
963        assert!(section.is_some());
964        let section = section.unwrap();
965
966        let mut reader = SliceReader::new(&section[..]);
967        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
968        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
969        assert_eq!(cl0.payload.1, 2);
970        assert_eq!(cl1.payload.2, 6);
971    }
972
973    #[test]
974    fn test_multi_slab_end2end() {
975        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
976        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
977        {
978            let mut stream =
979                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
980            let cl0 = CopperList {
981                state: CopperListStateMock::Free,
982                payload: (1u32, 2u32, 3u32),
983            };
984            // large enough so we are sure to create a few slabs
985            for _ in 0..10000 {
986                stream.log(&cl0).unwrap();
987            }
988        }
989        drop(logger);
990
991        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
992            .file_base_name(&f)
993            .build()
994            .expect("Failed to build logger")
995        else {
996            panic!("Failed to build logger");
997        };
998        let mut total_readback = 0;
999        loop {
1000            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1001            if section.is_err() {
1002                break;
1003            }
1004            let section = section.unwrap();
1005            if section.is_none() {
1006                break;
1007            }
1008            let section = section.unwrap();
1009
1010            let mut reader = SliceReader::new(&section[..]);
1011            loop {
1012                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1013                    decode_from_reader(&mut reader, standard());
1014                if maybe_cl.is_ok() {
1015                    total_readback += 1;
1016                } else {
1017                    break;
1018                }
1019            }
1020        }
1021        assert_eq!(total_readback, 10000);
1022    }
1023}