Skip to main content

cu29_unifiedlog/
memmap.rs

1//! This is the memory map file implementation for the unified logger for Copper.
2//! It is std only.
3
4use crate::{
5    AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6    SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SliceWriter;
15use bincode::error::EncodeError;
16use bincode::{Encode, decode_from_slice, encode_into_slice};
17use core::slice::from_raw_parts_mut;
18use cu29_traits::{
19    CuError, CuResult, ObservedWriter, UnifiedLogType, abort_observed_encode,
20    begin_observed_encode, finish_observed_encode,
21};
22use memmap2::{Mmap, MmapMut};
23use std::fs::{File, OpenOptions};
24use std::io::Read;
25use std::mem::ManuallyDrop;
26use std::path::{Path, PathBuf};
27use std::{io, mem};
28
/// Section storage backed by a byte range of a memory-mapped slab.
///
/// The first `block_size` bytes of `buffer` are reserved for the section
/// header (see `initialize`); entries are appended after it.
pub struct MmapSectionStorage {
    /// Byte range of the mapping that belongs to this section.
    buffer: &'static mut [u8],
    /// Write cursor inside `buffer`, in bytes.
    offset: usize,
    /// Number of bytes reserved at the start of `buffer` for the header.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Wraps `buffer` with the write cursor at 0 and a header area of
    /// `block_size` bytes.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Returns the address of the first byte of the section buffer.
    ///
    /// Uses `as_ptr` rather than indexing element 0 so that an empty buffer
    /// yields a (dangling but valid to compare) pointer instead of panicking.
    pub fn buffer_ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }
}
48
49impl SectionStorage for MmapSectionStorage {
50    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
51        self.post_update_header(header)?;
52        self.offset = self.block_size;
53        Ok(self.offset)
54    }
55
56    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
57        encode_into_slice(header, &mut self.buffer[0..], standard())
58    }
59
60    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
61        begin_observed_encode();
62        let result = (|| {
63            let mut encoder = EncoderImpl::new(
64                ObservedWriter::new(SliceWriter::new(&mut self.buffer[self.offset..])),
65                standard(),
66            );
67            entry.encode(&mut encoder)?;
68            Ok(encoder.into_writer().into_inner().bytes_written())
69        })();
70        let size = match result {
71            Ok(size) => {
72                debug_assert_eq!(size, finish_observed_encode());
73                size
74            }
75            Err(err) => {
76                abort_observed_encode();
77                return Err(err);
78            }
79        };
80        self.offset += size;
81        Ok(size)
82    }
83
84    fn flush(&mut self) -> CuResult<usize> {
85        // Flushing is handled at the slab level for mmap-backed storage.
86        Ok(self.offset)
87    }
88}
89
/// Holds the read or write side of the datalogger.
///
/// This is the value produced by `MmapUnifiedLoggerBuilder::build`.
pub enum MmapUnifiedLogger {
    /// Read side, opened over existing slab files.
    Read(MmapUnifiedLoggerRead),
    /// Write side, backed by newly created slab files.
    Write(MmapUnifiedLoggerWrite),
}
96
/// Use this builder to create a new DataLogger.
pub struct MmapUnifiedLoggerBuilder {
    /// Base path of the log; slab files are derived from it by inserting
    /// `_<index>` before the extension.
    file_base_name: Option<PathBuf>,
    /// Size in bytes given to each slab file on creation (write mode only).
    preallocated_size: Option<usize>,
    /// Requests the write side; only honored by `build` together with `create`.
    write: bool,
    /// Allows creating the slab files; only honored by `build` together with `write`.
    create: bool,
}
104
impl Default for MmapUnifiedLoggerBuilder {
    /// Same as [`MmapUnifiedLoggerBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
110
111impl MmapUnifiedLoggerBuilder {
112    pub fn new() -> Self {
113        Self {
114            file_base_name: None,
115            preallocated_size: None,
116            write: false,
117            create: false, // This is the safest default
118        }
119    }
120
121    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
122    pub fn file_base_name(mut self, file_path: &Path) -> Self {
123        self.file_base_name = Some(file_path.to_path_buf());
124        self
125    }
126
127    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
128        self.preallocated_size = Some(preallocated_size);
129        self
130    }
131
132    pub fn write(mut self, write: bool) -> Self {
133        self.write = write;
134        self
135    }
136
137    pub fn create(mut self, create: bool) -> Self {
138        self.create = create;
139        self
140    }
141
142    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
143        let page_size = page_size::get();
144
145        if self.write && self.create {
146            let file_path = self.file_base_name.ok_or_else(|| {
147                io::Error::new(
148                    io::ErrorKind::InvalidInput,
149                    "File path is required for write mode",
150                )
151            })?;
152            let preallocated_size = self.preallocated_size.ok_or_else(|| {
153                io::Error::new(
154                    io::ErrorKind::InvalidInput,
155                    "Preallocated size is required for write mode",
156                )
157            })?;
158            let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
159            Ok(MmapUnifiedLogger::Write(ulw))
160        } else {
161            let file_path = self.file_base_name.ok_or_else(|| {
162                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
163            })?;
164            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
165            Ok(MmapUnifiedLogger::Read(ulr))
166        }
167    }
168}
169
/// One memory-mapped slab file of the log together with its write and flush
/// bookkeeping.
struct SlabEntry {
    /// Backing file of the mapping.
    file: File,
    /// Writable mapping of `file`; kept in `ManuallyDrop` so that `Drop` can
    /// unmap it explicitly before trimming the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset in the slab where the next section or marker will be placed.
    current_global_position: usize,
    /// Start offsets of the sections handed out and not yet flushed.
    sections_offsets_in_flight: Vec<usize>,
    /// Every byte before this offset has been passed to a flush.
    flushed_until_offset: usize,
    /// Page size used to align sections and markers.
    page_size: usize,
    /// Start offset of the last end marker written, so it can be rewound.
    temporary_end_marker: Option<usize>,
    /// Test only: `(start, len)` of every section closed through `flush_section`.
    #[cfg(test)]
    closed_sections: Vec<(usize, usize)>,
    /// Test only: merged `(start, len)` ranges passed to `flush_range`.
    #[cfg(test)]
    flushed_ranges: Vec<(usize, usize)>,
    /// Test only: number of fsync calls issued by `sync_file`.
    #[cfg(all(test, feature = "mmap-fsync"))]
    sync_call_count: usize,
}
185
impl Drop for SlabEntry {
    /// Flushes what has been written, unmaps the slab and trims the backing
    /// file down to the used size.
    fn drop(&mut self) {
        // Flush up to the write cursor while the mapping is still alive.
        self.flush_until(self.current_global_position);
        // SAFETY: We own the mapping and must drop it before trimming the file.
        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
        // Shrink the preallocated file to the bytes actually used; a failure
        // is reported on stderr but not propagated.
        if let Err(error) = self.file.set_len(self.current_global_position as u64) {
            eprintln!("Failed to trim datalogger file: {}", error);
        }
        self.sync_file();

        // Offsets still listed here belong to sections that never went
        // through `flush_section`.
        if !self.sections_offsets_in_flight.is_empty() {
            eprintln!("Error: Slab not full flushed.");
        }
    }
}
201
202impl SlabEntry {
203    fn new(file: File, page_size: usize) -> io::Result<Self> {
204        let mmap_buffer = ManuallyDrop::new(
205            // SAFETY: The file descriptor is valid and mapping is confined to this struct.
206            unsafe { MmapMut::map_mut(&file) }
207                .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
208        );
209        Ok(Self {
210            file,
211            mmap_buffer,
212            current_global_position: 0,
213            sections_offsets_in_flight: Vec::with_capacity(16),
214            flushed_until_offset: 0,
215            page_size,
216            temporary_end_marker: None,
217            #[cfg(test)]
218            closed_sections: Vec::new(),
219            #[cfg(test)]
220            flushed_ranges: Vec::new(),
221            #[cfg(all(test, feature = "mmap-fsync"))]
222            sync_call_count: 0,
223        })
224    }
225
226    fn flush_range(&mut self, start: usize, len: usize) {
227        if len == 0 {
228            return;
229        }
230        self.mmap_buffer
231            .flush_async_range(start, len)
232            .expect("Failed to flush memory map");
233        self.sync_file();
234        #[cfg(test)]
235        self.record_flushed_range(start, len);
236    }
237
    /// Calls `sync_all` on the backing file when the `mmap-fsync` feature is
    /// enabled; does nothing otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the fsync fails (feature `mmap-fsync` only).
    fn sync_file(&mut self) {
        #[cfg(feature = "mmap-fsync")]
        {
            self.file.sync_all().expect("Failed to fsync log file");
            // Count the calls so tests can assert on them.
            #[cfg(test)]
            {
                self.sync_call_count += 1;
            }
        }
    }
248    /// Unsure the underlying mmap is flush to disk until the given position.
249    fn flush_until(&mut self, until_position: usize) {
250        // This is tolerated under linux, but crashes on macos
251        if (self.flushed_until_offset == until_position) || (until_position == 0) {
252            return;
253        }
254        self.flush_range(
255            self.flushed_until_offset,
256            until_position - self.flushed_until_offset,
257        );
258        self.flushed_until_offset = until_position;
259    }
260
261    fn clear_temporary_end_marker(&mut self) {
262        if let Some(marker_start) = self.temporary_end_marker.take() {
263            self.current_global_position = marker_start;
264            if self.flushed_until_offset > marker_start {
265                self.flushed_until_offset = marker_start;
266            }
267        }
268    }
269
270    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
271        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
272        let marker_start = self.align_to_next_page(self.current_global_position);
273        let total_marker_size = block_size; // header only
274        let marker_end = marker_start + total_marker_size;
275        if marker_end > self.mmap_buffer.len() {
276            return Err("Not enough space to write end-of-log marker".into());
277        }
278
279        let header = SectionHeader {
280            magic: SECTION_MAGIC,
281            block_size: SECTION_HEADER_COMPACT_SIZE,
282            entry_type: UnifiedLogType::LastEntry,
283            offset_to_next_section: total_marker_size as u32,
284            used: 0,
285            is_open: temporary,
286        };
287
288        encode_into_slice(
289            &header,
290            &mut self.mmap_buffer
291                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
292            standard(),
293        )
294        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
295
296        self.temporary_end_marker = Some(marker_start);
297        self.current_global_position = marker_end;
298        Ok(())
299    }
300
301    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
302        let storage = section.get_storage();
303        let ptr = storage.buffer_ptr();
304        (ptr >= self.mmap_buffer.as_ptr())
305            && (ptr as usize)
306                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
307    }
308
309    /// Flush the section to disk.
310    /// the flushing is permanent and the section is considered closed.
311    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
312        section
313            .post_update_header()
314            .expect("Failed to update section header");
315
316        let storage = section.get_storage();
317        let ptr = storage.buffer_ptr();
318
319        if ptr < self.mmap_buffer.as_ptr()
320            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
321        {
322            panic!("Invalid section buffer, not in the slab");
323        }
324
325        let base = self.mmap_buffer.as_ptr() as usize;
326        let section_start = ptr as usize - base;
327        let section_len = section.header.offset_to_next_section as usize;
328        #[cfg(test)]
329        self.record_closed_section(section_start, section_len);
330        self.sections_offsets_in_flight
331            .retain(|&x| x != section_start);
332
333        if self.sections_offsets_in_flight.is_empty() {
334            self.flush_until(self.current_global_position);
335            return;
336        }
337        let next_open_offset = self.sections_offsets_in_flight[0];
338        if self.flushed_until_offset < next_open_offset {
339            self.flush_until(next_open_offset);
340        }
341        if section_start + section_len > self.flushed_until_offset {
342            // A long-lived early section can otherwise pin later closed sections
343            // behind the prefix cursor until shutdown.
344            self.flush_range(section_start, section_len);
345        }
346    }
347
    /// Test helper: remembers that the section `(start, len)` has been closed.
    #[cfg(test)]
    fn record_closed_section(&mut self, start: usize, len: usize) {
        self.closed_sections.push((start, len));
    }
352
    /// Test helper: inserts the flushed range `(start, len)` into
    /// `flushed_ranges`, merging it with every recorded range it overlaps or
    /// touches.
    ///
    /// NOTE(review): the single pass relies on `flushed_ranges` already being
    /// sorted by start; this function appears to be its only writer and
    /// preserves that order.
    #[cfg(test)]
    fn record_flushed_range(&mut self, start: usize, len: usize) {
        let mut merged_start = start;
        let mut merged_end = start + len;
        let mut merged_ranges = Vec::with_capacity(self.flushed_ranges.len() + 1);
        let mut inserted = false;

        for (range_start, range_len) in self.flushed_ranges.drain(..) {
            let range_end = range_start + range_len;
            // Entirely before the new range: keep as is.
            if range_end < merged_start {
                merged_ranges.push((range_start, range_len));
                continue;
            }
            // Entirely after the new range: emit the merged range once, then
            // keep the existing one.
            if merged_end < range_start {
                if !inserted {
                    merged_ranges.push((merged_start, merged_end - merged_start));
                    inserted = true;
                }
                merged_ranges.push((range_start, range_len));
                continue;
            }

            // Overlapping or adjacent: absorb it into the merged range.
            merged_start = merged_start.min(range_start);
            merged_end = merged_end.max(range_end);
        }

        // The merged range goes last when no later range triggered its insertion.
        if !inserted {
            merged_ranges.push((merged_start, merged_end - merged_start));
        }

        self.flushed_ranges = merged_ranges;
    }
385
    /// Test helper: counts the bytes of closed sections that are not covered
    /// by any recorded flushed range.
    ///
    /// NOTE(review): the early `break` assumes `flushed_ranges` is sorted by
    /// start, as maintained by `record_flushed_range`.
    #[cfg(test)]
    fn pending_closed_bytes(&self) -> usize {
        let mut pending = 0;

        for (section_start, section_len) in &self.closed_sections {
            let section_end = section_start + section_len;
            // Everything in the section before `cursor` is accounted for.
            let mut cursor = *section_start;

            for (range_start, range_len) in &self.flushed_ranges {
                let range_end = range_start + range_len;
                // Range ends before the cursor: irrelevant.
                if range_end <= cursor {
                    continue;
                }
                // Range starts after the section: nothing more can cover it.
                if *range_start >= section_end {
                    break;
                }
                // Gap between the cursor and this range was never flushed.
                if *range_start > cursor {
                    pending += *range_start - cursor;
                }
                cursor = cursor.max(range_end);
                if cursor >= section_end {
                    break;
                }
            }

            // Tail of the section after the last covering range.
            if cursor < section_end {
                pending += section_end - cursor;
            }
        }

        pending
    }
418
    /// Rounds `ptr` up to the next multiple of `page_size` (values already
    /// aligned are returned unchanged).
    ///
    /// The mask arithmetic only rounds correctly when `page_size` is a power
    /// of two.
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }
423
424    /// The returned slice is section_size or greater.
425    fn add_section(
426        &mut self,
427        entry_type: UnifiedLogType,
428        requested_section_size: usize,
429    ) -> AllocatedSection<MmapSectionStorage> {
430        // align current_position to the next page
431        self.current_global_position = self.align_to_next_page(self.current_global_position);
432        let section_size = self.align_to_next_page(requested_section_size) as u32;
433
434        // We need to have enough space to store the section in that slab
435        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
436            return AllocatedSection::NoMoreSpace;
437        }
438
439        #[cfg(feature = "compact")]
440        let block_size = SECTION_HEADER_COMPACT_SIZE;
441
442        #[cfg(not(feature = "compact"))]
443        let block_size = self.page_size as u16;
444
445        let section_header = SectionHeader {
446            magic: SECTION_MAGIC,
447            block_size,
448            entry_type,
449            offset_to_next_section: section_size,
450            used: 0u32,
451            is_open: true,
452        };
453
454        // save the position to keep track for in flight sections
455        self.sections_offsets_in_flight
456            .push(self.current_global_position);
457        let end_of_section = self.current_global_position + requested_section_size;
458        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
459
460        // SAFETY: We have exclusive access to user_buffer for the handle's lifetime.
461        let handle_buffer =
462            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
463        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
464
465        self.current_global_position = end_of_section;
466
467        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
468    }
469
    /// Test helper: returns the current write cursor of the slab.
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
474}
475
/// A write side of the datalogger.
///
/// Sections are allocated in the front slab; when it runs out of space a new
/// slab file is created and the previous one is parked in `back_slabs` until
/// all its sections have been flushed.
pub struct MmapUnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slabs are the previous slabs that still have sections in flight.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files (index of the front slab, starting at 0).
    front_slab_suffix: usize,
}
489
/// Derives the path of slab number `slab_index` from the base log path by
/// inserting `_<slab_index>` between the file stem and the extension
/// (`dir/toto.copper` becomes `dir/toto_0.copper`).
///
/// # Errors
///
/// Returns `InvalidInput` when the base path has no file name, no extension,
/// an empty stem, or when either component is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let invalid_input = |reason: &'static str| io::Error::new(io::ErrorKind::InvalidInput, reason);

    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid_input("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid_input("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(invalid_input("Base file name is empty"));
    }

    Ok(base_file_path.with_file_name(format!("{stem}_{slab_index}.{extension}")))
}
526
527fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
528    let file_path = build_slab_path(base_file_path, slab_suffix)?;
529    let file = OpenOptions::new()
530        .read(true)
531        .write(true)
532        .create(true)
533        .truncate(true)
534        .open(&file_path)
535        .map_err(|e| {
536            io::Error::new(
537                e.kind(),
538                format!("Failed to open file {}: {e}", file_path.display()),
539            )
540        })?;
541    file.set_len(slab_size as u64).map_err(|e| {
542        io::Error::new(
543            e.kind(),
544            format!("Failed to set file length for {}: {e}", file_path.display()),
545        )
546    })?;
547    Ok(file)
548}
549
/// Removes whatever file or link currently sits at `base_file_path` so a fresh
/// alias can be created there. A missing entry is not an error.
///
/// # Errors
///
/// Returns `AlreadyExists` when the path is a directory, and wraps any
/// inspection or removal failure with the offending path.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    // `symlink_metadata` does not follow links, so a symlink is inspected as
    // itself rather than as its target.
    let meta = match std::fs::symlink_metadata(base_file_path) {
        Ok(meta) => meta,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
    };

    if meta.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }

    match std::fs::remove_file(base_file_path) {
        Ok(()) => Ok(()),
        Err(e) => Err(io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )),
    }
}
582
/// Creates an alias at `base_file_path` that points to the first slab file
/// (index 0), replacing any existing non-directory entry at that path.
///
/// - Unix: a symlink whose target is the slab's file name only.
/// - Windows: a file symlink with the same target, falling back to a hard link if the
///   symlink cannot be created.
/// - Other platforms: a hard link.
///
/// # Errors
///
/// Forwards errors from deriving the slab path and removing the previous
/// alias, and wraps link creation failures with the paths involved.
fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
    let first_slab_path = build_slab_path(base_file_path, 0)?;
    remove_existing_alias(base_file_path)?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::symlink;
        // Link to the bare file name so the target is resolved relative to
        // the alias's own directory.
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        symlink(relative_target, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }

    #[cfg(windows)]
    {
        use std::os::windows::fs::symlink_file;
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        // Try a symlink first, then fall back to a hard link; report both
        // errors if the fallback fails too.
        match symlink_file(relative_target, base_file_path) {
            Ok(()) => Ok(()),
            Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
                |hard_link_err| {
                    io::Error::other(format!(
                        "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
                        base_file_path.display()
                    ))
                },
            ),
        }?;
        Ok(())
    }

    #[cfg(not(any(unix, windows)))]
    {
        std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }
}
645
646impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
647    /// The returned slice is section_size or greater.
648    fn add_section(
649        &mut self,
650        entry_type: UnifiedLogType,
651        requested_section_size: usize,
652    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
653        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
654        self.front_slab.clear_temporary_end_marker();
655        let maybe_section = self
656            .front_slab
657            .add_section(entry_type, requested_section_size);
658
659        match maybe_section {
660            AllocatedSection::NoMoreSpace => {
661                // move the front slab to the back slab.
662                let new_slab = self.create_slab()?;
663                // keep the slab until all its sections has been flushed.
664                self.back_slabs
665                    .push(mem::replace(&mut self.front_slab, new_slab));
666                match self
667                    .front_slab
668                    .add_section(entry_type, requested_section_size)
669                {
670                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
671                    Section(section) => {
672                        self.place_end_marker(true)?;
673                        Ok(section)
674                    }
675                }
676            }
677            Section(section) => {
678                self.place_end_marker(true)?;
679                Ok(section)
680            }
681        }
682    }
683
684    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
685        section.mark_closed();
686        for slab in self.back_slabs.iter_mut() {
687            if slab.is_it_my_section(section) {
688                slab.flush_section(section);
689                return;
690            }
691        }
692        self.front_slab.flush_section(section);
693    }
694
695    fn status(&self) -> UnifiedLogStatus {
696        UnifiedLogStatus {
697            total_used_space: self.front_slab.current_global_position,
698            total_allocated_space: self.slab_size * self.front_slab_suffix,
699        }
700    }
701}
702
703impl MmapUnifiedLoggerWrite {
704    fn next_slab(&mut self) -> io::Result<File> {
705        let next_suffix = self.front_slab_suffix + 1;
706        let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
707        self.front_slab_suffix = next_suffix;
708        Ok(file)
709    }
710
711    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
712        let file = make_slab_file(base_file_path, slab_size, 0)?;
713        create_base_alias_link(base_file_path)?;
714        let mut front_slab = SlabEntry::new(file, page_size)?;
715
716        // This is the first slab so add the main header.
717        let main_header = MainHeader {
718            magic: MAIN_MAGIC,
719            first_section_offset: page_size as u16,
720            page_size: page_size as u16,
721        };
722        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
723            .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
724        assert!(nb_bytes < page_size);
725        front_slab.current_global_position = page_size; // align to the next page
726
727        Ok(Self {
728            front_slab,
729            back_slabs: Vec::new(),
730            base_file_path: base_file_path.to_path_buf(),
731            slab_size,
732            front_slab_suffix: 0,
733        })
734    }
735
    /// Drops every back slab that has no section left in flight; dropping a
    /// slab runs its `Drop` implementation.
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }
740
741    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
742        match self.front_slab.write_end_marker(temporary) {
743            Ok(_) => Ok(()),
744            Err(_) => {
745                // Not enough space in the current slab, roll to a new one.
746                let new_slab = self.create_slab()?;
747                self.back_slabs
748                    .push(mem::replace(&mut self.front_slab, new_slab));
749                self.front_slab.write_end_marker(temporary)
750            }
751        }
752    }
753
    /// Returns a snapshot of the writer state as a tuple:
    /// the front slab's write cursor, a copy of the front slab's in-flight
    /// section offsets, and the number of back slabs.
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }
761
762    fn create_slab(&mut self) -> CuResult<SlabEntry> {
763        let file = self
764            .next_slab()
765            .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
766        SlabEntry::new(file, self.front_slab.page_size)
767            .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
768    }
769}
770
impl Drop for MmapUnifiedLoggerWrite {
    /// Replaces the temporary end marker with a final one, flushes the front
    /// slab and releases the back slabs that have nothing left in flight.
    ///
    /// # Panics
    ///
    /// Panics if the final end marker cannot be written.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.

        // Rewind over the temporary marker so the final one takes its place.
        self.front_slab.clear_temporary_end_marker();
        if let Err(e) = self.place_end_marker(false) {
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
    }
}
787
788fn open_slab_index(
789    base_file_path: &Path,
790    slab_index: usize,
791) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
792    let mut options = OpenOptions::new();
793    let options = options.read(true);
794
795    let file_path = build_slab_path(base_file_path, slab_index)?;
796    let file = options.open(&file_path).map_err(|e| {
797        io::Error::new(
798            e.kind(),
799            format!("Failed to open slab file {}: {e}", file_path.display()),
800        )
801    })?;
802    // SAFETY: The file is kept open for the lifetime of the mapping.
803    let mmap = unsafe { Mmap::map(&file) }
804        .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
805    let mut prolog = 0u16;
806    let mut maybe_main_header: Option<MainHeader> = None;
807    if slab_index == 0 {
808        let main_header: MainHeader;
809        let _read: usize;
810        (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
811            io::Error::new(
812                io::ErrorKind::InvalidData,
813                format!("Failed to decode main header: {e}"),
814            )
815        })?;
816        if main_header.magic != MAIN_MAGIC {
817            return Err(io::Error::new(
818                io::ErrorKind::InvalidData,
819                "Invalid magic number in main header",
820            ));
821        }
822        prolog = main_header.first_section_offset;
823        maybe_main_header = Some(main_header);
824    }
825    Ok((file, mmap, prolog, maybe_main_header))
826}
827
/// A read side of the memory map based unified logger.
pub struct MmapUnifiedLoggerRead {
    /// Base path the slab file names are derived from.
    base_file_path: PathBuf,
    /// Main header of the log.
    main_header: MainHeader,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File backing `current_mmap_buffer`.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Byte offset of the next section header inside the current slab.
    current_reading_position: usize,
}
837
/// Absolute position inside a unified log (slab index + byte offset).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    /// Index of the slab file.
    pub slab_index: usize,
    /// Byte offset inside that slab.
    pub offset: usize,
}
844
845impl UnifiedLogRead for MmapUnifiedLoggerRead {
846    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
847        // TODO: eventually implement a 0 copy of this too.
848        loop {
849            if self.current_reading_position >= self.current_mmap_buffer.len() {
850                self.next_slab().map_err(|e| {
851                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
852                })?;
853            }
854
855            let header_result = self.read_section_header();
856            let header = header_result.map_err(|error| {
857                CuError::new_with_cause(
858                    &format!(
859                        "Could not read a sections header: {}/{}:{}",
860                        self.base_file_path.as_os_str().to_string_lossy(),
861                        self.current_slab_index,
862                        self.current_reading_position,
863                    ),
864                    error,
865                )
866            })?;
867
868            // Reached the end of file
869            if header.entry_type == UnifiedLogType::LastEntry {
870                return Ok(None);
871            }
872
873            // Found a section of the requested type
874            if header.entry_type == datalogtype {
875                let result = Some(self.read_section_content(&header)?);
876                self.current_reading_position += header.offset_to_next_section as usize;
877                return Ok(result);
878            }
879
880            // Keep reading until we find the requested type
881            self.current_reading_position += header.offset_to_next_section as usize;
882        }
883    }
884
885    /// Reads the section from the section header pos.
886    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
887        if self.current_reading_position >= self.current_mmap_buffer.len() {
888            self.next_slab().map_err(|e| {
889                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
890            })?;
891        }
892
893        let read_result = self.read_section_header();
894
895        match read_result {
896            Err(error) => Err(CuError::new_with_cause(
897                &format!(
898                    "Could not read a sections header: {}/{}:{}",
899                    self.base_file_path.as_os_str().to_string_lossy(),
900                    self.current_slab_index,
901                    self.current_reading_position,
902                ),
903                error,
904            )),
905            Ok(header) => {
906                let data = self.read_section_content(&header)?;
907                self.current_reading_position += header.offset_to_next_section as usize;
908                Ok((header, data))
909            }
910        }
911    }
912}
913
914impl MmapUnifiedLoggerRead {
915    pub fn new(base_file_path: &Path) -> io::Result<Self> {
916        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
917        let main_header = header.ok_or_else(|| {
918            io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
919        })?;
920
921        Ok(Self {
922            base_file_path: base_file_path.to_path_buf(),
923            main_header,
924            current_file: file,
925            current_mmap_buffer: mmap,
926            current_slab_index: 0,
927            current_reading_position: prolog as usize,
928        })
929    }
930
931    /// Current cursor position (start of next section header).
932    pub fn position(&self) -> LogPosition {
933        LogPosition {
934            slab_index: self.current_slab_index,
935            offset: self.current_reading_position,
936        }
937    }
938
939    /// Seek to an absolute position (start of a section header).
940    pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
941        if pos.slab_index != self.current_slab_index {
942            let (file, mmap, _prolog, _header) =
943                open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
944                    CuError::new_with_cause(
945                        &format!("Failed to open slab {} for seek", pos.slab_index),
946                        e,
947                    )
948                })?;
949            self.current_file = file;
950            self.current_mmap_buffer = mmap;
951            self.current_slab_index = pos.slab_index;
952        }
953        self.current_reading_position = pos.offset;
954        Ok(())
955    }
956
957    fn next_slab(&mut self) -> io::Result<()> {
958        self.current_slab_index += 1;
959        let (file, mmap, prolog, _) =
960            open_slab_index(&self.base_file_path, self.current_slab_index)?;
961        self.current_file = file;
962        self.current_mmap_buffer = mmap;
963        self.current_reading_position = prolog as usize;
964        Ok(())
965    }
966
967    pub fn raw_main_header(&self) -> &MainHeader {
968        &self.main_header
969    }
970
971    pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
972        let mut total = 0u64;
973
974        loop {
975            if self.current_reading_position >= self.current_mmap_buffer.len() {
976                self.next_slab().map_err(|e| {
977                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
978                })?;
979            }
980
981            let header = self.read_section_header()?;
982
983            if header.entry_type == UnifiedLogType::LastEntry {
984                return Ok(total);
985            }
986
987            if header.entry_type == datalogtype {
988                total = total.saturating_add(header.used as u64);
989            }
990
991            self.current_reading_position += header.offset_to_next_section as usize;
992        }
993    }
994
995    /// Reads the section content from the section header pos.
996    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
997        // TODO: we could optimize by asking the buffer to fill
998        let mut section_data = vec![0; header.used as usize];
999        let start_of_data = self.current_reading_position + header.block_size as usize;
1000        section_data.copy_from_slice(
1001            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
1002        );
1003
1004        Ok(section_data)
1005    }
1006
1007    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
1008        let section_header: SectionHeader;
1009        (section_header, _) = decode_from_slice(
1010            &self.current_mmap_buffer[self.current_reading_position..],
1011            standard(),
1012        )
1013        .map_err(|e| {
1014            CuError::new_with_cause(
1015                &format!(
1016                    "Could not read a sections header: {}/{}:{}",
1017                    self.base_file_path.as_os_str().to_string_lossy(),
1018                    self.current_slab_index,
1019                    self.current_reading_position,
1020                ),
1021                e,
1022            )
1023        })?;
1024        if section_header.magic != SECTION_MAGIC {
1025            return Err("Invalid magic number in section header".into());
1026        }
1027
1028        Ok(section_header)
1029    }
1030}
1031
1032/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
1033pub struct UnifiedLoggerIOReader {
1034    logger: MmapUnifiedLoggerRead,
1035    log_type: UnifiedLogType,
1036    buffer: Vec<u8>,
1037    buffer_pos: usize,
1038}
1039
1040impl UnifiedLoggerIOReader {
1041    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
1042        Self {
1043            logger,
1044            log_type,
1045            buffer: Vec::new(),
1046            buffer_pos: 0,
1047        }
1048    }
1049
1050    /// returns true if there is more data to read.
1051    fn fill_buffer(&mut self) -> io::Result<bool> {
1052        match self.logger.read_next_section_type(self.log_type) {
1053            Ok(Some(section)) => {
1054                self.buffer = section;
1055                self.buffer_pos = 0;
1056                Ok(true)
1057            }
1058            Ok(None) => Ok(false), // No more sections of this type
1059            Err(e) => Err(io::Error::other(e.to_string())),
1060        }
1061    }
1062}
1063
1064impl Read for UnifiedLoggerIOReader {
1065    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1066        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
1067            // This means we hit the last section.
1068            return Ok(0);
1069        }
1070
1071        // If we still have no data after trying to fill the buffer, we're at EOF
1072        if self.buffer_pos >= self.buffer.len() {
1073            return Ok(0);
1074        }
1075
1076        // Copy as much as we can from the buffer to `buf`
1077        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
1078        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
1079        self.buffer_pos += len;
1080        Ok(len)
1081    }
1082}
1083
1084#[cfg(feature = "std")]
1085#[cfg(test)]
1086mod tests {
1087    use super::*;
1088    use crate::stream_write;
1089    use bincode::de::read::SliceReader;
1090    use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
1091    use cu29_traits::WriteStream;
1092    use std::path::PathBuf;
1093    use std::sync::{Arc, Mutex};
1094    use tempfile::TempDir;
1095
1096    const LARGE_SLAB: usize = 100 * 1024; // 100KB
1097    const SMALL_SLAB: usize = 16 * 2 * 1024; // 16KB is the page size on MacOS for example
1098
1099    fn make_a_logger(
1100        tmp_dir: &TempDir,
1101        slab_size: usize,
1102    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
1103        let file_path = tmp_dir.path().join("test.bin");
1104        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
1105            .write(true)
1106            .create(true)
1107            .file_base_name(&file_path)
1108            .preallocated_size(slab_size)
1109            .build()
1110            .expect("Failed to create logger")
1111        else {
1112            panic!("Failed to create logger")
1113        };
1114
1115        (Arc::new(Mutex::new(data_logger)), file_path)
1116    }
1117
1118    #[test]
1119    fn test_truncation_and_sections_creations() {
1120        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1121        let file_path = tmp_dir.path().join("test.bin");
1122        let _used = {
1123            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
1124                .write(true)
1125                .create(true)
1126                .file_base_name(&file_path)
1127                .preallocated_size(100000)
1128                .build()
1129                .expect("Failed to create logger")
1130            else {
1131                panic!("Failed to create logger")
1132            };
1133            logger
1134                .add_section(UnifiedLogType::StructuredLogLine, 1024)
1135                .unwrap();
1136            logger
1137                .add_section(UnifiedLogType::CopperList, 2048)
1138                .unwrap();
1139            let used = logger.front_slab.used();
1140            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
1141            // logger drops
1142
1143            used
1144        };
1145
1146        let _file = OpenOptions::new()
1147            .read(true)
1148            .open(tmp_dir.path().join("test_0.bin"))
1149            .expect("Could not reopen the file");
1150        // Check if we have correctly truncated the file
1151        // TODO: recompute this math
1152        //assert_eq!(
1153        //    file.metadata().unwrap().len(),
1154        //    (used + size_of::<SectionHeader>()) as u64
1155        //);
1156    }
1157
1158    #[test]
1159    fn test_base_alias_exists_and_matches_first_slab() {
1160        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1161        let file_path = tmp_dir.path().join("test.bin");
1162        let _logger = MmapUnifiedLoggerBuilder::new()
1163            .write(true)
1164            .create(true)
1165            .file_base_name(&file_path)
1166            .preallocated_size(LARGE_SLAB)
1167            .build()
1168            .expect("Failed to create logger");
1169
1170        let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
1171        assert!(file_path.exists(), "base alias does not exist");
1172        assert!(first_slab.exists(), "first slab does not exist");
1173
1174        let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
1175        let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
1176        assert_eq!(alias_bytes, slab_bytes);
1177    }
1178
1179    #[test]
1180    fn test_one_section_self_cleaning() {
1181        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1182        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1183        {
1184            let _stream = stream_write::<(), MmapSectionStorage>(
1185                logger.clone(),
1186                UnifiedLogType::StructuredLogLine,
1187                1024,
1188            );
1189            assert_eq!(
1190                logger
1191                    .lock()
1192                    .unwrap()
1193                    .front_slab
1194                    .sections_offsets_in_flight
1195                    .len(),
1196                1
1197            );
1198        }
1199        assert_eq!(
1200            logger
1201                .lock()
1202                .unwrap()
1203                .front_slab
1204                .sections_offsets_in_flight
1205                .len(),
1206            0
1207        );
1208        let logger = logger.lock().unwrap();
1209        assert_eq!(
1210            logger.front_slab.flushed_until_offset,
1211            logger.front_slab.current_global_position
1212        );
1213    }
1214
1215    #[test]
1216    fn test_temporary_end_marker_is_created() {
1217        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1218        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1219        {
1220            let mut stream = stream_write::<u32, MmapSectionStorage>(
1221                logger.clone(),
1222                UnifiedLogType::StructuredLogLine,
1223                1024,
1224            )
1225            .unwrap();
1226            stream.log(&42u32).unwrap();
1227        }
1228
1229        let logger_guard = logger.lock().unwrap();
1230        let slab = &logger_guard.front_slab;
1231        let marker_start = slab
1232            .temporary_end_marker
1233            .expect("temporary end-of-log marker missing");
1234        let (eof_header, _) =
1235            decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
1236                .expect("Could not decode end-of-log marker header");
1237        assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
1238        assert!(eof_header.is_open);
1239        assert_eq!(eof_header.used, 0);
1240    }
1241
1242    #[test]
1243    fn test_final_end_marker_is_not_temporary() {
1244        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1245        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1246        {
1247            let mut stream = stream_write::<u32, MmapSectionStorage>(
1248                logger.clone(),
1249                UnifiedLogType::CopperList,
1250                1024,
1251            )
1252            .unwrap();
1253            stream.log(&1u32).unwrap();
1254        }
1255        drop(logger);
1256
1257        let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1258            .file_base_name(&f)
1259            .build()
1260            .expect("Failed to build reader")
1261        else {
1262            panic!("Failed to create reader");
1263        };
1264
1265        loop {
1266            let (header, _data) = reader
1267                .raw_read_section()
1268                .expect("Failed to read section while searching for EOF");
1269            if header.entry_type == UnifiedLogType::LastEntry {
1270                assert!(!header.is_open);
1271                break;
1272            }
1273        }
1274    }
1275
1276    #[test]
1277    fn test_two_sections_self_cleaning_in_order() {
1278        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1279        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1280        let s1 = stream_write::<(), MmapSectionStorage>(
1281            logger.clone(),
1282            UnifiedLogType::StructuredLogLine,
1283            1024,
1284        );
1285        assert_eq!(
1286            logger
1287                .lock()
1288                .unwrap()
1289                .front_slab
1290                .sections_offsets_in_flight
1291                .len(),
1292            1
1293        );
1294        let s2 = stream_write::<(), MmapSectionStorage>(
1295            logger.clone(),
1296            UnifiedLogType::StructuredLogLine,
1297            1024,
1298        );
1299        assert_eq!(
1300            logger
1301                .lock()
1302                .unwrap()
1303                .front_slab
1304                .sections_offsets_in_flight
1305                .len(),
1306            2
1307        );
1308        drop(s2);
1309        assert_eq!(
1310            logger
1311                .lock()
1312                .unwrap()
1313                .front_slab
1314                .sections_offsets_in_flight
1315                .len(),
1316            1
1317        );
1318        drop(s1);
1319        let lg = logger.lock().unwrap();
1320        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1321        assert_eq!(
1322            lg.front_slab.flushed_until_offset,
1323            lg.front_slab.current_global_position
1324        );
1325    }
1326
1327    #[test]
1328    fn test_two_sections_self_cleaning_out_of_order() {
1329        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1330        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1331        let s1 = stream_write::<(), MmapSectionStorage>(
1332            logger.clone(),
1333            UnifiedLogType::StructuredLogLine,
1334            1024,
1335        );
1336        assert_eq!(
1337            logger
1338                .lock()
1339                .unwrap()
1340                .front_slab
1341                .sections_offsets_in_flight
1342                .len(),
1343            1
1344        );
1345        let s2 = stream_write::<(), MmapSectionStorage>(
1346            logger.clone(),
1347            UnifiedLogType::StructuredLogLine,
1348            1024,
1349        );
1350        assert_eq!(
1351            logger
1352                .lock()
1353                .unwrap()
1354                .front_slab
1355                .sections_offsets_in_flight
1356                .len(),
1357            2
1358        );
1359        drop(s1);
1360        assert_eq!(
1361            logger
1362                .lock()
1363                .unwrap()
1364                .front_slab
1365                .sections_offsets_in_flight
1366                .len(),
1367            1
1368        );
1369        drop(s2);
1370        let lg = logger.lock().unwrap();
1371        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1372        assert_eq!(
1373            lg.front_slab.flushed_until_offset,
1374            lg.front_slab.current_global_position
1375        );
1376    }
1377
1378    #[test]
1379    fn test_closed_section_flushes_behind_open_earlier_section() {
1380        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1381        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1382        let s1 = stream_write::<(), MmapSectionStorage>(
1383            logger.clone(),
1384            UnifiedLogType::StructuredLogLine,
1385            1024,
1386        )
1387        .unwrap();
1388        {
1389            let mut s2 = stream_write::<u32, MmapSectionStorage>(
1390                logger.clone(),
1391                UnifiedLogType::CopperList,
1392                1024,
1393            )
1394            .unwrap();
1395            s2.log(&42u32).unwrap();
1396        }
1397
1398        let logger_guard = logger.lock().unwrap();
1399        assert_eq!(logger_guard.front_slab.sections_offsets_in_flight.len(), 1);
1400        assert!(
1401            logger_guard.front_slab.flushed_until_offset
1402                < logger_guard.front_slab.current_global_position
1403        );
1404        assert_eq!(logger_guard.front_slab.pending_closed_bytes(), 0);
1405        drop(logger_guard);
1406        drop(s1);
1407    }
1408
1409    #[test]
1410    fn test_write_then_read_one_section() {
1411        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1412        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1413        {
1414            let mut stream =
1415                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1416            stream.log(&1u32).unwrap();
1417            stream.log(&2u32).unwrap();
1418            stream.log(&3u32).unwrap();
1419        }
1420        drop(logger);
1421        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1422            .file_base_name(&f)
1423            .build()
1424            .expect("Failed to build logger")
1425        else {
1426            panic!("Failed to build logger");
1427        };
1428        let section = dl
1429            .read_next_section_type(UnifiedLogType::StructuredLogLine)
1430            .expect("Failed to read section");
1431        assert!(section.is_some());
1432        let section = section.unwrap();
1433        let mut reader = SliceReader::new(&section[..]);
1434        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1435        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1436        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1437        assert_eq!(v1, 1);
1438        assert_eq!(v2, 2);
1439        assert_eq!(v3, 3);
1440    }
1441
1442    #[cfg(feature = "mmap-fsync")]
1443    #[test]
1444    fn test_fsync_feature_syncs_on_section_flush() {
1445        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1446        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1447        {
1448            let mut stream =
1449                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1450            stream.log(&1u32).unwrap();
1451        }
1452
1453        let logger = logger.lock().unwrap();
1454        assert!(
1455            logger.front_slab.sync_call_count > 0,
1456            "expected mmap-fsync to issue at least one sync_all call"
1457        );
1458    }
1459
1460    /// Mimic a basic CopperList implementation.
1461
1462    #[derive(Debug, Encode, Decode)]
1463    enum CopperListStateMock {
1464        Free,
1465        ProcessingTasks,
1466        BeingSerialized,
1467    }
1468
1469    #[derive(Encode, Decode)]
1470    struct CopperList<P: bincode::enc::Encode> {
1471        state: CopperListStateMock,
1472        payload: P, // This is generated from the runtime.
1473    }
1474
1475    #[test]
1476    fn test_copperlist_list_like_logging() {
1477        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1478        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1479        {
1480            let mut stream =
1481                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1482            let cl0 = CopperList {
1483                state: CopperListStateMock::Free,
1484                payload: (1u32, 2u32, 3u32),
1485            };
1486            let cl1 = CopperList {
1487                state: CopperListStateMock::ProcessingTasks,
1488                payload: (4u32, 5u32, 6u32),
1489            };
1490            stream.log(&cl0).unwrap();
1491            stream.log(&cl1).unwrap();
1492        }
1493        drop(logger);
1494
1495        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1496            .file_base_name(&f)
1497            .build()
1498            .expect("Failed to build logger")
1499        else {
1500            panic!("Failed to build logger");
1501        };
1502        let section = dl
1503            .read_next_section_type(UnifiedLogType::CopperList)
1504            .expect("Failed to read section");
1505        assert!(section.is_some());
1506        let section = section.unwrap();
1507
1508        let mut reader = SliceReader::new(&section[..]);
1509        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1510        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1511        assert_eq!(cl0.payload.1, 2);
1512        assert_eq!(cl1.payload.2, 6);
1513    }
1514
1515    #[test]
1516    fn test_multi_slab_end2end() {
1517        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1518        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1519        {
1520            let mut stream =
1521                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1522            let cl0 = CopperList {
1523                state: CopperListStateMock::Free,
1524                payload: (1u32, 2u32, 3u32),
1525            };
1526            // large enough so we are sure to create a few slabs
1527            for _ in 0..10000 {
1528                stream.log(&cl0).unwrap();
1529            }
1530        }
1531        drop(logger);
1532
1533        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1534            .file_base_name(&f)
1535            .build()
1536            .expect("Failed to build logger")
1537        else {
1538            panic!("Failed to build logger");
1539        };
1540        let mut total_readback = 0;
1541        loop {
1542            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1543            if section.is_err() {
1544                break;
1545            }
1546            let section = section.unwrap();
1547            if section.is_none() {
1548                break;
1549            }
1550            let section = section.unwrap();
1551
1552            let mut reader = SliceReader::new(&section[..]);
1553            loop {
1554                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1555                    decode_from_reader(&mut reader, standard());
1556                if maybe_cl.is_ok() {
1557                    total_readback += 1;
1558                } else {
1559                    break;
1560                }
1561            }
1562        }
1563        assert_eq!(total_readback, 10000);
1564    }
1565}