Skip to main content

cu29_unifiedlog/
memmap.rs

1//! This is the memory map file implementation for the unified logger for Copper.
2//! It is std only.
3
4use crate::{
5    AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6    SectionStorage, UNIFIED_LOG_FORMAT_VERSION, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SliceWriter;
15use bincode::error::EncodeError;
16use bincode::{Encode, decode_from_slice, encode_into_slice};
17use core::slice::from_raw_parts_mut;
18use cu29_traits::{
19    CuError, CuResult, ObservedWriter, UnifiedLogType, abort_observed_encode,
20    begin_observed_encode, finish_observed_encode,
21};
22use memmap2::{Mmap, MmapMut};
23use std::fs::{File, OpenOptions};
24use std::io::Read;
25use std::mem::ManuallyDrop;
26use std::path::{Path, PathBuf};
27use std::{io, mem};
28
/// Section storage that encodes directly into a caller-provided byte buffer.
///
/// The first `block_size` bytes are reserved for the encoded section header;
/// entries are appended after that block.
pub struct MmapSectionStorage {
    /// Backing bytes of the section (header block followed by the entries).
    buffer: &'static mut [u8],
    /// Offset inside `buffer` at which the next entry will be encoded.
    offset: usize,
    /// Number of bytes reserved at the start of `buffer` for the header.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Wraps `buffer` as a section storage.
    ///
    /// The write cursor starts at 0; `block_size` is the size of the header
    /// block that precedes the entries.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Returns a pointer to the first byte of the backing buffer.
    ///
    /// Uses `as_ptr` rather than indexing so that an empty buffer does not
    /// panic; the pointer is only meant to be compared, not dereferenced.
    pub fn buffer_ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }
}
48
49impl SectionStorage for MmapSectionStorage {
50    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
51        self.post_update_header(header)?;
52        self.offset = self.block_size;
53        Ok(self.offset)
54    }
55
56    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
57        encode_into_slice(header, &mut self.buffer[0..], standard())
58    }
59
60    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
61        begin_observed_encode();
62        let result = (|| {
63            let mut encoder = EncoderImpl::new(
64                ObservedWriter::new(SliceWriter::new(&mut self.buffer[self.offset..])),
65                standard(),
66            );
67            entry.encode(&mut encoder)?;
68            Ok(encoder.into_writer().into_inner().bytes_written())
69        })();
70        let size = match result {
71            Ok(size) => {
72                debug_assert_eq!(size, finish_observed_encode());
73                size
74            }
75            Err(err) => {
76                abort_observed_encode();
77                return Err(err);
78            }
79        };
80        self.offset += size;
81        Ok(size)
82    }
83
84    fn flush(&mut self) -> CuResult<usize> {
85        // Flushing is handled at the slab level for mmap-backed storage.
86        Ok(self.offset)
87    }
88}
89
90///
91/// Holds the read or write side of the datalogger.
92pub enum MmapUnifiedLogger {
93    Read(MmapUnifiedLoggerRead),
94    Write(MmapUnifiedLoggerWrite),
95}
96
/// Builder for a [`MmapUnifiedLogger`].
///
/// A writer is built only when both `write` and `create` are set; any other
/// combination builds a reader.
pub struct MmapUnifiedLoggerBuilder {
    /// Base path of the log; slab files are derived from it by inserting
    /// `_<index>` before the extension.
    file_base_name: Option<PathBuf>,
    /// Size in bytes of each slab file (required to build a writer).
    preallocated_size: Option<usize>,
    /// Whether write mode was requested.
    write: bool,
    /// Whether creating the log files was requested.
    create: bool,
}
104
105impl Default for MmapUnifiedLoggerBuilder {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl MmapUnifiedLoggerBuilder {
112    pub fn new() -> Self {
113        Self {
114            file_base_name: None,
115            preallocated_size: None,
116            write: false,
117            create: false, // This is the safest default
118        }
119    }
120
121    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
122    pub fn file_base_name(mut self, file_path: &Path) -> Self {
123        self.file_base_name = Some(file_path.to_path_buf());
124        self
125    }
126
127    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
128        self.preallocated_size = Some(preallocated_size);
129        self
130    }
131
132    pub fn write(mut self, write: bool) -> Self {
133        self.write = write;
134        self
135    }
136
137    pub fn create(mut self, create: bool) -> Self {
138        self.create = create;
139        self
140    }
141
142    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
143        let page_size = page_size::get();
144
145        if self.write && self.create {
146            let file_path = self.file_base_name.ok_or_else(|| {
147                io::Error::new(
148                    io::ErrorKind::InvalidInput,
149                    "File path is required for write mode",
150                )
151            })?;
152            let preallocated_size = self.preallocated_size.ok_or_else(|| {
153                io::Error::new(
154                    io::ErrorKind::InvalidInput,
155                    "Preallocated size is required for write mode",
156                )
157            })?;
158            let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
159            Ok(MmapUnifiedLogger::Write(ulw))
160        } else {
161            let file_path = self.file_base_name.ok_or_else(|| {
162                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
163            })?;
164            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
165            Ok(MmapUnifiedLogger::Read(ulr))
166        }
167    }
168}
169
170struct SlabEntry {
171    file: File,
172    mmap_buffer: ManuallyDrop<MmapMut>,
173    current_global_position: usize,
174    sections_offsets_in_flight: Vec<usize>,
175    flushed_until_offset: usize,
176    page_size: usize,
177    temporary_end_marker: Option<usize>,
178    #[cfg(test)]
179    closed_sections: Vec<(usize, usize)>,
180    #[cfg(test)]
181    flushed_ranges: Vec<(usize, usize)>,
182    #[cfg(all(test, feature = "mmap-fsync"))]
183    sync_call_count: usize,
184}
185
186impl Drop for SlabEntry {
187    fn drop(&mut self) {
188        self.flush_until(self.current_global_position);
189        // SAFETY: We own the mapping and must drop it before trimming the file.
190        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
191        if let Err(error) = self.file.set_len(self.current_global_position as u64) {
192            eprintln!("Failed to trim datalogger file: {}", error);
193        }
194        self.sync_file();
195
196        if !self.sections_offsets_in_flight.is_empty() {
197            eprintln!("Error: Slab not full flushed.");
198        }
199    }
200}
201
202impl SlabEntry {
203    fn new(file: File, page_size: usize) -> io::Result<Self> {
204        let mmap_buffer = ManuallyDrop::new(
205            // SAFETY: The file descriptor is valid and mapping is confined to this struct.
206            unsafe { MmapMut::map_mut(&file) }
207                .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
208        );
209        Ok(Self {
210            file,
211            mmap_buffer,
212            current_global_position: 0,
213            sections_offsets_in_flight: Vec::with_capacity(16),
214            flushed_until_offset: 0,
215            page_size,
216            temporary_end_marker: None,
217            #[cfg(test)]
218            closed_sections: Vec::new(),
219            #[cfg(test)]
220            flushed_ranges: Vec::new(),
221            #[cfg(all(test, feature = "mmap-fsync"))]
222            sync_call_count: 0,
223        })
224    }
225
226    fn flush_range(&mut self, start: usize, len: usize) {
227        if len == 0 {
228            return;
229        }
230        self.mmap_buffer
231            .flush_async_range(start, len)
232            .expect("Failed to flush memory map");
233        self.sync_file();
234        #[cfg(test)]
235        self.record_flushed_range(start, len);
236    }
237
238    fn sync_file(&mut self) {
239        #[cfg(feature = "mmap-fsync")]
240        {
241            self.file.sync_all().expect("Failed to fsync log file");
242            #[cfg(test)]
243            {
244                self.sync_call_count += 1;
245            }
246        }
247    }
248    /// Unsure the underlying mmap is flush to disk until the given position.
249    fn flush_until(&mut self, until_position: usize) {
250        // This is tolerated under linux, but crashes on macos
251        if (self.flushed_until_offset == until_position) || (until_position == 0) {
252            return;
253        }
254        self.flush_range(
255            self.flushed_until_offset,
256            until_position - self.flushed_until_offset,
257        );
258        self.flushed_until_offset = until_position;
259    }
260
261    fn clear_temporary_end_marker(&mut self) {
262        if let Some(marker_start) = self.temporary_end_marker.take() {
263            self.current_global_position = marker_start;
264            if self.flushed_until_offset > marker_start {
265                self.flushed_until_offset = marker_start;
266            }
267        }
268    }
269
270    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
271        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
272        let marker_start = self.align_to_next_page(self.current_global_position);
273        let total_marker_size = block_size; // header only
274        let marker_end = marker_start + total_marker_size;
275        if marker_end > self.mmap_buffer.len() {
276            return Err("Not enough space to write end-of-log marker".into());
277        }
278
279        let header = SectionHeader {
280            magic: SECTION_MAGIC,
281            block_size: SECTION_HEADER_COMPACT_SIZE,
282            entry_type: UnifiedLogType::LastEntry,
283            offset_to_next_section: total_marker_size as u32,
284            used: 0,
285            is_open: temporary,
286        };
287
288        encode_into_slice(
289            &header,
290            &mut self.mmap_buffer
291                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
292            standard(),
293        )
294        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
295
296        self.temporary_end_marker = Some(marker_start);
297        self.current_global_position = marker_end;
298        Ok(())
299    }
300
301    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
302        let storage = section.get_storage();
303        let ptr = storage.buffer_ptr();
304        (ptr >= self.mmap_buffer.as_ptr())
305            && (ptr as usize)
306                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
307    }
308
309    /// Flush the section to disk.
310    /// the flushing is permanent and the section is considered closed.
311    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
312        section
313            .post_update_header()
314            .expect("Failed to update section header");
315
316        let storage = section.get_storage();
317        let ptr = storage.buffer_ptr();
318
319        if ptr < self.mmap_buffer.as_ptr()
320            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
321        {
322            panic!("Invalid section buffer, not in the slab");
323        }
324
325        let base = self.mmap_buffer.as_ptr() as usize;
326        let section_start = ptr as usize - base;
327        let section_len = section.header.offset_to_next_section as usize;
328        #[cfg(test)]
329        self.record_closed_section(section_start, section_len);
330        self.sections_offsets_in_flight
331            .retain(|&x| x != section_start);
332
333        if self.sections_offsets_in_flight.is_empty() {
334            self.flush_until(self.current_global_position);
335            return;
336        }
337        let next_open_offset = self.sections_offsets_in_flight[0];
338        if self.flushed_until_offset < next_open_offset {
339            self.flush_until(next_open_offset);
340        }
341        if section_start + section_len > self.flushed_until_offset {
342            // A long-lived early section can otherwise pin later closed sections
343            // behind the prefix cursor until shutdown.
344            self.flush_range(section_start, section_len);
345        }
346    }
347
348    #[cfg(test)]
349    fn record_closed_section(&mut self, start: usize, len: usize) {
350        self.closed_sections.push((start, len));
351    }
352
353    #[cfg(test)]
354    fn record_flushed_range(&mut self, start: usize, len: usize) {
355        let mut merged_start = start;
356        let mut merged_end = start + len;
357        let mut merged_ranges = Vec::with_capacity(self.flushed_ranges.len() + 1);
358        let mut inserted = false;
359
360        for (range_start, range_len) in self.flushed_ranges.drain(..) {
361            let range_end = range_start + range_len;
362            if range_end < merged_start {
363                merged_ranges.push((range_start, range_len));
364                continue;
365            }
366            if merged_end < range_start {
367                if !inserted {
368                    merged_ranges.push((merged_start, merged_end - merged_start));
369                    inserted = true;
370                }
371                merged_ranges.push((range_start, range_len));
372                continue;
373            }
374
375            merged_start = merged_start.min(range_start);
376            merged_end = merged_end.max(range_end);
377        }
378
379        if !inserted {
380            merged_ranges.push((merged_start, merged_end - merged_start));
381        }
382
383        self.flushed_ranges = merged_ranges;
384    }
385
386    #[cfg(test)]
387    fn pending_closed_bytes(&self) -> usize {
388        let mut pending = 0;
389
390        for (section_start, section_len) in &self.closed_sections {
391            let section_end = section_start + section_len;
392            let mut cursor = *section_start;
393
394            for (range_start, range_len) in &self.flushed_ranges {
395                let range_end = range_start + range_len;
396                if range_end <= cursor {
397                    continue;
398                }
399                if *range_start >= section_end {
400                    break;
401                }
402                if *range_start > cursor {
403                    pending += *range_start - cursor;
404                }
405                cursor = cursor.max(range_end);
406                if cursor >= section_end {
407                    break;
408                }
409            }
410
411            if cursor < section_end {
412                pending += section_end - cursor;
413            }
414        }
415
416        pending
417    }
418
419    #[inline]
420    fn align_to_next_page(&self, ptr: usize) -> usize {
421        (ptr + self.page_size - 1) & !(self.page_size - 1)
422    }
423
424    /// The returned slice is section_size or greater.
425    fn add_section(
426        &mut self,
427        entry_type: UnifiedLogType,
428        requested_section_size: usize,
429    ) -> AllocatedSection<MmapSectionStorage> {
430        // align current_position to the next page
431        self.current_global_position = self.align_to_next_page(self.current_global_position);
432        let section_size = self.align_to_next_page(requested_section_size) as u32;
433
434        // We need to have enough space to store the section in that slab
435        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
436            return AllocatedSection::NoMoreSpace;
437        }
438
439        #[cfg(feature = "compact")]
440        let block_size = SECTION_HEADER_COMPACT_SIZE;
441
442        #[cfg(not(feature = "compact"))]
443        let block_size = self.page_size as u16;
444
445        let section_header = SectionHeader {
446            magic: SECTION_MAGIC,
447            block_size,
448            entry_type,
449            offset_to_next_section: section_size,
450            used: 0u32,
451            is_open: true,
452        };
453
454        // save the position to keep track for in flight sections
455        self.sections_offsets_in_flight
456            .push(self.current_global_position);
457        let end_of_section = self.current_global_position + requested_section_size;
458        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
459
460        // SAFETY: We have exclusive access to user_buffer for the handle's lifetime.
461        let handle_buffer =
462            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
463        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
464
465        self.current_global_position = end_of_section;
466
467        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
468    }
469
470    #[cfg(test)]
471    fn used(&self) -> usize {
472        self.current_global_position
473    }
474}
475
476/// A write side of the datalogger.
477pub struct MmapUnifiedLoggerWrite {
478    /// the front slab is the current active slab for any new section.
479    front_slab: SlabEntry,
480    /// the back slab is the previous slab that is being flushed.
481    back_slabs: Vec<SlabEntry>,
482    /// base file path to create the backing files from.
483    base_file_path: PathBuf,
484    /// allocation size for the backing files.
485    slab_size: usize,
486    /// current suffix for the backing files.
487    front_slab_suffix: usize,
488}
489
/// Builds the path of slab number `slab_index` for the given base path:
/// `dir/name.ext` becomes `dir/name_<slab_index>.ext`.
///
/// # Errors
///
/// Returns `InvalidInput` when the base path has no file name or no
/// extension, or when either of them is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    fn invalid_input(message: &'static str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidInput, message)
    }

    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid_input("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid_input("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid_input("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(invalid_input("Base file name is empty"));
    }

    Ok(base_file_path.with_file_name(format!("{stem}_{slab_index}.{extension}")))
}
526
527fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
528    let file_path = build_slab_path(base_file_path, slab_suffix)?;
529    let file = OpenOptions::new()
530        .read(true)
531        .write(true)
532        .create(true)
533        .truncate(true)
534        .open(&file_path)
535        .map_err(|e| {
536            io::Error::new(
537                e.kind(),
538                format!("Failed to open file {}: {e}", file_path.display()),
539            )
540        })?;
541    file.set_len(slab_size as u64).map_err(|e| {
542        io::Error::new(
543            e.kind(),
544            format!("Failed to set file length for {}: {e}", file_path.display()),
545        )
546    })?;
547    Ok(file)
548}
549
/// Removes whatever file or symlink currently sits at `base_file_path`.
///
/// A missing path is not an error. The path itself is inspected (symlinks are
/// not followed).
///
/// # Errors
///
/// Returns `AlreadyExists` when the path is a directory; inspection and
/// removal failures are returned with the path added to the message.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    let meta = match std::fs::symlink_metadata(base_file_path) {
        Ok(meta) => meta,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
    };

    if meta.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }

    match std::fs::remove_file(base_file_path) {
        Ok(()) => Ok(()),
        Err(e) => Err(io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )),
    }
}
582
583fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
584    let first_slab_path = build_slab_path(base_file_path, 0)?;
585    remove_existing_alias(base_file_path)?;
586
587    #[cfg(unix)]
588    {
589        use std::os::unix::fs::symlink;
590        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
591            io::Error::new(
592                io::ErrorKind::InvalidInput,
593                "First slab file has no name component",
594            )
595        })?);
596        symlink(relative_target, base_file_path).map_err(|e| {
597            io::Error::new(
598                e.kind(),
599                format!(
600                    "Failed to create base log alias {} -> {}: {e}",
601                    base_file_path.display(),
602                    first_slab_path.display()
603                ),
604            )
605        })
606    }
607
608    #[cfg(windows)]
609    {
610        use std::os::windows::fs::symlink_file;
611        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
612            io::Error::new(
613                io::ErrorKind::InvalidInput,
614                "First slab file has no name component",
615            )
616        })?);
617        match symlink_file(relative_target, base_file_path) {
618            Ok(()) => Ok(()),
619            Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
620                |hard_link_err| {
621                    io::Error::other(format!(
622                        "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
623                        base_file_path.display()
624                    ))
625                },
626            ),
627        }?;
628        Ok(())
629    }
630
631    #[cfg(not(any(unix, windows)))]
632    {
633        std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
634            io::Error::new(
635                e.kind(),
636                format!(
637                    "Failed to create base log alias {} -> {}: {e}",
638                    base_file_path.display(),
639                    first_slab_path.display()
640                ),
641            )
642        })
643    }
644}
645
646impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
647    /// The returned slice is section_size or greater.
648    fn add_section(
649        &mut self,
650        entry_type: UnifiedLogType,
651        requested_section_size: usize,
652    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
653        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
654        self.front_slab.clear_temporary_end_marker();
655        let maybe_section = self
656            .front_slab
657            .add_section(entry_type, requested_section_size);
658
659        match maybe_section {
660            AllocatedSection::NoMoreSpace => {
661                // move the front slab to the back slab.
662                let new_slab = self.create_slab()?;
663                // keep the slab until all its sections has been flushed.
664                self.back_slabs
665                    .push(mem::replace(&mut self.front_slab, new_slab));
666                match self
667                    .front_slab
668                    .add_section(entry_type, requested_section_size)
669                {
670                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
671                    Section(section) => {
672                        self.place_end_marker(true)?;
673                        Ok(section)
674                    }
675                }
676            }
677            Section(section) => {
678                self.place_end_marker(true)?;
679                Ok(section)
680            }
681        }
682    }
683
684    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
685        section.mark_closed();
686        for slab in self.back_slabs.iter_mut() {
687            if slab.is_it_my_section(section) {
688                slab.flush_section(section);
689                return;
690            }
691        }
692        self.front_slab.flush_section(section);
693    }
694
695    fn status(&self) -> UnifiedLogStatus {
696        UnifiedLogStatus {
697            total_used_space: self.front_slab.current_global_position,
698            total_allocated_space: self.slab_size * self.front_slab_suffix,
699        }
700    }
701}
702
703impl MmapUnifiedLoggerWrite {
704    fn next_slab(&mut self) -> io::Result<File> {
705        let next_suffix = self.front_slab_suffix + 1;
706        let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
707        self.front_slab_suffix = next_suffix;
708        Ok(file)
709    }
710
711    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
712        let file = make_slab_file(base_file_path, slab_size, 0)?;
713        create_base_alias_link(base_file_path)?;
714        let mut front_slab = SlabEntry::new(file, page_size)?;
715
716        // This is the first slab so add the main header.
717        let main_header = MainHeader {
718            magic: MAIN_MAGIC,
719            format_version: UNIFIED_LOG_FORMAT_VERSION,
720            first_section_offset: page_size as u16,
721            page_size: page_size as u16,
722        };
723        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
724            .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
725        assert!(nb_bytes < page_size);
726        front_slab.current_global_position = page_size; // align to the next page
727
728        Ok(Self {
729            front_slab,
730            back_slabs: Vec::new(),
731            base_file_path: base_file_path.to_path_buf(),
732            slab_size,
733            front_slab_suffix: 0,
734        })
735    }
736
737    fn garbage_collect_backslabs(&mut self) {
738        self.back_slabs
739            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
740    }
741
742    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
743        match self.front_slab.write_end_marker(temporary) {
744            Ok(_) => Ok(()),
745            Err(_) => {
746                // Not enough space in the current slab, roll to a new one.
747                let new_slab = self.create_slab()?;
748                self.back_slabs
749                    .push(mem::replace(&mut self.front_slab, new_slab));
750                self.front_slab.write_end_marker(temporary)
751            }
752        }
753    }
754
755    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
756        (
757            self.front_slab.current_global_position,
758            self.front_slab.sections_offsets_in_flight.clone(),
759            self.back_slabs.len(),
760        )
761    }
762
763    fn create_slab(&mut self) -> CuResult<SlabEntry> {
764        let file = self
765            .next_slab()
766            .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
767        SlabEntry::new(file, self.front_slab.page_size)
768            .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
769    }
770}
771
772impl Drop for MmapUnifiedLoggerWrite {
773    fn drop(&mut self) {
774        #[cfg(debug_assertions)]
775        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.
776
777        self.front_slab.clear_temporary_end_marker();
778        if let Err(e) = self.place_end_marker(false) {
779            panic!("Failed to flush the unified logger: {}", e);
780        }
781        self.front_slab
782            .flush_until(self.front_slab.current_global_position);
783        self.garbage_collect_backslabs();
784        #[cfg(debug_assertions)]
785        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
786    }
787}
788
789fn open_slab_index(
790    base_file_path: &Path,
791    slab_index: usize,
792) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
793    let mut options = OpenOptions::new();
794    let options = options.read(true);
795
796    let file_path = build_slab_path(base_file_path, slab_index)?;
797    let file = options.open(&file_path).map_err(|e| {
798        io::Error::new(
799            e.kind(),
800            format!("Failed to open slab file {}: {e}", file_path.display()),
801        )
802    })?;
803    // SAFETY: The file is kept open for the lifetime of the mapping.
804    let mmap = unsafe { Mmap::map(&file) }
805        .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
806    let mut prolog = 0u16;
807    let mut maybe_main_header: Option<MainHeader> = None;
808    if slab_index == 0 {
809        let main_header: MainHeader;
810        let _read: usize;
811        (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
812            io::Error::new(
813                io::ErrorKind::InvalidData,
814                format!("Failed to decode main header: {e}"),
815            )
816        })?;
817        if main_header.magic != MAIN_MAGIC {
818            return Err(io::Error::new(
819                io::ErrorKind::InvalidData,
820                "Invalid magic number in main header",
821            ));
822        }
823        if main_header.format_version != UNIFIED_LOG_FORMAT_VERSION {
824            return Err(io::Error::new(
825                io::ErrorKind::InvalidData,
826                format!(
827                    "Unsupported unified log format version {} in main header; this reader supports version {}",
828                    main_header.format_version, UNIFIED_LOG_FORMAT_VERSION
829                ),
830            ));
831        }
832        prolog = main_header.first_section_offset;
833        maybe_main_header = Some(main_header);
834    }
835    Ok((file, mmap, prolog, maybe_main_header))
836}
837
838/// A read side of the memory map based unified logger.
839pub struct MmapUnifiedLoggerRead {
840    base_file_path: PathBuf,
841    main_header: MainHeader,
842    current_mmap_buffer: Mmap,
843    current_file: File,
844    current_slab_index: usize,
845    current_reading_position: usize,
846}
847
/// Absolute position inside a unified log (slab index + byte offset).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    /// Index of the slab file.
    pub slab_index: usize,
    /// Byte offset inside that slab.
    pub offset: usize,
}
854
855impl UnifiedLogRead for MmapUnifiedLoggerRead {
856    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
857        // TODO: eventually implement a 0 copy of this too.
858        loop {
859            if self.current_reading_position >= self.current_mmap_buffer.len() {
860                self.next_slab().map_err(|e| {
861                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
862                })?;
863            }
864
865            let header_result = self.read_section_header();
866            let header = header_result.map_err(|error| {
867                CuError::new_with_cause(
868                    &format!(
869                        "Could not read a sections header: {}/{}:{}",
870                        self.base_file_path.as_os_str().to_string_lossy(),
871                        self.current_slab_index,
872                        self.current_reading_position,
873                    ),
874                    error,
875                )
876            })?;
877
878            // Reached the end of file
879            if header.entry_type == UnifiedLogType::LastEntry {
880                return Ok(None);
881            }
882
883            // Found a section of the requested type
884            if header.entry_type == datalogtype {
885                let result = Some(self.read_section_content(&header)?);
886                self.current_reading_position += header.offset_to_next_section as usize;
887                return Ok(result);
888            }
889
890            // Keep reading until we find the requested type
891            self.current_reading_position += header.offset_to_next_section as usize;
892        }
893    }
894
895    /// Reads the section from the section header pos.
896    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
897        if self.current_reading_position >= self.current_mmap_buffer.len() {
898            self.next_slab().map_err(|e| {
899                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
900            })?;
901        }
902
903        let read_result = self.read_section_header();
904
905        match read_result {
906            Err(error) => Err(CuError::new_with_cause(
907                &format!(
908                    "Could not read a sections header: {}/{}:{}",
909                    self.base_file_path.as_os_str().to_string_lossy(),
910                    self.current_slab_index,
911                    self.current_reading_position,
912                ),
913                error,
914            )),
915            Ok(header) => {
916                let data = self.read_section_content(&header)?;
917                self.current_reading_position += header.offset_to_next_section as usize;
918                Ok((header, data))
919            }
920        }
921    }
922}
923
924impl MmapUnifiedLoggerRead {
925    pub fn new(base_file_path: &Path) -> io::Result<Self> {
926        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
927        let main_header = header.ok_or_else(|| {
928            io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
929        })?;
930
931        Ok(Self {
932            base_file_path: base_file_path.to_path_buf(),
933            main_header,
934            current_file: file,
935            current_mmap_buffer: mmap,
936            current_slab_index: 0,
937            current_reading_position: prolog as usize,
938        })
939    }
940
941    /// Current cursor position (start of next section header).
942    pub fn position(&self) -> LogPosition {
943        LogPosition {
944            slab_index: self.current_slab_index,
945            offset: self.current_reading_position,
946        }
947    }
948
949    /// Seek to an absolute position (start of a section header).
950    pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
951        if pos.slab_index != self.current_slab_index {
952            let (file, mmap, _prolog, _header) =
953                open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
954                    CuError::new_with_cause(
955                        &format!("Failed to open slab {} for seek", pos.slab_index),
956                        e,
957                    )
958                })?;
959            self.current_file = file;
960            self.current_mmap_buffer = mmap;
961            self.current_slab_index = pos.slab_index;
962        }
963        self.current_reading_position = pos.offset;
964        Ok(())
965    }
966
967    fn next_slab(&mut self) -> io::Result<()> {
968        self.current_slab_index += 1;
969        let (file, mmap, prolog, _) =
970            open_slab_index(&self.base_file_path, self.current_slab_index)?;
971        self.current_file = file;
972        self.current_mmap_buffer = mmap;
973        self.current_reading_position = prolog as usize;
974        Ok(())
975    }
976
977    pub fn raw_main_header(&self) -> &MainHeader {
978        &self.main_header
979    }
980
981    pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
982        let mut total = 0u64;
983
984        loop {
985            if self.current_reading_position >= self.current_mmap_buffer.len() {
986                self.next_slab().map_err(|e| {
987                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
988                })?;
989            }
990
991            let header = self.read_section_header()?;
992
993            if header.entry_type == UnifiedLogType::LastEntry {
994                return Ok(total);
995            }
996
997            if header.entry_type == datalogtype {
998                total = total.saturating_add(header.used as u64);
999            }
1000
1001            self.current_reading_position += header.offset_to_next_section as usize;
1002        }
1003    }
1004
1005    /// Reads the section content from the section header pos.
1006    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
1007        // TODO: we could optimize by asking the buffer to fill
1008        let mut section_data = vec![0; header.used as usize];
1009        let start_of_data = self.current_reading_position + header.block_size as usize;
1010        section_data.copy_from_slice(
1011            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
1012        );
1013
1014        Ok(section_data)
1015    }
1016
1017    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
1018        let section_header: SectionHeader;
1019        (section_header, _) = decode_from_slice(
1020            &self.current_mmap_buffer[self.current_reading_position..],
1021            standard(),
1022        )
1023        .map_err(|e| {
1024            CuError::new_with_cause(
1025                &format!(
1026                    "Could not read a sections header: {}/{}:{}",
1027                    self.base_file_path.as_os_str().to_string_lossy(),
1028                    self.current_slab_index,
1029                    self.current_reading_position,
1030                ),
1031                e,
1032            )
1033        })?;
1034        if section_header.magic != SECTION_MAGIC {
1035            return Err("Invalid magic number in section header".into());
1036        }
1037
1038        Ok(section_header)
1039    }
1040}
1041
1042/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
1043pub struct UnifiedLoggerIOReader {
1044    logger: MmapUnifiedLoggerRead,
1045    log_type: UnifiedLogType,
1046    buffer: Vec<u8>,
1047    buffer_pos: usize,
1048}
1049
1050impl UnifiedLoggerIOReader {
1051    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
1052        Self {
1053            logger,
1054            log_type,
1055            buffer: Vec::new(),
1056            buffer_pos: 0,
1057        }
1058    }
1059
1060    /// returns true if there is more data to read.
1061    fn fill_buffer(&mut self) -> io::Result<bool> {
1062        match self.logger.read_next_section_type(self.log_type) {
1063            Ok(Some(section)) => {
1064                self.buffer = section;
1065                self.buffer_pos = 0;
1066                Ok(true)
1067            }
1068            Ok(None) => Ok(false), // No more sections of this type
1069            Err(e) => Err(io::Error::other(e.to_string())),
1070        }
1071    }
1072}
1073
1074impl Read for UnifiedLoggerIOReader {
1075    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1076        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
1077            // This means we hit the last section.
1078            return Ok(0);
1079        }
1080
1081        // If we still have no data after trying to fill the buffer, we're at EOF
1082        if self.buffer_pos >= self.buffer.len() {
1083            return Ok(0);
1084        }
1085
1086        // Copy as much as we can from the buffer to `buf`
1087        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
1088        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
1089        self.buffer_pos += len;
1090        Ok(len)
1091    }
1092}
1093
1094#[cfg(feature = "std")]
1095#[cfg(test)]
1096mod tests {
1097    use super::*;
1098    use crate::stream_write;
1099    use bincode::de::read::SliceReader;
1100    use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
1101    use cu29_traits::WriteStream;
1102    use std::io::{Seek, SeekFrom, Write};
1103    use std::path::PathBuf;
1104    use std::sync::{Arc, Mutex};
1105    use tempfile::TempDir;
1106
1107    const LARGE_SLAB: usize = 100 * 1024; // 100KB
1108    const SMALL_SLAB: usize = 16 * 2 * 1024; // 16KB is the page size on MacOS for example
1109
1110    fn make_a_logger(
1111        tmp_dir: &TempDir,
1112        slab_size: usize,
1113    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
1114        let file_path = tmp_dir.path().join("test.bin");
1115        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
1116            .write(true)
1117            .create(true)
1118            .file_base_name(&file_path)
1119            .preallocated_size(slab_size)
1120            .build()
1121            .expect("Failed to create logger")
1122        else {
1123            panic!("Failed to create logger")
1124        };
1125
1126        (Arc::new(Mutex::new(data_logger)), file_path)
1127    }
1128
1129    #[test]
1130    fn test_truncation_and_sections_creations() {
1131        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1132        let file_path = tmp_dir.path().join("test.bin");
1133        let _used = {
1134            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
1135                .write(true)
1136                .create(true)
1137                .file_base_name(&file_path)
1138                .preallocated_size(100000)
1139                .build()
1140                .expect("Failed to create logger")
1141            else {
1142                panic!("Failed to create logger")
1143            };
1144            logger
1145                .add_section(UnifiedLogType::StructuredLogLine, 1024)
1146                .unwrap();
1147            logger
1148                .add_section(UnifiedLogType::CopperList, 2048)
1149                .unwrap();
1150            let used = logger.front_slab.used();
1151            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
1152            // logger drops
1153
1154            used
1155        };
1156
1157        let _file = OpenOptions::new()
1158            .read(true)
1159            .open(tmp_dir.path().join("test_0.bin"))
1160            .expect("Could not reopen the file");
1161        // Check if we have correctly truncated the file
1162        // TODO: recompute this math
1163        //assert_eq!(
1164        //    file.metadata().unwrap().len(),
1165        //    (used + size_of::<SectionHeader>()) as u64
1166        //);
1167    }
1168
1169    #[test]
1170    fn test_unsupported_main_header_format_version_is_rejected() {
1171        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1172        let file_path = tmp_dir.path().join("test.bin");
1173        {
1174            let MmapUnifiedLogger::Write(_logger) = MmapUnifiedLoggerBuilder::new()
1175                .write(true)
1176                .create(true)
1177                .file_base_name(&file_path)
1178                .preallocated_size(100000)
1179                .build()
1180                .expect("Failed to create logger")
1181            else {
1182                panic!("Failed to create logger")
1183            };
1184        }
1185
1186        let mut file = OpenOptions::new()
1187            .read(true)
1188            .write(true)
1189            .open(tmp_dir.path().join("test_0.bin"))
1190            .expect("Could not reopen the slab");
1191        let unsupported_version = UNIFIED_LOG_FORMAT_VERSION + 1;
1192        file.seek(SeekFrom::Start(MAIN_MAGIC.len() as u64))
1193            .expect("Could not seek to format version");
1194        file.write_all(&[unsupported_version])
1195            .expect("Could not write unsupported format version");
1196        drop(file);
1197
1198        let err = match MmapUnifiedLoggerBuilder::new()
1199            .file_base_name(&file_path)
1200            .build()
1201        {
1202            Ok(_) => panic!("Reader accepted unsupported unified log format version"),
1203            Err(err) => err,
1204        };
1205
1206        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1207        assert_eq!(
1208            err.to_string(),
1209            format!(
1210                "Unsupported unified log format version {unsupported_version} in main header; this reader supports version {UNIFIED_LOG_FORMAT_VERSION}"
1211            )
1212        );
1213    }
1214
1215    #[test]
1216    fn test_base_alias_exists_and_matches_first_slab() {
1217        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1218        let file_path = tmp_dir.path().join("test.bin");
1219        let _logger = MmapUnifiedLoggerBuilder::new()
1220            .write(true)
1221            .create(true)
1222            .file_base_name(&file_path)
1223            .preallocated_size(LARGE_SLAB)
1224            .build()
1225            .expect("Failed to create logger");
1226
1227        let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
1228        assert!(file_path.exists(), "base alias does not exist");
1229        assert!(first_slab.exists(), "first slab does not exist");
1230
1231        let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
1232        let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
1233        assert_eq!(alias_bytes, slab_bytes);
1234    }
1235
1236    #[test]
1237    fn test_one_section_self_cleaning() {
1238        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1239        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1240        {
1241            let _stream = stream_write::<(), MmapSectionStorage>(
1242                logger.clone(),
1243                UnifiedLogType::StructuredLogLine,
1244                1024,
1245            );
1246            assert_eq!(
1247                logger
1248                    .lock()
1249                    .unwrap()
1250                    .front_slab
1251                    .sections_offsets_in_flight
1252                    .len(),
1253                1
1254            );
1255        }
1256        assert_eq!(
1257            logger
1258                .lock()
1259                .unwrap()
1260                .front_slab
1261                .sections_offsets_in_flight
1262                .len(),
1263            0
1264        );
1265        let logger = logger.lock().unwrap();
1266        assert_eq!(
1267            logger.front_slab.flushed_until_offset,
1268            logger.front_slab.current_global_position
1269        );
1270    }
1271
1272    #[test]
1273    fn test_temporary_end_marker_is_created() {
1274        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1275        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1276        {
1277            let mut stream = stream_write::<u32, MmapSectionStorage>(
1278                logger.clone(),
1279                UnifiedLogType::StructuredLogLine,
1280                1024,
1281            )
1282            .unwrap();
1283            stream.log(&42u32).unwrap();
1284        }
1285
1286        let logger_guard = logger.lock().unwrap();
1287        let slab = &logger_guard.front_slab;
1288        let marker_start = slab
1289            .temporary_end_marker
1290            .expect("temporary end-of-log marker missing");
1291        let (eof_header, _) =
1292            decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
1293                .expect("Could not decode end-of-log marker header");
1294        assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
1295        assert!(eof_header.is_open);
1296        assert_eq!(eof_header.used, 0);
1297    }
1298
1299    #[test]
1300    fn test_final_end_marker_is_not_temporary() {
1301        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1302        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1303        {
1304            let mut stream = stream_write::<u32, MmapSectionStorage>(
1305                logger.clone(),
1306                UnifiedLogType::CopperList,
1307                1024,
1308            )
1309            .unwrap();
1310            stream.log(&1u32).unwrap();
1311        }
1312        drop(logger);
1313
1314        let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1315            .file_base_name(&f)
1316            .build()
1317            .expect("Failed to build reader")
1318        else {
1319            panic!("Failed to create reader");
1320        };
1321
1322        loop {
1323            let (header, _data) = reader
1324                .raw_read_section()
1325                .expect("Failed to read section while searching for EOF");
1326            if header.entry_type == UnifiedLogType::LastEntry {
1327                assert!(!header.is_open);
1328                break;
1329            }
1330        }
1331    }
1332
1333    #[test]
1334    fn test_two_sections_self_cleaning_in_order() {
1335        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1336        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1337        let s1 = stream_write::<(), MmapSectionStorage>(
1338            logger.clone(),
1339            UnifiedLogType::StructuredLogLine,
1340            1024,
1341        );
1342        assert_eq!(
1343            logger
1344                .lock()
1345                .unwrap()
1346                .front_slab
1347                .sections_offsets_in_flight
1348                .len(),
1349            1
1350        );
1351        let s2 = stream_write::<(), MmapSectionStorage>(
1352            logger.clone(),
1353            UnifiedLogType::StructuredLogLine,
1354            1024,
1355        );
1356        assert_eq!(
1357            logger
1358                .lock()
1359                .unwrap()
1360                .front_slab
1361                .sections_offsets_in_flight
1362                .len(),
1363            2
1364        );
1365        drop(s2);
1366        assert_eq!(
1367            logger
1368                .lock()
1369                .unwrap()
1370                .front_slab
1371                .sections_offsets_in_flight
1372                .len(),
1373            1
1374        );
1375        drop(s1);
1376        let lg = logger.lock().unwrap();
1377        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1378        assert_eq!(
1379            lg.front_slab.flushed_until_offset,
1380            lg.front_slab.current_global_position
1381        );
1382    }
1383
1384    #[test]
1385    fn test_two_sections_self_cleaning_out_of_order() {
1386        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1387        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1388        let s1 = stream_write::<(), MmapSectionStorage>(
1389            logger.clone(),
1390            UnifiedLogType::StructuredLogLine,
1391            1024,
1392        );
1393        assert_eq!(
1394            logger
1395                .lock()
1396                .unwrap()
1397                .front_slab
1398                .sections_offsets_in_flight
1399                .len(),
1400            1
1401        );
1402        let s2 = stream_write::<(), MmapSectionStorage>(
1403            logger.clone(),
1404            UnifiedLogType::StructuredLogLine,
1405            1024,
1406        );
1407        assert_eq!(
1408            logger
1409                .lock()
1410                .unwrap()
1411                .front_slab
1412                .sections_offsets_in_flight
1413                .len(),
1414            2
1415        );
1416        drop(s1);
1417        assert_eq!(
1418            logger
1419                .lock()
1420                .unwrap()
1421                .front_slab
1422                .sections_offsets_in_flight
1423                .len(),
1424            1
1425        );
1426        drop(s2);
1427        let lg = logger.lock().unwrap();
1428        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1429        assert_eq!(
1430            lg.front_slab.flushed_until_offset,
1431            lg.front_slab.current_global_position
1432        );
1433    }
1434
1435    #[test]
1436    fn test_closed_section_flushes_behind_open_earlier_section() {
1437        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1438        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1439        let s1 = stream_write::<(), MmapSectionStorage>(
1440            logger.clone(),
1441            UnifiedLogType::StructuredLogLine,
1442            1024,
1443        )
1444        .unwrap();
1445        {
1446            let mut s2 = stream_write::<u32, MmapSectionStorage>(
1447                logger.clone(),
1448                UnifiedLogType::CopperList,
1449                1024,
1450            )
1451            .unwrap();
1452            s2.log(&42u32).unwrap();
1453        }
1454
1455        let logger_guard = logger.lock().unwrap();
1456        assert_eq!(logger_guard.front_slab.sections_offsets_in_flight.len(), 1);
1457        assert!(
1458            logger_guard.front_slab.flushed_until_offset
1459                < logger_guard.front_slab.current_global_position
1460        );
1461        assert_eq!(logger_guard.front_slab.pending_closed_bytes(), 0);
1462        drop(logger_guard);
1463        drop(s1);
1464    }
1465
1466    #[test]
1467    fn test_write_then_read_one_section() {
1468        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1469        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1470        {
1471            let mut stream =
1472                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1473            stream.log(&1u32).unwrap();
1474            stream.log(&2u32).unwrap();
1475            stream.log(&3u32).unwrap();
1476        }
1477        drop(logger);
1478        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1479            .file_base_name(&f)
1480            .build()
1481            .expect("Failed to build logger")
1482        else {
1483            panic!("Failed to build logger");
1484        };
1485        let section = dl
1486            .read_next_section_type(UnifiedLogType::StructuredLogLine)
1487            .expect("Failed to read section");
1488        assert!(section.is_some());
1489        let section = section.unwrap();
1490        let mut reader = SliceReader::new(&section[..]);
1491        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1492        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1493        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1494        assert_eq!(v1, 1);
1495        assert_eq!(v2, 2);
1496        assert_eq!(v3, 3);
1497    }
1498
1499    #[cfg(feature = "mmap-fsync")]
1500    #[test]
1501    fn test_fsync_feature_syncs_on_section_flush() {
1502        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1503        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1504        {
1505            let mut stream =
1506                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1507            stream.log(&1u32).unwrap();
1508        }
1509
1510        let logger = logger.lock().unwrap();
1511        assert!(
1512            logger.front_slab.sync_call_count > 0,
1513            "expected mmap-fsync to issue at least one sync_all call"
1514        );
1515    }
1516
1517    /// Mimic a basic CopperList implementation.
1518
1519    #[derive(Debug, Encode, Decode)]
1520    enum CopperListStateMock {
1521        Free,
1522        ProcessingTasks,
1523        BeingSerialized,
1524    }
1525
1526    #[derive(Encode, Decode)]
1527    struct CopperList<P: bincode::enc::Encode> {
1528        state: CopperListStateMock,
1529        payload: P, // This is generated from the runtime.
1530    }
1531
1532    #[test]
1533    fn test_copperlist_list_like_logging() {
1534        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1535        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1536        {
1537            let mut stream =
1538                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1539            let cl0 = CopperList {
1540                state: CopperListStateMock::Free,
1541                payload: (1u32, 2u32, 3u32),
1542            };
1543            let cl1 = CopperList {
1544                state: CopperListStateMock::ProcessingTasks,
1545                payload: (4u32, 5u32, 6u32),
1546            };
1547            stream.log(&cl0).unwrap();
1548            stream.log(&cl1).unwrap();
1549        }
1550        drop(logger);
1551
1552        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1553            .file_base_name(&f)
1554            .build()
1555            .expect("Failed to build logger")
1556        else {
1557            panic!("Failed to build logger");
1558        };
1559        let section = dl
1560            .read_next_section_type(UnifiedLogType::CopperList)
1561            .expect("Failed to read section");
1562        assert!(section.is_some());
1563        let section = section.unwrap();
1564
1565        let mut reader = SliceReader::new(&section[..]);
1566        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1567        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1568        assert_eq!(cl0.payload.1, 2);
1569        assert_eq!(cl1.payload.2, 6);
1570    }
1571
1572    #[test]
1573    fn test_multi_slab_end2end() {
1574        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1575        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1576        {
1577            let mut stream =
1578                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1579            let cl0 = CopperList {
1580                state: CopperListStateMock::Free,
1581                payload: (1u32, 2u32, 3u32),
1582            };
1583            // large enough so we are sure to create a few slabs
1584            for _ in 0..10000 {
1585                stream.log(&cl0).unwrap();
1586            }
1587        }
1588        drop(logger);
1589
1590        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1591            .file_base_name(&f)
1592            .build()
1593            .expect("Failed to build logger")
1594        else {
1595            panic!("Failed to build logger");
1596        };
1597        let mut total_readback = 0;
1598        loop {
1599            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1600            if section.is_err() {
1601                break;
1602            }
1603            let section = section.unwrap();
1604            if section.is_none() {
1605                break;
1606            }
1607            let section = section.unwrap();
1608
1609            let mut reader = SliceReader::new(&section[..]);
1610            loop {
1611                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1612                    decode_from_reader(&mut reader, standard());
1613                if maybe_cl.is_ok() {
1614                    total_readback += 1;
1615                } else {
1616                    break;
1617                }
1618            }
1619        }
1620        assert_eq!(total_readback, 10000);
1621    }
1622}