1use crate::{
5 AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6 SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
24pub struct MmapSectionStorage {
25 buffer: &'static mut [u8],
26 offset: usize,
27 block_size: usize,
28}
29
30impl MmapSectionStorage {
31 pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
32 Self {
33 buffer,
34 offset: 0,
35 block_size,
36 }
37 }
38
39 pub fn buffer_ptr(&self) -> *const u8 {
40 &self.buffer[0] as *const u8
41 }
42}
43
44impl SectionStorage for MmapSectionStorage {
45 fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
46 self.post_update_header(header)?;
47 self.offset = self.block_size;
48 Ok(self.offset)
49 }
50
51 fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
52 encode_into_slice(header, &mut self.buffer[0..], standard())
53 }
54
55 fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
56 let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
57 self.offset += size;
58 Ok(size)
59 }
60
61 fn flush(&mut self) -> CuResult<usize> {
62 todo!()
63 }
64}
65
66pub enum MmapUnifiedLogger {
69 Read(MmapUnifiedLoggerRead),
70 Write(MmapUnifiedLoggerWrite),
71}
72
73pub struct MmapUnifiedLoggerBuilder {
75 file_base_name: Option<PathBuf>,
76 preallocated_size: Option<usize>,
77 write: bool,
78 create: bool,
79}
80
81impl Default for MmapUnifiedLoggerBuilder {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl MmapUnifiedLoggerBuilder {
88 pub fn new() -> Self {
89 Self {
90 file_base_name: None,
91 preallocated_size: None,
92 write: false,
93 create: false, }
95 }
96
97 pub fn file_base_name(mut self, file_path: &Path) -> Self {
99 self.file_base_name = Some(file_path.to_path_buf());
100 self
101 }
102
103 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
104 self.preallocated_size = Some(preallocated_size);
105 self
106 }
107
108 pub fn write(mut self, write: bool) -> Self {
109 self.write = write;
110 self
111 }
112
113 pub fn create(mut self, create: bool) -> Self {
114 self.create = create;
115 self
116 }
117
118 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
119 let page_size = page_size::get();
120
121 if self.write && self.create {
122 let ulw = MmapUnifiedLoggerWrite::new(
123 &self
124 .file_base_name
125 .expect("This unified logger has no filename."),
126 self.preallocated_size
127 .expect("This unified logger has no preallocated size."),
128 page_size,
129 );
130
131 Ok(MmapUnifiedLogger::Write(ulw))
132 } else {
133 let file_path = self.file_base_name.ok_or_else(|| {
134 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
135 })?;
136 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
137 Ok(MmapUnifiedLogger::Read(ulr))
138 }
139 }
140}
141
142struct SlabEntry {
143 file: File,
144 mmap_buffer: ManuallyDrop<MmapMut>,
145 current_global_position: usize,
146 sections_offsets_in_flight: Vec<usize>,
147 flushed_until_offset: usize,
148 page_size: usize,
149 temporary_end_marker: Option<usize>,
150}
151
152impl Drop for SlabEntry {
153 fn drop(&mut self) {
154 self.flush_until(self.current_global_position);
155 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
156 self.file
157 .set_len(self.current_global_position as u64)
158 .expect("Failed to trim datalogger file");
159
160 if !self.sections_offsets_in_flight.is_empty() {
161 eprintln!("Error: Slab not full flushed.");
162 }
163 }
164}
165
166impl SlabEntry {
167 fn new(file: File, page_size: usize) -> Self {
168 let mmap_buffer =
169 ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
170 Self {
171 file,
172 mmap_buffer,
173 current_global_position: 0,
174 sections_offsets_in_flight: Vec::with_capacity(16),
175 flushed_until_offset: 0,
176 page_size,
177 temporary_end_marker: None,
178 }
179 }
180
181 fn flush_until(&mut self, until_position: usize) {
183 if (self.flushed_until_offset == until_position) || (until_position == 0) {
185 return;
186 }
187 self.mmap_buffer
188 .flush_async_range(
189 self.flushed_until_offset,
190 until_position - self.flushed_until_offset,
191 )
192 .expect("Failed to flush memory map");
193 self.flushed_until_offset = until_position;
194 }
195
196 fn clear_temporary_end_marker(&mut self) {
197 if let Some(marker_start) = self.temporary_end_marker.take() {
198 self.current_global_position = marker_start;
199 if self.flushed_until_offset > marker_start {
200 self.flushed_until_offset = marker_start;
201 }
202 }
203 }
204
205 fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
206 let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
207 let marker_start = self.align_to_next_page(self.current_global_position);
208 let total_marker_size = block_size; let marker_end = marker_start + total_marker_size;
210 if marker_end > self.mmap_buffer.len() {
211 return Err("Not enough space to write end-of-log marker".into());
212 }
213
214 let header = SectionHeader {
215 magic: SECTION_MAGIC,
216 block_size: SECTION_HEADER_COMPACT_SIZE,
217 entry_type: UnifiedLogType::LastEntry,
218 offset_to_next_section: total_marker_size as u32,
219 used: 0,
220 is_open: temporary,
221 };
222
223 encode_into_slice(
224 &header,
225 &mut self.mmap_buffer
226 [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
227 standard(),
228 )
229 .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
230
231 self.temporary_end_marker = Some(marker_start);
232 self.current_global_position = marker_end;
233 Ok(())
234 }
235
236 fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
237 let storage = section.get_storage();
238 let ptr = storage.buffer_ptr();
239 (ptr >= self.mmap_buffer.as_ptr())
240 && (ptr as usize)
241 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
242 }
243
244 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
247 section
248 .post_update_header()
249 .expect("Failed to update section header");
250
251 let storage = section.get_storage();
252 let ptr = storage.buffer_ptr();
253
254 if ptr < self.mmap_buffer.as_ptr()
255 || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
256 {
257 panic!("Invalid section buffer, not in the slab");
258 }
259
260 let base = self.mmap_buffer.as_ptr() as usize;
261 self.sections_offsets_in_flight
262 .retain(|&x| x != ptr as usize - base);
263
264 if self.sections_offsets_in_flight.is_empty() {
265 self.flush_until(self.current_global_position);
266 return;
267 }
268 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
269 self.flush_until(self.sections_offsets_in_flight[0]);
270 }
271 }
272
273 #[inline]
274 fn align_to_next_page(&self, ptr: usize) -> usize {
275 (ptr + self.page_size - 1) & !(self.page_size - 1)
276 }
277
278 fn add_section(
280 &mut self,
281 entry_type: UnifiedLogType,
282 requested_section_size: usize,
283 ) -> AllocatedSection<MmapSectionStorage> {
284 self.current_global_position = self.align_to_next_page(self.current_global_position);
286 let section_size = self.align_to_next_page(requested_section_size) as u32;
287
288 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
290 return AllocatedSection::NoMoreSpace;
291 }
292
293 #[cfg(feature = "compact")]
294 let block_size = SECTION_HEADER_COMPACT_SIZE;
295
296 #[cfg(not(feature = "compact"))]
297 let block_size = self.page_size as u16;
298
299 let section_header = SectionHeader {
300 magic: SECTION_MAGIC,
301 block_size,
302 entry_type,
303 offset_to_next_section: section_size,
304 used: 0u32,
305 is_open: true,
306 };
307
308 self.sections_offsets_in_flight
310 .push(self.current_global_position);
311 let end_of_section = self.current_global_position + requested_section_size;
312 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
313
314 let handle_buffer =
316 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
317 let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
318
319 self.current_global_position = end_of_section;
320
321 Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
322 }
323
324 #[cfg(test)]
325 fn used(&self) -> usize {
326 self.current_global_position
327 }
328}
329
330pub struct MmapUnifiedLoggerWrite {
332 front_slab: SlabEntry,
334 back_slabs: Vec<SlabEntry>,
336 base_file_path: PathBuf,
338 slab_size: usize,
340 front_slab_suffix: usize,
342}
343
344fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
345 let mut file_path = base_file_path.to_path_buf();
346 let file_name = file_path
347 .file_name()
348 .expect("Invalid base file path")
349 .to_str()
350 .expect("Could not translate the filename OsStr to str");
351 let mut file_name = file_name.split('.').collect::<Vec<&str>>();
352 let extension = file_name.pop().expect("Could not find the file extension.");
353 let file_name = file_name.join(".");
354 let file_name = format!("{file_name}_{slab_index}.{extension}");
355 file_path.set_file_name(file_name);
356 file_path
357}
358
359fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
360 let file_path = build_slab_path(base_file_path, slab_suffix);
361 let file = OpenOptions::new()
362 .read(true)
363 .write(true)
364 .create(true)
365 .truncate(true)
366 .open(&file_path)
367 .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
368 file.set_len(slab_size as u64)
369 .expect("Failed to set file length");
370 file
371}
372
373impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
374 fn add_section(
376 &mut self,
377 entry_type: UnifiedLogType,
378 requested_section_size: usize,
379 ) -> CuResult<SectionHandle<MmapSectionStorage>> {
380 self.garbage_collect_backslabs(); self.front_slab.clear_temporary_end_marker();
382 let maybe_section = self
383 .front_slab
384 .add_section(entry_type, requested_section_size);
385
386 match maybe_section {
387 AllocatedSection::NoMoreSpace => {
388 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
390 self.back_slabs
392 .push(mem::replace(&mut self.front_slab, new_slab));
393 match self
394 .front_slab
395 .add_section(entry_type, requested_section_size)
396 {
397 AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
398 Section(section) => {
399 self.place_end_marker(true)?;
400 Ok(section)
401 }
402 }
403 }
404 Section(section) => {
405 self.place_end_marker(true)?;
406 Ok(section)
407 }
408 }
409 }
410
411 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
412 section.mark_closed();
413 for slab in self.back_slabs.iter_mut() {
414 if slab.is_it_my_section(section) {
415 slab.flush_section(section);
416 return;
417 }
418 }
419 self.front_slab.flush_section(section);
420 }
421
422 fn status(&self) -> UnifiedLogStatus {
423 UnifiedLogStatus {
424 total_used_space: self.front_slab.current_global_position,
425 total_allocated_space: self.slab_size * self.front_slab_suffix,
426 }
427 }
428}
429
430impl MmapUnifiedLoggerWrite {
431 fn next_slab(&mut self) -> File {
432 self.front_slab_suffix += 1;
433
434 make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
435 }
436
437 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
438 let file = make_slab_file(base_file_path, slab_size, 0);
439 let mut front_slab = SlabEntry::new(file, page_size);
440
441 let main_header = MainHeader {
443 magic: MAIN_MAGIC,
444 first_section_offset: page_size as u16,
445 page_size: page_size as u16,
446 };
447 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
448 .expect("Failed to encode main header");
449 assert!(nb_bytes < page_size);
450 front_slab.current_global_position = page_size; Self {
453 front_slab,
454 back_slabs: Vec::new(),
455 base_file_path: base_file_path.to_path_buf(),
456 slab_size,
457 front_slab_suffix: 0,
458 }
459 }
460
461 fn garbage_collect_backslabs(&mut self) {
462 self.back_slabs
463 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
464 }
465
466 fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
467 match self.front_slab.write_end_marker(temporary) {
468 Ok(_) => Ok(()),
469 Err(_) => {
470 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
472 self.back_slabs
473 .push(mem::replace(&mut self.front_slab, new_slab));
474 self.front_slab.write_end_marker(temporary)
475 }
476 }
477 }
478
479 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
480 (
481 self.front_slab.current_global_position,
482 self.front_slab.sections_offsets_in_flight.clone(),
483 self.back_slabs.len(),
484 )
485 }
486}
487
488impl Drop for MmapUnifiedLoggerWrite {
489 fn drop(&mut self) {
490 #[cfg(debug_assertions)]
491 eprintln!("Flushing the unified Logger ... "); self.front_slab.clear_temporary_end_marker();
494 if let Err(e) = self.place_end_marker(false) {
495 panic!("Failed to flush the unified logger: {}", e);
496 }
497 self.front_slab
498 .flush_until(self.front_slab.current_global_position);
499 self.garbage_collect_backslabs();
500 #[cfg(debug_assertions)]
501 eprintln!("Unified Logger flushed."); }
503}
504
505fn open_slab_index(
506 base_file_path: &Path,
507 slab_index: usize,
508) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
509 let mut options = OpenOptions::new();
510 let options = options.read(true);
511
512 let file_path = build_slab_path(base_file_path, slab_index);
513 let file = options.open(file_path)?;
514 let mmap = unsafe { Mmap::map(&file) }?;
515 let mut prolog = 0u16;
516 let mut maybe_main_header: Option<MainHeader> = None;
517 if slab_index == 0 {
518 let main_header: MainHeader;
519 let _read: usize;
520 (main_header, _read) =
521 decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
522 if main_header.magic != MAIN_MAGIC {
523 return Err(io::Error::new(
524 io::ErrorKind::InvalidData,
525 "Invalid magic number in main header",
526 ));
527 }
528 prolog = main_header.first_section_offset;
529 maybe_main_header = Some(main_header);
530 }
531 Ok((file, mmap, prolog, maybe_main_header))
532}
533
534pub struct MmapUnifiedLoggerRead {
536 base_file_path: PathBuf,
537 main_header: MainHeader,
538 current_mmap_buffer: Mmap,
539 current_file: File,
540 current_slab_index: usize,
541 current_reading_position: usize,
542}
543
544impl UnifiedLogRead for MmapUnifiedLoggerRead {
545 fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
546 loop {
548 if self.current_reading_position >= self.current_mmap_buffer.len() {
549 self.next_slab().map_err(|e| {
550 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
551 })?;
552 }
553
554 let header_result = self.read_section_header();
555 let header = header_result.map_err(|error| {
556 CuError::new_with_cause(
557 &format!(
558 "Could not read a sections header: {}/{}:{}",
559 self.base_file_path.as_os_str().to_string_lossy(),
560 self.current_slab_index,
561 self.current_reading_position,
562 ),
563 error,
564 )
565 })?;
566
567 if header.entry_type == UnifiedLogType::LastEntry {
569 return Ok(None);
570 }
571
572 if header.entry_type == datalogtype {
574 let result = Some(self.read_section_content(&header)?);
575 self.current_reading_position += header.offset_to_next_section as usize;
576 return Ok(result);
577 }
578
579 self.current_reading_position += header.offset_to_next_section as usize;
581 }
582 }
583
584 fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
586 if self.current_reading_position >= self.current_mmap_buffer.len() {
587 self.next_slab().map_err(|e| {
588 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
589 })?;
590 }
591
592 let read_result = self.read_section_header();
593
594 match read_result {
595 Err(error) => Err(CuError::new_with_cause(
596 &format!(
597 "Could not read a sections header: {}/{}:{}",
598 self.base_file_path.as_os_str().to_string_lossy(),
599 self.current_slab_index,
600 self.current_reading_position,
601 ),
602 error,
603 )),
604 Ok(header) => {
605 let data = self.read_section_content(&header)?;
606 self.current_reading_position += header.offset_to_next_section as usize;
607 Ok((header, data))
608 }
609 }
610 }
611}
612
613impl MmapUnifiedLoggerRead {
614 pub fn new(base_file_path: &Path) -> io::Result<Self> {
615 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
616
617 Ok(Self {
618 base_file_path: base_file_path.to_path_buf(),
619 main_header: header.expect("UnifiedLoggerRead needs a header"),
620 current_file: file,
621 current_mmap_buffer: mmap,
622 current_slab_index: 0,
623 current_reading_position: prolog as usize,
624 })
625 }
626
627 fn next_slab(&mut self) -> io::Result<()> {
628 self.current_slab_index += 1;
629 let (file, mmap, prolog, _) =
630 open_slab_index(&self.base_file_path, self.current_slab_index)?;
631 self.current_file = file;
632 self.current_mmap_buffer = mmap;
633 self.current_reading_position = prolog as usize;
634 Ok(())
635 }
636
637 pub fn raw_main_header(&self) -> &MainHeader {
638 &self.main_header
639 }
640
641 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
643 let mut section_data = vec![0; header.used as usize];
645 let start_of_data = self.current_reading_position + header.block_size as usize;
646 section_data.copy_from_slice(
647 &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
648 );
649
650 Ok(section_data)
651 }
652
653 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
654 let section_header: SectionHeader;
655 (section_header, _) = decode_from_slice(
656 &self.current_mmap_buffer[self.current_reading_position..],
657 standard(),
658 )
659 .map_err(|e| {
660 CuError::new_with_cause(
661 &format!(
662 "Could not read a sections header: {}/{}:{}",
663 self.base_file_path.as_os_str().to_string_lossy(),
664 self.current_slab_index,
665 self.current_reading_position,
666 ),
667 e,
668 )
669 })?;
670 if section_header.magic != SECTION_MAGIC {
671 return Err("Invalid magic number in section header".into());
672 }
673
674 Ok(section_header)
675 }
676}
677
678pub struct UnifiedLoggerIOReader {
680 logger: MmapUnifiedLoggerRead,
681 log_type: UnifiedLogType,
682 buffer: Vec<u8>,
683 buffer_pos: usize,
684}
685
686impl UnifiedLoggerIOReader {
687 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
688 Self {
689 logger,
690 log_type,
691 buffer: Vec::new(),
692 buffer_pos: 0,
693 }
694 }
695
696 fn fill_buffer(&mut self) -> io::Result<bool> {
698 match self.logger.read_next_section_type(self.log_type) {
699 Ok(Some(section)) => {
700 self.buffer = section;
701 self.buffer_pos = 0;
702 Ok(true)
703 }
704 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
706 }
707 }
708}
709
710impl Read for UnifiedLoggerIOReader {
711 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
712 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
713 return Ok(0);
715 }
716
717 if self.buffer_pos >= self.buffer.len() {
719 return Ok(0);
720 }
721
722 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
724 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
725 self.buffer_pos += len;
726 Ok(len)
727 }
728}
729
730#[cfg(feature = "std")]
731#[cfg(test)]
732mod tests {
733 use super::*;
734 use crate::stream_write;
735 use bincode::de::read::SliceReader;
736 use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
737 use cu29_traits::WriteStream;
738 use std::path::PathBuf;
739 use std::sync::{Arc, Mutex};
740 use tempfile::TempDir;
741
742 const LARGE_SLAB: usize = 100 * 1024; const SMALL_SLAB: usize = 16 * 2 * 1024; fn make_a_logger(
746 tmp_dir: &TempDir,
747 slab_size: usize,
748 ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
749 let file_path = tmp_dir.path().join("test.bin");
750 let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
751 .write(true)
752 .create(true)
753 .file_base_name(&file_path)
754 .preallocated_size(slab_size)
755 .build()
756 .expect("Failed to create logger")
757 else {
758 panic!("Failed to create logger")
759 };
760
761 (Arc::new(Mutex::new(data_logger)), file_path)
762 }
763
764 #[test]
765 fn test_truncation_and_sections_creations() {
766 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
767 let file_path = tmp_dir.path().join("test.bin");
768 let _used = {
769 let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
770 .write(true)
771 .create(true)
772 .file_base_name(&file_path)
773 .preallocated_size(100000)
774 .build()
775 .expect("Failed to create logger")
776 else {
777 panic!("Failed to create logger")
778 };
779 logger
780 .add_section(UnifiedLogType::StructuredLogLine, 1024)
781 .unwrap();
782 logger
783 .add_section(UnifiedLogType::CopperList, 2048)
784 .unwrap();
785 let used = logger.front_slab.used();
786 assert!(used < 4 * page_size::get()); used
790 };
791
792 let _file = OpenOptions::new()
793 .read(true)
794 .open(tmp_dir.path().join("test_0.bin"))
795 .expect("Could not reopen the file");
796 }
803
804 #[test]
805 fn test_one_section_self_cleaning() {
806 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
807 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
808 {
809 let _stream = stream_write::<(), MmapSectionStorage>(
810 logger.clone(),
811 UnifiedLogType::StructuredLogLine,
812 1024,
813 );
814 assert_eq!(
815 logger
816 .lock()
817 .unwrap()
818 .front_slab
819 .sections_offsets_in_flight
820 .len(),
821 1
822 );
823 }
824 assert_eq!(
825 logger
826 .lock()
827 .unwrap()
828 .front_slab
829 .sections_offsets_in_flight
830 .len(),
831 0
832 );
833 let logger = logger.lock().unwrap();
834 assert_eq!(
835 logger.front_slab.flushed_until_offset,
836 logger.front_slab.current_global_position
837 );
838 }
839
840 #[test]
841 fn test_temporary_end_marker_is_created() {
842 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
843 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
844 {
845 let mut stream = stream_write::<u32, MmapSectionStorage>(
846 logger.clone(),
847 UnifiedLogType::StructuredLogLine,
848 1024,
849 )
850 .unwrap();
851 stream.log(&42u32).unwrap();
852 }
853
854 let logger_guard = logger.lock().unwrap();
855 let slab = &logger_guard.front_slab;
856 let marker_start = slab
857 .temporary_end_marker
858 .expect("temporary end-of-log marker missing");
859 let (eof_header, _) =
860 decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
861 .expect("Could not decode end-of-log marker header");
862 assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
863 assert!(eof_header.is_open);
864 assert_eq!(eof_header.used, 0);
865 }
866
867 #[test]
868 fn test_final_end_marker_is_not_temporary() {
869 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
870 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
871 {
872 let mut stream = stream_write::<u32, MmapSectionStorage>(
873 logger.clone(),
874 UnifiedLogType::CopperList,
875 1024,
876 )
877 .unwrap();
878 stream.log(&1u32).unwrap();
879 }
880 drop(logger);
881
882 let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
883 .file_base_name(&f)
884 .build()
885 .expect("Failed to build reader")
886 else {
887 panic!("Failed to create reader");
888 };
889
890 loop {
891 let (header, _data) = reader
892 .raw_read_section()
893 .expect("Failed to read section while searching for EOF");
894 if header.entry_type == UnifiedLogType::LastEntry {
895 assert!(!header.is_open);
896 break;
897 }
898 }
899 }
900
901 #[test]
902 fn test_two_sections_self_cleaning_in_order() {
903 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
904 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
905 let s1 = stream_write::<(), MmapSectionStorage>(
906 logger.clone(),
907 UnifiedLogType::StructuredLogLine,
908 1024,
909 );
910 assert_eq!(
911 logger
912 .lock()
913 .unwrap()
914 .front_slab
915 .sections_offsets_in_flight
916 .len(),
917 1
918 );
919 let s2 = stream_write::<(), MmapSectionStorage>(
920 logger.clone(),
921 UnifiedLogType::StructuredLogLine,
922 1024,
923 );
924 assert_eq!(
925 logger
926 .lock()
927 .unwrap()
928 .front_slab
929 .sections_offsets_in_flight
930 .len(),
931 2
932 );
933 drop(s2);
934 assert_eq!(
935 logger
936 .lock()
937 .unwrap()
938 .front_slab
939 .sections_offsets_in_flight
940 .len(),
941 1
942 );
943 drop(s1);
944 let lg = logger.lock().unwrap();
945 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
946 assert_eq!(
947 lg.front_slab.flushed_until_offset,
948 lg.front_slab.current_global_position
949 );
950 }
951
952 #[test]
953 fn test_two_sections_self_cleaning_out_of_order() {
954 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
955 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
956 let s1 = stream_write::<(), MmapSectionStorage>(
957 logger.clone(),
958 UnifiedLogType::StructuredLogLine,
959 1024,
960 );
961 assert_eq!(
962 logger
963 .lock()
964 .unwrap()
965 .front_slab
966 .sections_offsets_in_flight
967 .len(),
968 1
969 );
970 let s2 = stream_write::<(), MmapSectionStorage>(
971 logger.clone(),
972 UnifiedLogType::StructuredLogLine,
973 1024,
974 );
975 assert_eq!(
976 logger
977 .lock()
978 .unwrap()
979 .front_slab
980 .sections_offsets_in_flight
981 .len(),
982 2
983 );
984 drop(s1);
985 assert_eq!(
986 logger
987 .lock()
988 .unwrap()
989 .front_slab
990 .sections_offsets_in_flight
991 .len(),
992 1
993 );
994 drop(s2);
995 let lg = logger.lock().unwrap();
996 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
997 assert_eq!(
998 lg.front_slab.flushed_until_offset,
999 lg.front_slab.current_global_position
1000 );
1001 }
1002
1003 #[test]
1004 fn test_write_then_read_one_section() {
1005 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1006 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1007 {
1008 let mut stream =
1009 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1010 stream.log(&1u32).unwrap();
1011 stream.log(&2u32).unwrap();
1012 stream.log(&3u32).unwrap();
1013 }
1014 drop(logger);
1015 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1016 .file_base_name(&f)
1017 .build()
1018 .expect("Failed to build logger")
1019 else {
1020 panic!("Failed to build logger");
1021 };
1022 let section = dl
1023 .read_next_section_type(UnifiedLogType::StructuredLogLine)
1024 .expect("Failed to read section");
1025 assert!(section.is_some());
1026 let section = section.unwrap();
1027 let mut reader = SliceReader::new(§ion[..]);
1028 let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1029 let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1030 let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1031 assert_eq!(v1, 1);
1032 assert_eq!(v2, 2);
1033 assert_eq!(v3, 3);
1034 }
1035
1036 #[derive(Debug, Encode, Decode)]
1039 enum CopperListStateMock {
1040 Free,
1041 ProcessingTasks,
1042 BeingSerialized,
1043 }
1044
1045 #[derive(Encode, Decode)]
1046 struct CopperList<P: bincode::enc::Encode> {
1047 state: CopperListStateMock,
1048 payload: P, }
1050
1051 #[test]
1052 fn test_copperlist_list_like_logging() {
1053 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1054 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1055 {
1056 let mut stream =
1057 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1058 let cl0 = CopperList {
1059 state: CopperListStateMock::Free,
1060 payload: (1u32, 2u32, 3u32),
1061 };
1062 let cl1 = CopperList {
1063 state: CopperListStateMock::ProcessingTasks,
1064 payload: (4u32, 5u32, 6u32),
1065 };
1066 stream.log(&cl0).unwrap();
1067 stream.log(&cl1).unwrap();
1068 }
1069 drop(logger);
1070
1071 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1072 .file_base_name(&f)
1073 .build()
1074 .expect("Failed to build logger")
1075 else {
1076 panic!("Failed to build logger");
1077 };
1078 let section = dl
1079 .read_next_section_type(UnifiedLogType::CopperList)
1080 .expect("Failed to read section");
1081 assert!(section.is_some());
1082 let section = section.unwrap();
1083
1084 let mut reader = SliceReader::new(§ion[..]);
1085 let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1086 let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1087 assert_eq!(cl0.payload.1, 2);
1088 assert_eq!(cl1.payload.2, 6);
1089 }
1090
1091 #[test]
1092 fn test_multi_slab_end2end() {
1093 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1094 let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1095 {
1096 let mut stream =
1097 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1098 let cl0 = CopperList {
1099 state: CopperListStateMock::Free,
1100 payload: (1u32, 2u32, 3u32),
1101 };
1102 for _ in 0..10000 {
1104 stream.log(&cl0).unwrap();
1105 }
1106 }
1107 drop(logger);
1108
1109 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1110 .file_base_name(&f)
1111 .build()
1112 .expect("Failed to build logger")
1113 else {
1114 panic!("Failed to build logger");
1115 };
1116 let mut total_readback = 0;
1117 loop {
1118 let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1119 if section.is_err() {
1120 break;
1121 }
1122 let section = section.unwrap();
1123 if section.is_none() {
1124 break;
1125 }
1126 let section = section.unwrap();
1127
1128 let mut reader = SliceReader::new(§ion[..]);
1129 loop {
1130 let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1131 decode_from_reader(&mut reader, standard());
1132 if maybe_cl.is_ok() {
1133 total_readback += 1;
1134 } else {
1135 break;
1136 }
1137 }
1138 }
1139 assert_eq!(total_readback, 10000);
1140 }
1141}