1use crate::{
5 AllocatedSection, MainHeader, SectionHandle, SectionHeader, SectionStorage, UnifiedLogRead,
6 UnifiedLogStatus, UnifiedLogWrite, MAIN_MAGIC, SECTION_MAGIC,
7};
8
9#[cfg(feature = "compact")]
10use crate::SECTION_HEADER_COMPACT_SIZE;
11
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{decode_from_slice, encode_into_slice, Encode};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23use AllocatedSection::Section;
24
/// Backing storage of one log section: a window of bytes inside a memory-mapped slab.
///
/// The first `block_size` bytes of the window hold the section header; entries are
/// encoded after it.
pub struct MmapSectionStorage {
    /// The bytes belonging to this section (header block followed by the entries).
    buffer: &'static mut [u8],
    /// Index in `buffer` at which the next appended entry will be encoded.
    offset: usize,
    /// Number of bytes at the start of `buffer` reserved for the section header.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Wraps `buffer` as section storage whose header block is `block_size` bytes long.
    ///
    /// The write offset starts at 0.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Returns the address of the first byte of the section buffer.
    ///
    /// `as_ptr` is used instead of indexing element 0 so that an empty buffer yields a
    /// pointer (not to be dereferenced) instead of an out-of-bounds panic.
    pub fn buffer_ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }
}
44
45impl SectionStorage for MmapSectionStorage {
46 fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
47 self.post_update_header(header)?;
48 self.offset = self.block_size;
49 Ok(self.offset)
50 }
51
52 fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
53 encode_into_slice(header, &mut self.buffer[0..], standard())
54 }
55
56 fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
57 let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
58 self.offset += size;
59 Ok(size)
60 }
61
62 fn flush(&mut self) -> CuResult<usize> {
63 todo!()
64 }
65}
66
/// A unified logger backed by memory-mapped slab files, opened either for reading
/// or for writing.
pub enum MmapUnifiedLogger {
    /// Logger opened to read an existing log.
    Read(MmapUnifiedLoggerRead),
    /// Logger opened to create and write a new log.
    Write(MmapUnifiedLoggerWrite),
}
73
/// Builder collecting the options needed to open a [`MmapUnifiedLogger`].
pub struct MmapUnifiedLoggerBuilder {
    /// Base path of the log; the slab index is inserted into the file name.
    file_base_name: Option<PathBuf>,
    /// Size in bytes given to each slab file created by a writer.
    preallocated_size: Option<usize>,
    /// Requests a writer; only honoured by `build` when `create` is also set.
    write: bool,
    /// Requests creation of the log files; only honoured when `write` is also set.
    create: bool,
}
81
82impl Default for MmapUnifiedLoggerBuilder {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl MmapUnifiedLoggerBuilder {
89 pub fn new() -> Self {
90 Self {
91 file_base_name: None,
92 preallocated_size: None,
93 write: false,
94 create: false, }
96 }
97
98 pub fn file_base_name(mut self, file_path: &Path) -> Self {
100 self.file_base_name = Some(file_path.to_path_buf());
101 self
102 }
103
104 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105 self.preallocated_size = Some(preallocated_size);
106 self
107 }
108
109 pub fn write(mut self, write: bool) -> Self {
110 self.write = write;
111 self
112 }
113
114 pub fn create(mut self, create: bool) -> Self {
115 self.create = create;
116 self
117 }
118
119 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120 let page_size = page_size::get();
121
122 if self.write && self.create {
123 let ulw = MmapUnifiedLoggerWrite::new(
124 &self
125 .file_base_name
126 .expect("This unified logger has no filename."),
127 self.preallocated_size
128 .expect("This unified logger has no preallocated size."),
129 page_size,
130 );
131
132 Ok(MmapUnifiedLogger::Write(ulw))
133 } else {
134 let file_path = self.file_base_name.ok_or_else(|| {
135 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
136 })?;
137 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
138 Ok(MmapUnifiedLogger::Read(ulr))
139 }
140 }
141}
142
/// One slab of the log: a file, its writable memory mapping, and the bookkeeping
/// needed to hand out sections and flush them.
struct SlabEntry {
    /// The file backing the mapping; trimmed to the used size when the slab is dropped.
    file: File,
    /// Writable mapping of `file`. Wrapped in `ManuallyDrop` so that `Drop` can unmap it
    /// explicitly before trimming the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset in the mapping just past the last section handed out.
    current_global_position: usize,
    /// Start offsets of the sections handed out and not yet flushed, in allocation order.
    sections_offsets_in_flight: Vec<usize>,
    /// Offset up to which a flush of the mapping has already been requested.
    flushed_until_offset: usize,
    /// Page size used to align sections.
    page_size: usize,
}
151
152impl Drop for SlabEntry {
153 fn drop(&mut self) {
154 self.flush_until(self.current_global_position);
155 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
156 self.file
157 .set_len(self.current_global_position as u64)
158 .expect("Failed to trim datalogger file");
159
160 if !self.sections_offsets_in_flight.is_empty() {
161 eprintln!("Error: Slab not full flushed.");
162 }
163 }
164}
165
166impl SlabEntry {
167 fn new(file: File, page_size: usize) -> Self {
168 let mmap_buffer =
169 ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
170 Self {
171 file,
172 mmap_buffer,
173 current_global_position: 0,
174 sections_offsets_in_flight: Vec::with_capacity(16),
175 flushed_until_offset: 0,
176 page_size,
177 }
178 }
179
180 fn flush_until(&mut self, until_position: usize) {
182 if (self.flushed_until_offset == until_position) || (until_position == 0) {
184 return;
185 }
186 self.mmap_buffer
187 .flush_async_range(
188 self.flushed_until_offset,
189 until_position - self.flushed_until_offset,
190 )
191 .expect("Failed to flush memory map");
192 self.flushed_until_offset = until_position;
193 }
194
195 fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
196 let storage = section.get_storage();
197 let ptr = storage.buffer_ptr();
198 (ptr >= self.mmap_buffer.as_ptr())
199 && (ptr as usize)
200 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
201 }
202
203 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
206 section
207 .post_update_header()
208 .expect("Failed to update section header");
209
210 let storage = section.get_storage();
211 let ptr = storage.buffer_ptr();
212
213 if ptr < self.mmap_buffer.as_ptr()
214 || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
215 {
216 panic!("Invalid section buffer, not in the slab");
217 }
218
219 let base = self.mmap_buffer.as_ptr() as usize;
220 self.sections_offsets_in_flight
221 .retain(|&x| x != ptr as usize - base);
222
223 if self.sections_offsets_in_flight.is_empty() {
224 self.flush_until(self.current_global_position);
225 return;
226 }
227 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
228 self.flush_until(self.sections_offsets_in_flight[0]);
229 }
230 }
231
232 #[inline]
233 fn align_to_next_page(&self, ptr: usize) -> usize {
234 (ptr + self.page_size - 1) & !(self.page_size - 1)
235 }
236
237 fn add_section(
239 &mut self,
240 entry_type: UnifiedLogType,
241 requested_section_size: usize,
242 ) -> AllocatedSection<MmapSectionStorage> {
243 self.current_global_position = self.align_to_next_page(self.current_global_position);
245 let section_size = self.align_to_next_page(requested_section_size) as u32;
246
247 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
249 return AllocatedSection::NoMoreSpace;
250 }
251
252 #[cfg(feature = "compact")]
253 let block_size = SECTION_HEADER_COMPACT_SIZE;
254
255 #[cfg(not(feature = "compact"))]
256 let block_size = self.page_size as u16;
257
258 let section_header = SectionHeader {
259 magic: SECTION_MAGIC,
260 block_size,
261 entry_type,
262 offset_to_next_section: section_size,
263 used: 0u32,
264 };
265
266 self.sections_offsets_in_flight
268 .push(self.current_global_position);
269 let end_of_section = self.current_global_position + requested_section_size;
270 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
271
272 let handle_buffer =
274 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
275 let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
276
277 self.current_global_position = end_of_section;
278
279 Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
280 }
281
282 #[cfg(test)]
283 fn used(&self) -> usize {
284 self.current_global_position
285 }
286}
287
/// Writing side of the memory-mapped unified logger, spreading the log over
/// numbered slab files.
pub struct MmapUnifiedLoggerWrite {
    /// The slab new sections are currently allocated from.
    front_slab: SlabEntry,
    /// Previous slabs kept alive while they still have sections in flight.
    back_slabs: Vec<SlabEntry>,
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Size in bytes given to each slab file at creation.
    slab_size: usize,
    /// Index of the front slab, used as the suffix of its file name.
    front_slab_suffix: usize,
}
301
/// Builds the path of slab number `slab_index` by inserting `_<index>` between the
/// stem and the extension of the file name of `base_file_path`.
///
/// `logs/test.bin` with index 2 gives `logs/test_2.bin`. A name without an extension
/// keeps its whole name as the stem (`logs/test` gives `logs/test_2`) instead of
/// being mistaken for a bare extension. The name is handled as an `OsStr`, so it does
/// not need to be valid UTF-8.
///
/// # Panics
///
/// Panics if `base_file_path` has no file name (for example when it ends in `..`).
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let stem = base_file_path
        .file_stem()
        .expect("Invalid base file path");

    let mut slab_name = stem.to_os_string();
    slab_name.push(format!("_{slab_index}"));
    // Only the last extension is moved behind the index: "a.b.bin" -> "a.b_0.bin".
    if let Some(extension) = base_file_path.extension() {
        slab_name.push(".");
        slab_name.push(extension);
    }

    base_file_path.with_file_name(slab_name)
}
316
317fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
318 let file_path = build_slab_path(base_file_path, slab_suffix);
319 let file = OpenOptions::new()
320 .read(true)
321 .write(true)
322 .create(true)
323 .truncate(true)
324 .open(&file_path)
325 .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
326 file.set_len(slab_size as u64)
327 .expect("Failed to set file length");
328 file
329}
330
331impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
332 fn add_section(
334 &mut self,
335 entry_type: UnifiedLogType,
336 requested_section_size: usize,
337 ) -> CuResult<SectionHandle<MmapSectionStorage>> {
338 self.garbage_collect_backslabs(); let maybe_section = self
340 .front_slab
341 .add_section(entry_type, requested_section_size);
342
343 match maybe_section {
344 AllocatedSection::NoMoreSpace => {
345 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
347 self.back_slabs
349 .push(mem::replace(&mut self.front_slab, new_slab));
350 match self
351 .front_slab
352 .add_section(entry_type, requested_section_size)
353 {
354 AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
355 Section(section) => Ok(section),
356 }
357 }
358 Section(section) => Ok(section),
359 }
360 }
361
362 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
363 for slab in self.back_slabs.iter_mut() {
364 if slab.is_it_my_section(section) {
365 slab.flush_section(section);
366 return;
367 }
368 }
369 self.front_slab.flush_section(section);
370 }
371
372 fn status(&self) -> UnifiedLogStatus {
373 UnifiedLogStatus {
374 total_used_space: self.front_slab.current_global_position,
375 total_allocated_space: self.slab_size * self.front_slab_suffix,
376 }
377 }
378}
379
380impl MmapUnifiedLoggerWrite {
381 fn next_slab(&mut self) -> File {
382 self.front_slab_suffix += 1;
383
384 make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
385 }
386
387 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
388 let file = make_slab_file(base_file_path, slab_size, 0);
389 let mut front_slab = SlabEntry::new(file, page_size);
390
391 let main_header = MainHeader {
393 magic: MAIN_MAGIC,
394 first_section_offset: page_size as u16,
395 page_size: page_size as u16,
396 };
397 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
398 .expect("Failed to encode main header");
399 assert!(nb_bytes < page_size);
400 front_slab.current_global_position = page_size; Self {
403 front_slab,
404 back_slabs: Vec::new(),
405 base_file_path: base_file_path.to_path_buf(),
406 slab_size,
407 front_slab_suffix: 0,
408 }
409 }
410
411 fn garbage_collect_backslabs(&mut self) {
412 self.back_slabs
413 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
414 }
415
416 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
417 (
418 self.front_slab.current_global_position,
419 self.front_slab.sections_offsets_in_flight.clone(),
420 self.back_slabs.len(),
421 )
422 }
423}
424
425impl Drop for MmapUnifiedLoggerWrite {
426 fn drop(&mut self) {
427 #[cfg(debug_assertions)]
428 eprintln!("Flushing the unified Logger ... "); let section = self.add_section(
431 UnifiedLogType::LastEntry,
432 SECTION_HEADER_COMPACT_SIZE as usize, );
434 match section {
435 Ok(mut section) => {
436 self.front_slab.flush_section(&mut section);
437 self.garbage_collect_backslabs();
438 #[cfg(debug_assertions)]
439 eprintln!("Unified Logger flushed."); }
441 Err(e) => {
442 panic!("Failed to flush the unified logger: {}", e);
443 }
444 }
445 }
446}
447
448fn open_slab_index(
449 base_file_path: &Path,
450 slab_index: usize,
451) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
452 let mut options = OpenOptions::new();
453 let options = options.read(true);
454
455 let file_path = build_slab_path(base_file_path, slab_index);
456 let file = options.open(file_path)?;
457 let mmap = unsafe { Mmap::map(&file) }?;
458 let mut prolog = 0u16;
459 let mut maybe_main_header: Option<MainHeader> = None;
460 if slab_index == 0 {
461 let main_header: MainHeader;
462 let _read: usize;
463 (main_header, _read) =
464 decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
465 if main_header.magic != MAIN_MAGIC {
466 return Err(io::Error::new(
467 io::ErrorKind::InvalidData,
468 "Invalid magic number in main header",
469 ));
470 }
471 prolog = main_header.first_section_offset;
472 maybe_main_header = Some(main_header);
473 }
474 Ok((file, mmap, prolog, maybe_main_header))
475}
476
/// Reading side of the memory-mapped unified logger: walks the sections of a log,
/// slab after slab.
pub struct MmapUnifiedLoggerRead {
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Main header decoded from slab 0.
    main_header: MainHeader,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File backing `current_mmap_buffer`.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Offset in the current slab of the next section header to read.
    current_reading_position: usize,
}
486
487impl UnifiedLogRead for MmapUnifiedLoggerRead {
488 fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
489 loop {
491 if self.current_reading_position >= self.current_mmap_buffer.len() {
492 self.next_slab().map_err(|e| {
493 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
494 })?;
495 }
496
497 let header_result = self.read_section_header();
498 let header = header_result.map_err(|error| {
499 CuError::new_with_cause(
500 &format!(
501 "Could not read a sections header: {}/{}:{}",
502 self.base_file_path.as_os_str().to_string_lossy(),
503 self.current_slab_index,
504 self.current_reading_position,
505 ),
506 error,
507 )
508 })?;
509
510 if header.entry_type == UnifiedLogType::LastEntry {
512 return Ok(None);
513 }
514
515 if header.entry_type == datalogtype {
517 let result = Some(self.read_section_content(&header)?);
518 self.current_reading_position += header.offset_to_next_section as usize;
519 return Ok(result);
520 }
521
522 self.current_reading_position += header.offset_to_next_section as usize;
524 }
525 }
526
527 fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
529 if self.current_reading_position >= self.current_mmap_buffer.len() {
530 self.next_slab().map_err(|e| {
531 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
532 })?;
533 }
534
535 let read_result = self.read_section_header();
536
537 match read_result {
538 Err(error) => Err(CuError::new_with_cause(
539 &format!(
540 "Could not read a sections header: {}/{}:{}",
541 self.base_file_path.as_os_str().to_string_lossy(),
542 self.current_slab_index,
543 self.current_reading_position,
544 ),
545 error,
546 )),
547 Ok(header) => {
548 let data = self.read_section_content(&header)?;
549 self.current_reading_position += header.offset_to_next_section as usize;
550 Ok((header, data))
551 }
552 }
553 }
554}
555
556impl MmapUnifiedLoggerRead {
557 pub fn new(base_file_path: &Path) -> io::Result<Self> {
558 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
559
560 Ok(Self {
561 base_file_path: base_file_path.to_path_buf(),
562 main_header: header.expect("UnifiedLoggerRead needs a header"),
563 current_file: file,
564 current_mmap_buffer: mmap,
565 current_slab_index: 0,
566 current_reading_position: prolog as usize,
567 })
568 }
569
570 fn next_slab(&mut self) -> io::Result<()> {
571 self.current_slab_index += 1;
572 let (file, mmap, prolog, _) =
573 open_slab_index(&self.base_file_path, self.current_slab_index)?;
574 self.current_file = file;
575 self.current_mmap_buffer = mmap;
576 self.current_reading_position = prolog as usize;
577 Ok(())
578 }
579
580 pub fn raw_main_header(&self) -> &MainHeader {
581 &self.main_header
582 }
583
584 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
586 let mut section_data = vec![0; header.used as usize];
588 let start_of_data = self.current_reading_position + header.block_size as usize;
589 section_data.copy_from_slice(
590 &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
591 );
592
593 Ok(section_data)
594 }
595
596 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
597 let section_header: SectionHeader;
598 (section_header, _) = decode_from_slice(
599 &self.current_mmap_buffer[self.current_reading_position..],
600 standard(),
601 )
602 .map_err(|e| {
603 CuError::new_with_cause(
604 &format!(
605 "Could not read a sections header: {}/{}:{}",
606 self.base_file_path.as_os_str().to_string_lossy(),
607 self.current_slab_index,
608 self.current_reading_position,
609 ),
610 e,
611 )
612 })?;
613 if section_header.magic != SECTION_MAGIC {
614 return Err("Invalid magic number in section header".into());
615 }
616
617 Ok(section_header)
618 }
619}
620
/// Adapter exposing all the sections of one type of a unified log as a single
/// byte stream through `std::io::Read`.
pub struct UnifiedLoggerIOReader {
    /// The log being read.
    logger: MmapUnifiedLoggerRead,
    /// The only section type served by this reader.
    log_type: UnifiedLogType,
    /// Content of the section currently being served.
    buffer: Vec<u8>,
    /// Index in `buffer` of the next byte to hand out.
    buffer_pos: usize,
}
628
629impl UnifiedLoggerIOReader {
630 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
631 Self {
632 logger,
633 log_type,
634 buffer: Vec::new(),
635 buffer_pos: 0,
636 }
637 }
638
639 fn fill_buffer(&mut self) -> io::Result<bool> {
641 match self.logger.read_next_section_type(self.log_type) {
642 Ok(Some(section)) => {
643 self.buffer = section;
644 self.buffer_pos = 0;
645 Ok(true)
646 }
647 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
649 }
650 }
651}
652
653impl Read for UnifiedLoggerIOReader {
654 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
655 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
656 return Ok(0);
658 }
659
660 if self.buffer_pos >= self.buffer.len() {
662 return Ok(0);
663 }
664
665 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
667 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
668 self.buffer_pos += len;
669 Ok(len)
670 }
671}
672
#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream_write;
    use bincode::de::read::SliceReader;
    use bincode::{decode_from_reader, Decode, Encode};
    use cu29_traits::WriteStream;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use tempfile::TempDir;

    /// Slab size large enough to keep a whole test in a single slab.
    const LARGE_SLAB: usize = 100 * 1024;
    /// Slab size small enough to force the multi-slab test over several slabs.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Creates a writing logger on `test.bin` inside `tmp_dir` and returns it with its base path.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Number of sections of the front slab handed out and not yet flushed.
    fn in_flight_count(logger: &Arc<Mutex<MmapUnifiedLoggerWrite>>) -> usize {
        let guard = logger.lock().unwrap();
        guard.front_slab.sections_offsets_in_flight.len()
    }

    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger
                .add_section(UnifiedLogType::StructuredLogLine, 1024)
                .unwrap();
            logger
                .add_section(UnifiedLogType::CopperList, 2048)
                .unwrap();
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get());
            used
        };

        // The logger is dropped at this point: slab 0 must exist and be readable.
        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
    }

    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream = stream_write::<(), MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            );
            assert_eq!(in_flight_count(&logger), 1);
        }
        assert_eq!(in_flight_count(&logger), 0);
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }

    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(in_flight_count(&logger), 1);
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(in_flight_count(&logger), 2);
        drop(s2);
        assert_eq!(in_flight_count(&logger), 1);
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(in_flight_count(&logger), 1);
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(in_flight_count(&logger), 2);
        drop(s1);
        assert_eq!(in_flight_count(&logger), 1);
        drop(s2);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);
        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();
        let mut reader = SliceReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Stand-in for the state of a copper list.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    /// Stand-in for a copper list: a state plus an encodable payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P,
    }

    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = SliceReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let mut total_readback = 0;
        // Stop on the first read error or at the end of the log.
        while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
            let mut reader = SliceReader::new(&section[..]);
            // Count entries until the section's bytes no longer decode.
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_err() {
                    break;
                }
                total_readback += 1;
            }
        }
        assert_eq!(total_readback, 10000);
    }
}