cu29_unifiedlog/
lib.rs

1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic bytes identifying the start of a unified log main file (slab 0).
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes marking the start of every section header within a slab.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// The main file header of the datalogger.
#[derive(Encode, Decode, Debug)]
struct MainHeader {
    magic: [u8; 4],            // Magic number to identify the file.
    first_section_offset: u16, // This is to align with a page at write time.
    page_size: u16,            // Page size of the system that wrote the file.
}
29
/// Each concurrent sublogger is tracked through a section header.
/// They form a linked list of sections.
/// The entry type is used to identify the type of data in the section.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    magic: [u8; 2], // Magic number to identify the section.
    entry_type: UnifiedLogType,
    section_size: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
    filled_size: u32,  // how much of the section is filled.
}

/// Worst-case size of a bincode-encoded SectionHeader; user data starts at this offset within a section.
const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize; // 3 == additional worse case scenario for the 3 int variable encoding
42
43impl Default for SectionHeader {
44    fn default() -> Self {
45        Self {
46            magic: SECTION_MAGIC,
47            entry_type: UnifiedLogType::Empty,
48            section_size: 0,
49            filled_size: 0,
50        }
51    }
52}
53
/// A wrapper around a memory mapped file to write to.
struct MmapStream {
    entry_type: UnifiedLogType, // Type tag used for every section this stream allocates.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>, // Owner of the slabs; sections are allocated and flushed through it.
    current_section: SectionHandle, // Section currently being filled.
    current_position: usize, // Cumulative bytes written through this stream (surfaced via Debug).
    minimum_allocation_amount: usize, // Size hint used each time a new section is requested.
}
62
63impl MmapStream {
64    fn new(
65        entry_type: UnifiedLogType,
66        parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
67        minimum_allocation_amount: usize,
68    ) -> Self {
69        let section = parent_logger
70            .lock()
71            .unwrap()
72            .add_section(entry_type, minimum_allocation_amount);
73        Self {
74            entry_type,
75            parent_logger,
76            current_section: section,
77            current_position: 0,
78            minimum_allocation_amount,
79        }
80    }
81}
82
83impl Debug for MmapStream {
84    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85        write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
86    }
87}
88
impl<E: Encode> WriteStream<E> for MmapStream {
    /// Encodes `obj` into the current section, rolling over to a freshly
    /// allocated section when the current one runs out of room.
    fn log(&mut self, obj: &E) -> CuResult<()> {
        let dst = self.current_section.get_user_buffer();
        let result = encode_into_slice(obj, dst, standard());
        match result {
            Ok(nb_bytes) => {
                self.current_position += nb_bytes;
                self.current_section.used += nb_bytes as u32;
                Ok(())
            }
            Err(e) => match e {
                // Section full: close it and retry once in a brand new section
                // sized by `minimum_allocation_amount`.
                EncodeError::UnexpectedEnd => {
                    let mut logger_guard = self.parent_logger.lock().unwrap();
                    logger_guard.flush_section(&mut self.current_section);
                    self.current_section =
                        logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);

                    let result = encode_into_slice(
                        obj,
                        self.current_section.get_user_buffer(),
                        standard(),
                    )
                    .expect(
                        "Failed to encode object in a newly minted section. Unrecoverable failure.",
                    ); // If we fail just after creating a section, there is not much we can do, we need to bail.
                    self.current_position += result;
                    self.current_section.used += result as u32;
                    Ok(())
                }
                // Any other encode error is propagated to the caller with context.
                _ => {
                    let err =
                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
                            .add_cause(e.to_string().as_str());
                    Err(err)
                }
            },
        }
    }
}
128
129impl Drop for MmapStream {
130    fn drop(&mut self) {
131        let mut logger_guard = self.parent_logger.lock().unwrap();
132        logger_guard.flush_section(&mut self.current_section);
133    }
134}
135
136/// Create a new stream to write to the unifiedlogger.
137pub fn stream_write<E: Encode>(
138    logger: Arc<Mutex<UnifiedLoggerWrite>>,
139    entry_type: UnifiedLogType,
140    minimum_allocation_amount: usize,
141) -> impl WriteStream<E> {
142    MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
143}
144
/// Holder of the read or write side of the datalogger.
pub enum UnifiedLogger {
    /// Read side, produced by `UnifiedLoggerBuilder::build` when not writing.
    Read(UnifiedLoggerRead),
    /// Write side, produced when both `write` and `create` are set on the builder.
    Write(UnifiedLoggerWrite),
}
150
/// Use this builder to create a new DataLogger.
pub struct UnifiedLoggerBuilder {
    file_base_name: Option<PathBuf>, // Base path; slab files derive "<stem>_<n>.<ext>" names from it.
    preallocated_size: Option<usize>, // Size in bytes of each backing slab file (write side).
    write: bool,  // Open the write side (requires `create` as well).
    create: bool, // Allow creating/truncating the backing files.
}
158
159impl Default for UnifiedLoggerBuilder {
160    fn default() -> Self {
161        Self::new()
162    }
163}
164
165impl UnifiedLoggerBuilder {
166    pub fn new() -> Self {
167        Self {
168            file_base_name: None,
169            preallocated_size: None,
170            write: false,
171            create: false, // This is the safest default
172        }
173    }
174
175    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
176    pub fn file_base_name(mut self, file_path: &Path) -> Self {
177        self.file_base_name = Some(file_path.to_path_buf());
178        self
179    }
180
181    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
182        self.preallocated_size = Some(preallocated_size);
183        self
184    }
185
186    pub fn write(mut self, write: bool) -> Self {
187        self.write = write;
188        self
189    }
190
191    pub fn create(mut self, create: bool) -> Self {
192        self.create = create;
193        self
194    }
195
196    pub fn build(self) -> io::Result<UnifiedLogger> {
197        let page_size = page_size::get();
198
199        if self.write && self.create {
200            let ulw = UnifiedLoggerWrite::new(
201                &self.file_base_name.unwrap(),
202                self.preallocated_size.unwrap(),
203                page_size,
204            );
205
206            Ok(UnifiedLogger::Write(ulw))
207        } else {
208            let file_path = self.file_base_name.ok_or_else(|| {
209                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
210            })?;
211            let ulr = UnifiedLoggerRead::new(&file_path)?;
212            Ok(UnifiedLogger::Read(ulr))
213        }
214    }
215}
216
/// A read side of the datalogger.
pub struct UnifiedLoggerRead {
    base_file_path: PathBuf, // Base path used to derive the per-slab file names.
    current_mmap_buffer: Mmap, // Read-only map of the slab currently being read.
    current_file: File, // Keeps the mapped file open for the lifetime of the map.
    current_slab_index: usize, // Index of the slab currently mapped.
    current_reading_position: usize, // Byte offset of the next section header within the map.
}
225
/// One memory-mapped backing file of the write side.
struct SlabEntry {
    file: File, // Backing file; truncated to the used size on drop.
    mmap_buffer: ManuallyDrop<MmapMut>, // Writable map; dropped manually before the file is trimmed.
    current_global_position: usize, // Write cursor (bytes) within the slab.
    sections_offsets_in_flight: Vec<usize>, // Offsets of sections handed out but not yet flushed.
    flushed_until_offset: usize, // Everything before this offset has been flushed to disk.
    page_size: usize, // System page size used for section alignment.
}
234
235impl Drop for SlabEntry {
236    fn drop(&mut self) {
237        self.flush_until(self.current_global_position);
238        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
239        self.file
240            .set_len(self.current_global_position as u64)
241            .expect("Failed to trim datalogger file");
242
243        if !self.sections_offsets_in_flight.is_empty() {
244            eprintln!("Error: Slab not full flushed.");
245        }
246    }
247}
248
/// Result of asking a slab for a new section.
pub enum AllocatedSection {
    /// The slab cannot fit the requested section; the caller must roll to a new slab.
    NoMoreSpace,
    /// The freshly reserved section.
    Section(SectionHandle),
}
253
impl SlabEntry {
    /// Maps `file` into memory and wraps it as a fresh, empty slab.
    fn new(file: File, page_size: usize) -> Self {
        // The map lives in a ManuallyDrop so the Drop impl can unmap it
        // explicitly *before* truncating the backing file.
        let mmap_buffer =
            ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
        Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
        }
    }

    /// Ensure the underlying mmap is flushed to disk up to the given position.
    fn flush_until(&mut self, until_position: usize) {
        // An empty flush range is tolerated under linux, but crashes on macos,
        // hence this guard.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// True if `section`'s buffer lies within this slab's mapped address range.
    fn is_it_my_section(&self, section: &SectionHandle) -> bool {
        (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
            && (section.buffer.as_ptr() as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle) {
        // NOTE(review): this upper-bound check is inclusive (`>`) while
        // is_it_my_section uses an exclusive `<` — confirm which is intended.
        if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
            || section.buffer.as_ptr() as usize
                > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        // Be sure that the header reflects the actual size of the section.
        section.update_header();

        // Re-encode the now-final header at the front of the section buffer.
        let _sz = encode_into_slice(&section.section_header, section.buffer, standard())
            .expect("Failed to encode section header");

        // This section is no longer in flight.
        let base = self.mmap_buffer.as_ptr() as usize;
        let section_buffer_addr = section.buffer.as_ptr() as usize;
        self.sections_offsets_in_flight
            .retain(|&x| x != section_buffer_addr - base);

        // Flush as far as safely possible: everything if nothing is in flight,
        // otherwise only up to the earliest still-open section.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next page boundary (relies on page_size being a power of two).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            entry_type,
            section_size,
            filled_size: 0u32,
        };

        // Write the header at the start of the section so the magic check in
        // SectionHandle::create passes.
        let nb_bytes = encode_into_slice(
            &section_header,
            &mut self.mmap_buffer[self.current_global_position..],
            standard(),
        )
        .expect("Failed to encode section header");
        assert!(nb_bytes < self.page_size);

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: here we have the guarantee for exclusive access to that memory
        // for the lifetime of the handle, the borrow checker cannot understand that ever.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };

        self.current_global_position = end_of_section;

        AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
    }

    /// Test-only: bytes consumed in this slab so far.
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
374
/// A SectionHandle is a handle to a section in the datalogger.
/// It allows to track the lifecycle of a section of the datalogger.
#[derive(Default)] // Default yields an inert placeholder handle (empty buffer, zero used).
pub struct SectionHandle {
    section_header: SectionHeader,
    buffer: &'static mut [u8], // This includes the encoded header for end of section patching.
    used: u32,                 // this is the size of the used part of the buffer.
}
383
// This is a placeholder to ensure an orderly cleanup as we dodge the borrow checker.
385
impl SectionHandle {
    // The buffer is considered static as it is a dedicated piece for the section.
    /// Wraps an already header-initialized slice of a slab.
    ///
    /// # Panics
    /// Panics if the buffer does not start with `SECTION_MAGIC` or cannot hold
    /// an encoded header.
    pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
        // here we assume we are passed a valid section.
        if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
            panic!("Invalid section buffer, magic number not found");
        }

        // NOTE(review): the check rejects len < MAX_HEADER_SIZE, so the message's
        // "> {}" would more precisely read ">= {}".
        if buffer.len() < MAX_HEADER_SIZE {
            panic!(
                "Invalid section buffer, too small: {}, it needs to be > {}",
                buffer.len(),
                MAX_HEADER_SIZE
            );
        }

        Self {
            section_header,
            buffer,
            used: 0,
        }
    }
    /// Returns the writable remainder of the section: past the reserved header
    /// space and past the bytes already used.
    pub fn get_user_buffer(&mut self) -> &mut [u8] {
        &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
    }

    /// Records the used byte count into the in-memory header (written back to the
    /// slab by `SlabEntry::flush_section`).
    pub fn update_header(&mut self) {
        // no need to do anything if we never used the section.
        if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
            return;
        }
        self.section_header.filled_size = self.used;

        // FIX ME: This was flushed before and cannot be written back to.
        // let _sz = encode_into_slice(&self.section_header, &mut self.buffer, standard())
        //     .expect("Failed to encode section header");
    }
}
424
/// A write side of the datalogger.
/// Writes go to the front slab; full slabs are retired to `back_slabs` until all
/// their sections have been flushed.
pub struct UnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slab is the previous slab that is being flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
438
/// Derives the path of slab `slab_index` from the base path by suffixing the file
/// stem: "something/toto.copper" -> "something/toto_3.copper".
///
/// Uses `Path::file_stem`/`extension` instead of splitting on '.': the previous
/// split-based version mangled extensionless names ("toto" became "_0.toto");
/// now they map to "toto_0". Multi-dot names keep only the final extension, as before.
///
/// # Panics
/// Panics if the base path has no file name or it is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let mut file_path = base_file_path.to_path_buf();
    let new_name = {
        let stem = file_path
            .file_stem()
            .and_then(|s| s.to_str())
            .expect("slab base path must have a valid UTF-8 file name");
        match file_path.extension().and_then(|e| e.to_str()) {
            Some(extension) => format!("{stem}_{slab_index}.{extension}"),
            None => format!("{stem}_{slab_index}"),
        }
    };
    file_path.set_file_name(new_name);
    file_path
}
449
450fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
451    let file_path = build_slab_path(base_file_path, slab_suffix);
452    let file = OpenOptions::new()
453        .read(true)
454        .write(true)
455        .create(true)
456        .truncate(true)
457        .open(&file_path)
458        .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
459    file.set_len(slab_size as u64)
460        .expect("Failed to set file length");
461    file
462}
463
impl UnifiedLoggerWrite {
    /// Creates the backing file for the next slab and bumps the suffix counter.
    fn next_slab(&mut self) -> File {
        self.front_slab_suffix += 1;

        make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
    }

    /// Creates slab 0, writes the main header into it, and moves the write cursor
    /// to the next page boundary (the header advertises that offset).
    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
        let file = make_slab_file(base_file_path, slab_size, 0);
        let mut front_slab = SlabEntry::new(file, page_size);

        // This is the first slab so add the main header.
        let main_header = MainHeader {
            magic: MAIN_MAGIC,
            first_section_offset: page_size as u16,
            page_size: page_size as u16,
        };
        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
            .expect("Failed to encode main header");
        assert!(nb_bytes < page_size);
        front_slab.current_global_position = page_size; // align to the next page

        Self {
            front_slab,
            back_slabs: Vec::new(),
            base_file_path: base_file_path.to_path_buf(),
            slab_size,
            front_slab_suffix: 0,
        }
    }

    /// Flushes `section`, locating its owning slab first among the retired back
    /// slabs, then falling back to the front slab.
    pub fn flush_section(&mut self, section: &mut SectionHandle) {
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Drops back slabs with no in-flight sections left (their Drop impl trims the files).
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }

    /// The returned slice is section_size or greater.
    /// When the front slab is full it is retired to `back_slabs`, a fresh slab is
    /// created, and the allocation is retried once there.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> SectionHandle {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => {
                        // A brand-new slab that cannot fit the request is unrecoverable.
                        panic!("Failed to allocate a section in a new slab");
                    }
                    AllocatedSection::Section(section) => section,
                }
            }
            AllocatedSection::Section(section) => section,
        }
    }

    /// Returns (front-slab write position, offsets of in-flight sections in the
    /// front slab, number of back slabs still alive).
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }
}
550
impl Drop for UnifiedLoggerWrite {
    /// Appends the `LastEntry` terminator section (so readers can detect a clean
    /// end of log), flushes it, and reaps fully-flushed back slabs.
    fn drop(&mut self) {
        let mut section = self.add_section(UnifiedLogType::LastEntry, 80); // TODO: determine that exactly
        self.front_slab.flush_section(&mut section);
        self.garbage_collect_backslabs();
    }
}
558
559fn open_slab_index(base_file_path: &Path, slab_index: usize) -> io::Result<(File, Mmap, u16)> {
560    let mut options = OpenOptions::new();
561    let options = options.read(true);
562
563    let file_path = build_slab_path(base_file_path, slab_index);
564    let file = options.open(file_path)?;
565    let mmap = unsafe { Mmap::map(&file) }?;
566    let mut prolog = 0u16;
567    if slab_index == 0 {
568        let main_header: MainHeader;
569        let _read: usize;
570        (main_header, _read) =
571            decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
572        if main_header.magic != MAIN_MAGIC {
573            return Err(io::Error::new(
574                io::ErrorKind::InvalidData,
575                "Invalid magic number in main header",
576            ));
577        }
578        prolog = main_header.first_section_offset;
579    }
580    Ok((file, mmap, prolog))
581}
582
583impl UnifiedLoggerRead {
584    pub fn new(base_file_path: &Path) -> io::Result<Self> {
585        let (file, mmap, prolog) = open_slab_index(base_file_path, 0)?;
586
587        Ok(Self {
588            base_file_path: base_file_path.to_path_buf(),
589            current_file: file,
590            current_mmap_buffer: mmap,
591            current_slab_index: 0,
592            current_reading_position: prolog as usize,
593        })
594    }
595
596    fn next_slab(&mut self) -> io::Result<()> {
597        self.current_slab_index += 1;
598        let (file, mmap, prolog) = open_slab_index(&self.base_file_path, self.current_slab_index)?;
599        self.current_file = file;
600        self.current_mmap_buffer = mmap;
601        self.current_reading_position = prolog as usize;
602        Ok(())
603    }
604
605    pub fn read_next_section_type(
606        &mut self,
607        datalogtype: UnifiedLogType,
608    ) -> CuResult<Option<Vec<u8>>> {
609        // TODO: eventually implement a 0 copy of this too.
610        loop {
611            if self.current_reading_position >= self.current_mmap_buffer.len() {
612                self.next_slab().map_err(|e| {
613                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
614                })?;
615            }
616
617            let header_result = self.read_section_header();
618            if let Err(error) = header_result {
619                return Err(CuError::new_with_cause(
620                    "Could not read a sections header",
621                    error,
622                ));
623            };
624            let header = header_result.unwrap();
625
626            // Reached the end of file
627            if header.entry_type == UnifiedLogType::LastEntry {
628                return Ok(None);
629            }
630
631            // Found a section of the requested type
632            if header.entry_type == datalogtype {
633                let result = Some(self.read_section_content(&header)?);
634                self.current_reading_position += header.section_size as usize;
635                return Ok(result);
636            }
637
638            // Keep reading until we find the requested type
639            self.current_reading_position += header.section_size as usize;
640        }
641    }
642
643    /// Reads the section from the section header pos.
644    pub fn read_section(&mut self) -> CuResult<Vec<u8>> {
645        let read_result = self.read_section_header();
646        if let Err(error) = read_result {
647            return Err(CuError::new_with_cause(
648                "Could not read a sections header",
649                error,
650            ));
651        };
652
653        self.read_section_content(&read_result.unwrap())
654    }
655
656    /// Reads the section content from the section header pos.
657    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
658        // TODO: we could optimize by asking the buffer to fill
659        if header.filled_size == 0 {
660            eprintln!("Warning: read an empty section");
661        }
662        let mut section = vec![0; header.filled_size as usize];
663        let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
664        section.copy_from_slice(
665            &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
666        );
667
668        Ok(section)
669    }
670
671    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
672        let section_header: SectionHeader;
673        (section_header, _) = decode_from_slice(
674            &self.current_mmap_buffer[self.current_reading_position..],
675            standard(),
676        )
677        .expect("Failed to decode section header");
678        if section_header.magic != SECTION_MAGIC {
679            return Err("Invalid magic number in section header".into());
680        }
681        Ok(section_header)
682    }
683}
684
/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    logger: UnifiedLoggerRead, // Underlying section-based reader.
    log_type: UnifiedLogType,  // Only sections of this type are surfaced through Read.
    buffer: Vec<u8>,           // Content of the section currently being drained.
    buffer_pos: usize,         // Read position within `buffer`.
}
692
693impl UnifiedLoggerIOReader {
694    pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
695        Self {
696            logger,
697            log_type,
698            buffer: Vec::new(),
699            buffer_pos: 0,
700        }
701    }
702
703    /// returns true if there is more data to read.
704    fn fill_buffer(&mut self) -> io::Result<bool> {
705        match self.logger.read_next_section_type(self.log_type) {
706            Ok(Some(section)) => {
707                self.buffer = section;
708                self.buffer_pos = 0;
709                Ok(true)
710            }
711            Ok(None) => Ok(false), // No more sections of this type
712            Err(e) => Err(io::Error::other(e.to_string())),
713        }
714    }
715}
716
impl Read for UnifiedLoggerIOReader {
    /// Streams bytes out of the current section buffer, refilling from the next
    /// matching section when exhausted.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
            // This means we hit the last section.
            return Ok(0);
        }

        // If we still have no data after trying to fill the buffer, we're at EOF
        // NOTE(review): a refill that yields an *empty* section also lands here and
        // reads as EOF even if later sections exist — confirm this is intended.
        if self.buffer_pos >= self.buffer.len() {
            return Ok(0);
        }

        // Copy as much as we can from the buffer to `buf`
        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
        self.buffer_pos += len;
        Ok(len)
    }
}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740    use bincode::decode_from_reader;
741    use std::io::BufReader;
742    use std::path::PathBuf;
743    use tempfile::TempDir;
744
    // Slab sizes shared by the tests below.
    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    const SMALL_SLAB: usize = 16 * 2 * 1024; // two 16KiB pages (16KB is the page size on MacOS for example)

    /// Builds a write-side logger backed by `tmp_dir`; returns it with its base path.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }
766
    /// Allocates two sections, drops the logger, and checks the slab file can be
    /// reopened (exact truncation math is still a TODO below).
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
                                                  // logger drops

            used
        };

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }
802
    /// Dropping a stream must flush its section and leave nothing in flight.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(
                logger
                    .lock()
                    .unwrap()
                    .front_slab
                    .sections_offsets_in_flight
                    .len(),
                1
            );
        }
        // Stream dropped: its section is flushed and no longer in flight.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            0
        );
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }
835
    /// Two concurrent streams dropped in creation order: the in-flight count must
    /// step down 2 -> 1 -> 0 and the slab ends fully flushed.
    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s2);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }
878
879    #[test]
880    fn test_two_sections_self_cleaning_out_of_order() {
881        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
882        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
883        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
884        assert_eq!(
885            logger
886                .lock()
887                .unwrap()
888                .front_slab
889                .sections_offsets_in_flight
890                .len(),
891            1
892        );
893        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
894        assert_eq!(
895            logger
896                .lock()
897                .unwrap()
898                .front_slab
899                .sections_offsets_in_flight
900                .len(),
901            2
902        );
903        drop(s1);
904        assert_eq!(
905            logger
906                .lock()
907                .unwrap()
908                .front_slab
909                .sections_offsets_in_flight
910                .len(),
911            1
912        );
913        drop(s2);
914        let lg = logger.lock().unwrap();
915        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
916        assert_eq!(
917            lg.front_slab.flushed_until_offset,
918            lg.front_slab.current_global_position
919        );
920    }
921
922    #[test]
923    fn test_write_then_read_one_section() {
924        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
925        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
926        {
927            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
928            stream.log(&1u32).unwrap();
929            stream.log(&2u32).unwrap();
930            stream.log(&3u32).unwrap();
931        }
932        drop(logger);
933        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
934            .file_base_name(&f)
935            .build()
936            .expect("Failed to build logger")
937        else {
938            panic!("Failed to build logger");
939        };
940        let section = dl
941            .read_next_section_type(UnifiedLogType::StructuredLogLine)
942            .expect("Failed to read section");
943        assert!(section.is_some());
944        let section = section.unwrap();
945
946        let mut reader = BufReader::new(&section[..]);
947        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
948        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
949        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
950        assert_eq!(v1, 1);
951        assert_eq!(v2, 2);
952        assert_eq!(v3, 3);
953    }
954
    /// Mimic a basic CopperList implementation.

    // Lifecycle states for the mock CopperList used by the tests below.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,            // Available for reuse.
        ProcessingTasks, // Currently being filled.
        BeingSerialized, // Being written out to the log.
    }
963
    /// Minimal stand-in for a runtime CopperList: a state tag plus a generic payload.
    /// The `Encode` bound mirrors what the bincode derive needs for `P`.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }
969
970    #[test]
971    fn test_copperlist_list_like_logging() {
972        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
973        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
974        {
975            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
976            let cl0 = CopperList {
977                state: CopperListStateMock::Free,
978                payload: (1u32, 2u32, 3u32),
979            };
980            let cl1 = CopperList {
981                state: CopperListStateMock::ProcessingTasks,
982                payload: (4u32, 5u32, 6u32),
983            };
984            stream.log(&cl0).unwrap();
985            stream.log(&cl1).unwrap();
986        }
987        drop(logger);
988
989        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
990            .file_base_name(&f)
991            .build()
992            .expect("Failed to build logger")
993        else {
994            panic!("Failed to build logger");
995        };
996        let section = dl
997            .read_next_section_type(UnifiedLogType::CopperList)
998            .expect("Failed to read section");
999        assert!(section.is_some());
1000        let section = section.unwrap();
1001
1002        let mut reader = BufReader::new(&section[..]);
1003        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1004        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1005        assert_eq!(cl0.payload.1, 2);
1006        assert_eq!(cl1.payload.2, 6);
1007    }
1008
1009    #[test]
1010    fn test_multi_slab_end2end() {
1011        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1012        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1013        {
1014            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
1015            let cl0 = CopperList {
1016                state: CopperListStateMock::Free,
1017                payload: (1u32, 2u32, 3u32),
1018            };
1019            // large enough so we are sure to create a few slabs
1020            for _ in 0..10000 {
1021                stream.log(&cl0).unwrap();
1022            }
1023        }
1024        drop(logger);
1025
1026        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
1027            .file_base_name(&f)
1028            .build()
1029            .expect("Failed to build logger")
1030        else {
1031            panic!("Failed to build logger");
1032        };
1033        let mut total_readback = 0;
1034        loop {
1035            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1036            if section.is_err() {
1037                break;
1038            }
1039            let section = section.unwrap();
1040            if section.is_none() {
1041                break;
1042            }
1043            let section = section.unwrap();
1044
1045            let mut reader = BufReader::new(&section[..]);
1046            loop {
1047                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1048                    decode_from_reader(&mut reader, standard());
1049                if maybe_cl.is_ok() {
1050                    total_readback += 1;
1051                } else {
1052                    break;
1053                }
1054            }
1055        }
1056        assert_eq!(total_readback, 10000);
1057    }
1058}