1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Display, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic bytes stored in the `magic` field of the `MainHeader` and checked when a log is opened.
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes stored in the `magic` field of every `SectionHeader` and checked when it is read.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// Header encoded at the very beginning of the first slab file of a unified log.
#[derive(Encode, Decode, Debug)]
pub struct MainHeader {
    /// Magic bytes identifying the file; readers compare them against `MAIN_MAGIC`.
    pub magic: [u8; 4],
    /// Byte offset in the first slab at which the reader starts looking for sections.
    pub first_section_offset: u16,
    /// Page size recorded by the writer when the log was created.
    pub page_size: u16,
}
29
30impl Display for MainHeader {
31 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
32 writeln!(
33 f,
34 " Magic -> {:2x}{:2x}{:2x}{:2x}",
35 self.magic[0], self.magic[1], self.magic[2], self.magic[3]
36 )?;
37 writeln!(f, " first_section_offset -> {}", self.first_section_offset)?;
38 writeln!(f, " page_size -> {}", self.page_size)
39 }
40}
41
/// Header encoded at the beginning of every section of a slab.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    /// Magic bytes identifying a section; readers compare them against `SECTION_MAGIC`.
    pub magic: [u8; 2],
    /// Kind of entries stored in the section.
    pub entry_type: UnifiedLogType,
    /// Size of the section; the reader advances by this amount to reach the next section.
    pub section_size: u32,
    /// Number of bytes of user data actually written after the header area.
    pub filled_size: u32,
}
52
53const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize; impl Display for SectionHeader {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 writeln!(f, " Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
58 writeln!(f, " type -> {:?}", self.entry_type)?;
59 write!(
60 f,
61 " use -> {} / {}",
62 self.filled_size, self.section_size
63 )
64 }
65}
66
67impl Default for SectionHeader {
68 fn default() -> Self {
69 Self {
70 magic: SECTION_MAGIC,
71 entry_type: UnifiedLogType::Empty,
72 section_size: 0,
73 filled_size: 0,
74 }
75 }
76}
77
/// A write stream that encodes objects into sections handed out by a `UnifiedLoggerWrite`.
struct MmapStream {
    /// Entry type stamped on every section this stream requests.
    entry_type: UnifiedLogType,
    /// Shared logger from which sections are requested and to which they are flushed.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Section currently being filled.
    current_section: SectionHandle,
    /// Running total of the bytes encoded through this stream.
    current_position: usize,
    /// Size requested for every section this stream allocates.
    minimum_allocation_amount: usize,
}
86
87impl MmapStream {
88 fn new(
89 entry_type: UnifiedLogType,
90 parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
91 minimum_allocation_amount: usize,
92 ) -> Self {
93 let section = parent_logger
94 .lock()
95 .expect("Could not lock a section at MmapStream creation")
96 .add_section(entry_type, minimum_allocation_amount);
97 Self {
98 entry_type,
99 parent_logger,
100 current_section: section,
101 current_position: 0,
102 minimum_allocation_amount,
103 }
104 }
105}
106
107impl Debug for MmapStream {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
110 }
111}
112
113impl<E: Encode> WriteStream<E> for MmapStream {
114 fn log(&mut self, obj: &E) -> CuResult<()> {
115 let dst = self.current_section.get_user_buffer();
116 let result = encode_into_slice(obj, dst, standard());
117 match result {
118 Ok(nb_bytes) => {
119 self.current_position += nb_bytes;
120 self.current_section.used += nb_bytes as u32;
121 Ok(())
122 }
123 Err(e) => match e {
124 EncodeError::UnexpectedEnd => {
125 if let Ok(mut logger_guard) = self.parent_logger.lock() {
126 logger_guard.flush_section(&mut self.current_section);
127 self.current_section = logger_guard
128 .add_section(self.entry_type, self.minimum_allocation_amount);
129
130 let result = encode_into_slice(
131 obj,
132 self.current_section.get_user_buffer(),
133 standard(),
134 )
135 .expect(
136 "Failed to encode object in a newly minted section. Unrecoverable failure.",
137 ); self.current_position += result;
139 self.current_section.used += result as u32;
140 Ok(())
141 } else {
142 Err(
144 "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
145 .into(),
146 )
147 }
148 }
149 _ => {
150 let err =
151 <&str as Into<CuError>>::into("Unexpected error while encoding object.")
152 .add_cause(e.to_string().as_str());
153 Err(err)
154 }
155 },
156 }
157 }
158}
159
160impl Drop for MmapStream {
161 fn drop(&mut self) {
162 if let Ok(mut logger_guard) = self.parent_logger.lock() {
163 logger_guard.flush_section(&mut self.current_section);
164 } else if !std::thread::panicking() {
165 eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
166 }
167 }
168}
169
170pub fn stream_write<E: Encode>(
172 logger: Arc<Mutex<UnifiedLoggerWrite>>,
173 entry_type: UnifiedLogType,
174 minimum_allocation_amount: usize,
175) -> impl WriteStream<E> {
176 MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
177}
178
/// A unified logger opened either for reading or for writing, as produced by
/// `UnifiedLoggerBuilder::build`.
pub enum UnifiedLogger {
    /// Logger reading an existing log.
    Read(UnifiedLoggerRead),
    /// Logger creating and writing a new log.
    Write(UnifiedLoggerWrite),
}
184
/// Builder collecting the options needed to open a `UnifiedLogger`.
pub struct UnifiedLoggerBuilder {
    /// Base path of the log; the actual slab file names are derived from it.
    file_base_name: Option<PathBuf>,
    /// Size of each slab file; only used when a log is created for writing.
    preallocated_size: Option<usize>,
    /// Requests write access; a writer is only built when `create` is set as well.
    write: bool,
    /// Requests the creation of the log files.
    create: bool,
}
192
193impl Default for UnifiedLoggerBuilder {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl UnifiedLoggerBuilder {
200 pub fn new() -> Self {
201 Self {
202 file_base_name: None,
203 preallocated_size: None,
204 write: false,
205 create: false, }
207 }
208
209 pub fn file_base_name(mut self, file_path: &Path) -> Self {
211 self.file_base_name = Some(file_path.to_path_buf());
212 self
213 }
214
215 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
216 self.preallocated_size = Some(preallocated_size);
217 self
218 }
219
220 pub fn write(mut self, write: bool) -> Self {
221 self.write = write;
222 self
223 }
224
225 pub fn create(mut self, create: bool) -> Self {
226 self.create = create;
227 self
228 }
229
230 pub fn build(self) -> io::Result<UnifiedLogger> {
231 let page_size = page_size::get();
232
233 if self.write && self.create {
234 let ulw = UnifiedLoggerWrite::new(
235 &self
236 .file_base_name
237 .expect("This unified logger has no filename."),
238 self.preallocated_size
239 .expect("This unified logger has no preallocated size."),
240 page_size,
241 );
242
243 Ok(UnifiedLogger::Write(ulw))
244 } else {
245 let file_path = self.file_base_name.ok_or_else(|| {
246 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
247 })?;
248 let ulr = UnifiedLoggerRead::new(&file_path)?;
249 Ok(UnifiedLogger::Read(ulr))
250 }
251 }
252}
253
/// Reader walking through the sections of a unified log, slab after slab.
pub struct UnifiedLoggerRead {
    /// Base path of the log, from which the slab file names are derived.
    base_file_path: PathBuf,
    /// Main header decoded from the first slab.
    main_header: MainHeader,
    /// Read-only memory map of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File backing `current_mmap_buffer`.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Byte offset, in the current slab, of the next section header to read.
    current_reading_position: usize,
}
263
/// One memory-mapped slab file of the log being written.
struct SlabEntry {
    /// File backing the slab.
    file: File,
    /// Writable memory map of `file`; wrapped in `ManuallyDrop` so that `Drop` can release it
    /// explicitly before the file is trimmed.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset of the first byte of the slab that has not been handed out yet.
    current_global_position: usize,
    /// Start offsets of the sections handed out and not flushed yet.
    sections_offsets_in_flight: Vec<usize>,
    /// Offset up to which a flush of the memory map has been requested.
    flushed_until_offset: usize,
    /// Page size used to align sections.
    page_size: usize,
}
272
273impl Drop for SlabEntry {
274 fn drop(&mut self) {
275 self.flush_until(self.current_global_position);
276 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
277 self.file
278 .set_len(self.current_global_position as u64)
279 .expect("Failed to trim datalogger file");
280
281 if !self.sections_offsets_in_flight.is_empty() {
282 eprintln!("Error: Slab not full flushed.");
283 }
284 }
285}
286
/// Outcome of a section allocation in a slab.
pub enum AllocatedSection {
    /// The slab cannot hold the requested section.
    NoMoreSpace,
    /// The section was allocated; the handle gives access to it.
    Section(SectionHandle),
}
291
292impl SlabEntry {
293 fn new(file: File, page_size: usize) -> Self {
294 let mmap_buffer =
295 ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
296 Self {
297 file,
298 mmap_buffer,
299 current_global_position: 0,
300 sections_offsets_in_flight: Vec::with_capacity(16),
301 flushed_until_offset: 0,
302 page_size,
303 }
304 }
305
306 fn flush_until(&mut self, until_position: usize) {
308 if (self.flushed_until_offset == until_position) || (until_position == 0) {
310 return;
311 }
312 self.mmap_buffer
313 .flush_async_range(
314 self.flushed_until_offset,
315 until_position - self.flushed_until_offset,
316 )
317 .expect("Failed to flush memory map");
318 self.flushed_until_offset = until_position;
319 }
320
321 fn is_it_my_section(&self, section: &SectionHandle) -> bool {
322 (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
323 && (section.buffer.as_ptr() as usize)
324 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
325 }
326
327 fn flush_section(&mut self, section: &mut SectionHandle) {
330 if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
331 || section.buffer.as_ptr() as usize
332 > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
333 {
334 panic!("Invalid section buffer, not in the slab");
335 }
336
337 section.update_header();
339
340 let _sz = encode_into_slice(§ion.section_header, section.buffer, standard())
341 .expect("Failed to encode section header");
342
343 let base = self.mmap_buffer.as_ptr() as usize;
344 let section_buffer_addr = section.buffer.as_ptr() as usize;
345 self.sections_offsets_in_flight
346 .retain(|&x| x != section_buffer_addr - base);
347
348 if self.sections_offsets_in_flight.is_empty() {
349 self.flush_until(self.current_global_position);
350 return;
351 }
352 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
353 self.flush_until(self.sections_offsets_in_flight[0]);
354 }
355 }
356
357 #[inline]
358 fn align_to_next_page(&self, ptr: usize) -> usize {
359 (ptr + self.page_size - 1) & !(self.page_size - 1)
360 }
361
362 fn add_section(
364 &mut self,
365 entry_type: UnifiedLogType,
366 requested_section_size: usize,
367 ) -> AllocatedSection {
368 self.current_global_position = self.align_to_next_page(self.current_global_position);
370 let section_size = self.align_to_next_page(requested_section_size) as u32;
371
372 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
374 return AllocatedSection::NoMoreSpace;
375 }
376
377 let section_header = SectionHeader {
378 magic: SECTION_MAGIC,
379 entry_type,
380 section_size,
381 filled_size: 0u32,
382 };
383
384 let nb_bytes = encode_into_slice(
385 §ion_header,
386 &mut self.mmap_buffer[self.current_global_position..],
387 standard(),
388 )
389 .expect("Failed to encode section header");
390 assert!(nb_bytes < self.page_size);
391
392 self.sections_offsets_in_flight
394 .push(self.current_global_position);
395 let end_of_section = self.current_global_position + requested_section_size;
396 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
397
398 let handle_buffer =
400 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
401
402 self.current_global_position = end_of_section;
403
404 AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
405 }
406
407 #[cfg(test)]
408 fn used(&self) -> usize {
409 self.current_global_position
410 }
411}
412
/// Handle on a section allocated in a slab: its header plus the bytes it may write to.
#[derive(Default)]
pub struct SectionHandle {
    /// In-memory copy of the header; it is encoded back at the start of `buffer` when the
    /// section is flushed.
    section_header: SectionHeader,
    /// Bytes of the section, header area included.
    buffer: &'static mut [u8],
    /// Number of bytes of user data written so far after the header area.
    used: u32,
}
421
422impl SectionHandle {
425 pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
427 if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
429 panic!("Invalid section buffer, magic number not found");
430 }
431
432 if buffer.len() < MAX_HEADER_SIZE {
433 panic!(
434 "Invalid section buffer, too small: {}, it needs to be > {}",
435 buffer.len(),
436 MAX_HEADER_SIZE
437 );
438 }
439
440 Self {
441 section_header,
442 buffer,
443 used: 0,
444 }
445 }
446 pub fn get_user_buffer(&mut self) -> &mut [u8] {
447 &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
448 }
449
450 pub fn update_header(&mut self) {
451 if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
453 return;
454 }
455 self.section_header.filled_size = self.used;
456 }
457}
458
/// Writer side of the unified logger: hands out sections from memory-mapped slab files.
pub struct UnifiedLoggerWrite {
    /// Slab from which new sections are allocated.
    front_slab: SlabEntry,
    /// Previous slabs, kept around until all their sections have been flushed.
    back_slabs: Vec<SlabEntry>,
    /// Base path from which the slab file names are derived.
    base_file_path: PathBuf,
    /// Size of each slab file.
    slab_size: usize,
    /// Index of the front slab, used as the suffix of its file name.
    front_slab_suffix: usize,
}
472
/// Derives the path of slab number `slab_index` from the base path of the log by inserting
/// `_<slab_index>` before the last extension of the file name, e.g. `log.bin` -> `log_3.bin`.
///
/// A file name without any extension simply gets the suffix appended (`log` -> `log_3`).
///
/// # Panics
/// Panics if `base_file_path` has no file name or if that name is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let file_name = base_file_path
        .file_name()
        .expect("Invalid base file path")
        .to_str()
        .expect("Could not translate the filename OsStr to str");

    // Split on the LAST dot only, so that `a.b.bin` becomes `a.b_3.bin`. Without any dot the
    // whole name is the stem: it must not be mistaken for an extension.
    let slab_file_name = match file_name.rsplit_once('.') {
        Some((stem, extension)) => format!("{stem}_{slab_index}.{extension}"),
        None => format!("{file_name}_{slab_index}"),
    };

    base_file_path.with_file_name(slab_file_name)
}
487
488fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
489 let file_path = build_slab_path(base_file_path, slab_suffix);
490 let file = OpenOptions::new()
491 .read(true)
492 .write(true)
493 .create(true)
494 .truncate(true)
495 .open(&file_path)
496 .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
497 file.set_len(slab_size as u64)
498 .expect("Failed to set file length");
499 file
500}
501
502impl UnifiedLoggerWrite {
503 fn next_slab(&mut self) -> File {
504 self.front_slab_suffix += 1;
505
506 make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
507 }
508
509 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
510 let file = make_slab_file(base_file_path, slab_size, 0);
511 let mut front_slab = SlabEntry::new(file, page_size);
512
513 let main_header = MainHeader {
515 magic: MAIN_MAGIC,
516 first_section_offset: page_size as u16,
517 page_size: page_size as u16,
518 };
519 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
520 .expect("Failed to encode main header");
521 assert!(nb_bytes < page_size);
522 front_slab.current_global_position = page_size; Self {
525 front_slab,
526 back_slabs: Vec::new(),
527 base_file_path: base_file_path.to_path_buf(),
528 slab_size,
529 front_slab_suffix: 0,
530 }
531 }
532
533 pub fn flush_section(&mut self, section: &mut SectionHandle) {
534 for slab in self.back_slabs.iter_mut() {
535 if slab.is_it_my_section(section) {
536 slab.flush_section(section);
537 return;
538 }
539 }
540 self.front_slab.flush_section(section);
541 }
542
543 fn garbage_collect_backslabs(&mut self) {
544 self.back_slabs
545 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
546 }
547
548 fn add_section(
550 &mut self,
551 entry_type: UnifiedLogType,
552 requested_section_size: usize,
553 ) -> SectionHandle {
554 self.garbage_collect_backslabs(); let maybe_section = self
556 .front_slab
557 .add_section(entry_type, requested_section_size);
558
559 match maybe_section {
560 AllocatedSection::NoMoreSpace => {
561 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
563 self.back_slabs
565 .push(mem::replace(&mut self.front_slab, new_slab));
566 match self
567 .front_slab
568 .add_section(entry_type, requested_section_size)
569 {
570 AllocatedSection::NoMoreSpace => {
571 panic!("Failed to allocate a section in a new slab");
572 }
573 AllocatedSection::Section(section) => section,
574 }
575 }
576 AllocatedSection::Section(section) => section,
577 }
578 }
579
580 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
581 (
582 self.front_slab.current_global_position,
583 self.front_slab.sections_offsets_in_flight.clone(),
584 self.back_slabs.len(),
585 )
586 }
587}
588
589impl Drop for UnifiedLoggerWrite {
590 fn drop(&mut self) {
591 #[cfg(debug_assertions)]
592 eprintln!("Flushing the unified Logger ... "); let mut section = self.add_section(UnifiedLogType::LastEntry, 80); self.front_slab.flush_section(&mut section);
596 self.garbage_collect_backslabs();
597
598 #[cfg(debug_assertions)]
599 eprintln!("Unified Logger flushed."); }
601}
602
603fn open_slab_index(
604 base_file_path: &Path,
605 slab_index: usize,
606) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
607 let mut options = OpenOptions::new();
608 let options = options.read(true);
609
610 let file_path = build_slab_path(base_file_path, slab_index);
611 let file = options.open(file_path)?;
612 let mmap = unsafe { Mmap::map(&file) }?;
613 let mut prolog = 0u16;
614 let mut maybe_main_header: Option<MainHeader> = None;
615 if slab_index == 0 {
616 let main_header: MainHeader;
617 let _read: usize;
618 (main_header, _read) =
619 decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
620 if main_header.magic != MAIN_MAGIC {
621 return Err(io::Error::new(
622 io::ErrorKind::InvalidData,
623 "Invalid magic number in main header",
624 ));
625 }
626 prolog = main_header.first_section_offset;
627 maybe_main_header = Some(main_header);
628 }
629 Ok((file, mmap, prolog, maybe_main_header))
630}
631
632impl UnifiedLoggerRead {
633 pub fn new(base_file_path: &Path) -> io::Result<Self> {
634 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
635
636 Ok(Self {
637 base_file_path: base_file_path.to_path_buf(),
638 main_header: header.expect("UnifiedLoggerRead needs a header"),
639 current_file: file,
640 current_mmap_buffer: mmap,
641 current_slab_index: 0,
642 current_reading_position: prolog as usize,
643 })
644 }
645
646 fn next_slab(&mut self) -> io::Result<()> {
647 self.current_slab_index += 1;
648 let (file, mmap, prolog, _) =
649 open_slab_index(&self.base_file_path, self.current_slab_index)?;
650 self.current_file = file;
651 self.current_mmap_buffer = mmap;
652 self.current_reading_position = prolog as usize;
653 Ok(())
654 }
655
656 pub fn read_next_section_type(
657 &mut self,
658 datalogtype: UnifiedLogType,
659 ) -> CuResult<Option<Vec<u8>>> {
660 loop {
662 if self.current_reading_position >= self.current_mmap_buffer.len() {
663 self.next_slab().map_err(|e| {
664 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
665 })?;
666 }
667
668 let header_result = self.read_section_header();
669 let header = header_result.map_err(|error| {
670 CuError::new_with_cause(
671 &format!(
672 "Could not read a sections header: {}/{}:{}",
673 self.base_file_path.as_os_str().to_string_lossy(),
674 self.current_slab_index,
675 self.current_reading_position,
676 ),
677 error,
678 )
679 })?;
680
681 if header.entry_type == UnifiedLogType::LastEntry {
683 return Ok(None);
684 }
685
686 if header.entry_type == datalogtype {
688 let result = Some(self.read_section_content(&header)?);
689 self.current_reading_position += header.section_size as usize;
690 return Ok(result);
691 }
692
693 self.current_reading_position += header.section_size as usize;
695 }
696 }
697
698 pub fn raw_main_header(&self) -> &MainHeader {
699 &self.main_header
700 }
701
702 pub fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
704 if self.current_reading_position >= self.current_mmap_buffer.len() {
705 self.next_slab().map_err(|e| {
706 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
707 })?;
708 }
709
710 let read_result = self.read_section_header();
711
712 match read_result {
713 Err(error) => Err(CuError::new_with_cause(
714 &format!(
715 "Could not read a sections header: {}/{}:{}",
716 self.base_file_path.as_os_str().to_string_lossy(),
717 self.current_slab_index,
718 self.current_reading_position,
719 ),
720 error,
721 )),
722 Ok(header) => {
723 let data = self.read_section_content(&header)?;
724 self.current_reading_position += header.section_size as usize;
725 Ok((header, data))
726 }
727 }
728 }
729
730 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
732 let mut section = vec![0; header.filled_size as usize];
734 let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
735 section.copy_from_slice(
736 &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
737 );
738
739 Ok(section)
740 }
741
742 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
743 let section_header: SectionHeader;
744 (section_header, _) = decode_from_slice(
745 &self.current_mmap_buffer[self.current_reading_position..],
746 standard(),
747 )
748 .expect("Failed to decode section header");
749 if section_header.magic != SECTION_MAGIC {
750 return Err("Invalid magic number in section header".into());
751 }
752 Ok(section_header)
753 }
754}
755
/// Adapter exposing the sections of one type of a unified log as a byte stream
/// (`std::io::Read`).
pub struct UnifiedLoggerIOReader {
    /// Underlying log reader.
    logger: UnifiedLoggerRead,
    /// Type of the sections to read; the other sections are skipped.
    log_type: UnifiedLogType,
    /// Content of the section currently being served.
    buffer: Vec<u8>,
    /// Position in `buffer` of the next byte to serve.
    buffer_pos: usize,
}
763
764impl UnifiedLoggerIOReader {
765 pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
766 Self {
767 logger,
768 log_type,
769 buffer: Vec::new(),
770 buffer_pos: 0,
771 }
772 }
773
774 fn fill_buffer(&mut self) -> io::Result<bool> {
776 match self.logger.read_next_section_type(self.log_type) {
777 Ok(Some(section)) => {
778 self.buffer = section;
779 self.buffer_pos = 0;
780 Ok(true)
781 }
782 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
784 }
785 }
786}
787
788impl Read for UnifiedLoggerIOReader {
789 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
790 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
791 return Ok(0);
793 }
794
795 if self.buffer_pos >= self.buffer.len() {
797 return Ok(0);
798 }
799
800 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
802 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
803 self.buffer_pos += len;
804 Ok(len)
805 }
806}
807
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::decode_from_reader;
    use std::io::BufReader;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Slab size large enough for the single-slab tests.
    const LARGE_SLAB: usize = 100 * 1024;
    /// Slab size small enough to force the multi-slab test to roll over several slabs.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Creates a shared write logger backed by `test.bin` in `tmp_dir`.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Opens the log at `base_file_path` for reading.
    fn open_for_reading(base_file_path: &Path) -> UnifiedLoggerRead {
        let UnifiedLogger::Read(reader) = UnifiedLoggerBuilder::new()
            .file_base_name(base_file_path)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        reader
    }

    /// Number of sections of the front slab that have been handed out and not flushed yet.
    fn in_flight_sections(logger: &Arc<Mutex<UnifiedLoggerWrite>>) -> usize {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    }

    /// Checks that the front slab has nothing in flight and is flushed up to its last byte.
    fn assert_fully_flushed(logger: &Arc<Mutex<UnifiedLoggerWrite>>) {
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get());
            used
        };

        // The logger is dropped at this point: its first slab must exist on disk.
        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
    }

    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(in_flight_sections(&logger), 1);
        }
        assert_eq!(in_flight_sections(&logger), 0);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 2);
        drop(s2);
        assert_eq!(in_flight_sections(&logger), 1);
        drop(s1);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 2);
        drop(s1);
        assert_eq!(in_flight_sections(&logger), 1);
        drop(s2);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Stand-in for the state of a copper list.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    /// Stand-in for a copper list: a state plus an arbitrary encodable payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P,
    }

    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // Enough entries to overflow several small slabs.
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let mut total_readback = 0;
        // Stop at the first error or at the end of the log.
        while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
            let mut reader = BufReader::new(&section[..]);
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_err() {
                    break;
                }
                total_readback += 1;
            }
        }
        assert_eq!(total_readback, 10000);
    }
}