1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic bytes stored in the `magic` field of the `MainHeader`; checked when slab 0 is opened
/// for reading.
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes stored in the `magic` field of every `SectionHeader`; checked when a section
/// header is decoded and when a `SectionHandle` is created.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// Header encoded at the very beginning of the first slab (slab index 0).
///
/// The field order is part of the encoded layout: do not reorder.
#[derive(Encode, Decode, Debug)]
struct MainHeader {
    /// Expected to be `MAIN_MAGIC`.
    magic: [u8; 4],
    /// Offset in slab 0 at which the reader starts decoding section headers.
    first_section_offset: u16,
    /// Page size used by the writer when the log was created.
    page_size: u16,
}
29
/// Header encoded at the start of each section of a slab.
///
/// The field order is part of the encoded layout: do not reorder.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    /// Expected to be `SECTION_MAGIC`.
    magic: [u8; 2],
    /// Kind of entries stored in the section.
    entry_type: UnifiedLogType,
    /// Page-aligned size of the section; the reader advances by this amount to reach the next
    /// section header.
    section_size: u32,
    /// Number of payload bytes actually written after the header area.
    filled_size: u32,
}
40
41const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize; impl Default for SectionHeader {
44 fn default() -> Self {
45 Self {
46 magic: SECTION_MAGIC,
47 entry_type: UnifiedLogType::Empty,
48 section_size: 0,
49 filled_size: 0,
50 }
51 }
52}
53
/// A write stream that encodes objects into sections obtained from a shared
/// `UnifiedLoggerWrite`.
struct MmapStream {
    /// Entry type given to every section this stream requests.
    entry_type: UnifiedLogType,
    /// Logger from which sections are requested and through which they are flushed.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Section currently being written to.
    current_section: SectionHandle,
    /// Total number of bytes encoded by this stream so far (across sections).
    current_position: usize,
    /// Size requested for each new section.
    minimum_allocation_amount: usize,
}
62
63impl MmapStream {
64 fn new(
65 entry_type: UnifiedLogType,
66 parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
67 minimum_allocation_amount: usize,
68 ) -> Self {
69 let section = parent_logger
70 .lock()
71 .unwrap()
72 .add_section(entry_type, minimum_allocation_amount);
73 Self {
74 entry_type,
75 parent_logger,
76 current_section: section,
77 current_position: 0,
78 minimum_allocation_amount,
79 }
80 }
81}
82
83impl Debug for MmapStream {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
86 }
87}
88
89impl<E: Encode> WriteStream<E> for MmapStream {
90 fn log(&mut self, obj: &E) -> CuResult<()> {
91 let dst = self.current_section.get_user_buffer();
92 let result = encode_into_slice(obj, dst, standard());
93 match result {
94 Ok(nb_bytes) => {
95 self.current_position += nb_bytes;
96 self.current_section.used += nb_bytes as u32;
97 Ok(())
98 }
99 Err(e) => match e {
100 EncodeError::UnexpectedEnd => {
101 let mut logger_guard = self.parent_logger.lock().unwrap();
102 logger_guard.flush_section(&mut self.current_section);
103 self.current_section =
104 logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);
105
106 let result = encode_into_slice(
107 obj,
108 self.current_section.get_user_buffer(),
109 standard(),
110 )
111 .expect(
112 "Failed to encode object in a newly minted section. Unrecoverable failure.",
113 ); self.current_position += result;
115 self.current_section.used += result as u32;
116 Ok(())
117 }
118 _ => {
119 let err =
120 <&str as Into<CuError>>::into("Unexpected error while encoding object.")
121 .add_cause(e.to_string().as_str());
122 Err(err)
123 }
124 },
125 }
126 }
127}
128
129impl Drop for MmapStream {
130 fn drop(&mut self) {
131 let mut logger_guard = self.parent_logger.lock().unwrap();
132 logger_guard.flush_section(&mut self.current_section);
133 }
134}
135
136pub fn stream_write<E: Encode>(
138 logger: Arc<Mutex<UnifiedLoggerWrite>>,
139 entry_type: UnifiedLogType,
140 minimum_allocation_amount: usize,
141) -> impl WriteStream<E> {
142 MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
143}
144
/// Result of `UnifiedLoggerBuilder::build`: a logger opened either for reading or for writing.
pub enum UnifiedLogger {
    /// Logger opened on an existing log for reading.
    Read(UnifiedLoggerRead),
    /// Logger creating a new log for writing.
    Write(UnifiedLoggerWrite),
}
150
/// Builder for a `UnifiedLogger`.
pub struct UnifiedLoggerBuilder {
    /// Base path of the log; slab files are derived from it by inserting `_<index>`.
    file_base_name: Option<PathBuf>,
    /// Size of each slab file; used only when creating a log for writing.
    preallocated_size: Option<usize>,
    /// A writer is built only when both `write` and `create` are set.
    write: bool,
    /// See `write`.
    create: bool,
}
158
159impl Default for UnifiedLoggerBuilder {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl UnifiedLoggerBuilder {
166 pub fn new() -> Self {
167 Self {
168 file_base_name: None,
169 preallocated_size: None,
170 write: false,
171 create: false, }
173 }
174
175 pub fn file_base_name(mut self, file_path: &Path) -> Self {
177 self.file_base_name = Some(file_path.to_path_buf());
178 self
179 }
180
181 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
182 self.preallocated_size = Some(preallocated_size);
183 self
184 }
185
186 pub fn write(mut self, write: bool) -> Self {
187 self.write = write;
188 self
189 }
190
191 pub fn create(mut self, create: bool) -> Self {
192 self.create = create;
193 self
194 }
195
196 pub fn build(self) -> io::Result<UnifiedLogger> {
197 let page_size = page_size::get();
198
199 if self.write && self.create {
200 let ulw = UnifiedLoggerWrite::new(
201 &self.file_base_name.unwrap(),
202 self.preallocated_size.unwrap(),
203 page_size,
204 );
205
206 Ok(UnifiedLogger::Write(ulw))
207 } else {
208 let file_path = self.file_base_name.ok_or_else(|| {
209 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
210 })?;
211 let ulr = UnifiedLoggerRead::new(&file_path)?;
212 Ok(UnifiedLogger::Read(ulr))
213 }
214 }
215}
216
/// Reader over a unified log, walking the slab files one after the other.
pub struct UnifiedLoggerRead {
    /// Base path from which the slab file names are derived.
    base_file_path: PathBuf,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File handle of the slab currently being read.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Offset in the current slab of the next section header to decode.
    current_reading_position: usize,
}
225
/// One memory-mapped slab file of a log being written.
struct SlabEntry {
    /// Backing file; trimmed to `current_global_position` when the slab is dropped.
    file: File,
    /// Writable mapping of `file`. Wrapped in `ManuallyDrop` because the `Drop` impl releases
    /// it explicitly before trimming the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset of the end of the last allocated section.
    current_global_position: usize,
    /// Offsets of the sections handed out and not yet flushed.
    sections_offsets_in_flight: Vec<usize>,
    /// Offset up to which a flush of the mapping has been requested.
    flushed_until_offset: usize,
    /// Alignment applied to section starts and section sizes.
    page_size: usize,
}
234
impl Drop for SlabEntry {
    /// Flushes what was written, releases the mapping, then trims the file to the used size.
    ///
    /// Panics if the flush or the truncation fails.
    fn drop(&mut self) {
        self.flush_until(self.current_global_position);
        // SAFETY: `mmap_buffer` is not accessed by this type after this point.
        // NOTE(review): `SectionHandle`s created from this mapping hold raw `'static` slices
        // into it; nothing here proves they are all gone — only a message is printed below
        // when sections are still in flight. Confirm callers uphold this.
        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
        // The mapping is dropped before the file is shrunk.
        self.file
            .set_len(self.current_global_position as u64)
            .expect("Failed to trim datalogger file");

        if !self.sections_offsets_in_flight.is_empty() {
            eprintln!("Error: Slab not full flushed.");
        }
    }
}
248
/// Outcome of a section allocation in a slab.
pub enum AllocatedSection {
    /// The slab does not have enough room left for the requested section.
    NoMoreSpace,
    /// The section was allocated.
    Section(SectionHandle),
}
253
254impl SlabEntry {
255 fn new(file: File, page_size: usize) -> Self {
256 let mmap_buffer =
257 ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
258 Self {
259 file,
260 mmap_buffer,
261 current_global_position: 0,
262 sections_offsets_in_flight: Vec::with_capacity(16),
263 flushed_until_offset: 0,
264 page_size,
265 }
266 }
267
268 fn flush_until(&mut self, until_position: usize) {
270 if (self.flushed_until_offset == until_position) || (until_position == 0) {
272 return;
273 }
274 self.mmap_buffer
275 .flush_async_range(
276 self.flushed_until_offset,
277 until_position - self.flushed_until_offset,
278 )
279 .expect("Failed to flush memory map");
280 self.flushed_until_offset = until_position;
281 }
282
283 fn is_it_my_section(&self, section: &SectionHandle) -> bool {
284 (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
285 && (section.buffer.as_ptr() as usize)
286 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
287 }
288
289 fn flush_section(&mut self, section: &mut SectionHandle) {
292 if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
293 || section.buffer.as_ptr() as usize
294 > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
295 {
296 panic!("Invalid section buffer, not in the slab");
297 }
298
299 section.update_header();
301
302 let _sz = encode_into_slice(§ion.section_header, section.buffer, standard())
303 .expect("Failed to encode section header");
304
305 let base = self.mmap_buffer.as_ptr() as usize;
306 let section_buffer_addr = section.buffer.as_ptr() as usize;
307 self.sections_offsets_in_flight
308 .retain(|&x| x != section_buffer_addr - base);
309
310 if self.sections_offsets_in_flight.is_empty() {
311 self.flush_until(self.current_global_position);
312 return;
313 }
314 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
315 self.flush_until(self.sections_offsets_in_flight[0]);
316 }
317 }
318
319 #[inline]
320 fn align_to_next_page(&self, ptr: usize) -> usize {
321 (ptr + self.page_size - 1) & !(self.page_size - 1)
322 }
323
324 fn add_section(
326 &mut self,
327 entry_type: UnifiedLogType,
328 requested_section_size: usize,
329 ) -> AllocatedSection {
330 self.current_global_position = self.align_to_next_page(self.current_global_position);
332 let section_size = self.align_to_next_page(requested_section_size) as u32;
333
334 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
336 return AllocatedSection::NoMoreSpace;
337 }
338
339 let section_header = SectionHeader {
340 magic: SECTION_MAGIC,
341 entry_type,
342 section_size,
343 filled_size: 0u32,
344 };
345
346 let nb_bytes = encode_into_slice(
347 §ion_header,
348 &mut self.mmap_buffer[self.current_global_position..],
349 standard(),
350 )
351 .expect("Failed to encode section header");
352 assert!(nb_bytes < self.page_size);
353
354 self.sections_offsets_in_flight
356 .push(self.current_global_position);
357 let end_of_section = self.current_global_position + requested_section_size;
358 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
359
360 let handle_buffer =
362 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
363
364 self.current_global_position = end_of_section;
365
366 AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
367 }
368
369 #[cfg(test)]
370 fn used(&self) -> usize {
371 self.current_global_position
372 }
373}
374
/// Handle on a section allocated in a slab.
#[derive(Default)]
pub struct SectionHandle {
    /// Header of the section, re-encoded at the start of `buffer` when the section is flushed.
    section_header: SectionHeader,
    /// Bytes of the section, header area included.
    buffer: &'static mut [u8],
    /// Number of payload bytes written after the header area.
    used: u32,
}
383
384impl SectionHandle {
387 pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
389 if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
391 panic!("Invalid section buffer, magic number not found");
392 }
393
394 if buffer.len() < MAX_HEADER_SIZE {
395 panic!(
396 "Invalid section buffer, too small: {}, it needs to be > {}",
397 buffer.len(),
398 MAX_HEADER_SIZE
399 );
400 }
401
402 Self {
403 section_header,
404 buffer,
405 used: 0,
406 }
407 }
408 pub fn get_user_buffer(&mut self) -> &mut [u8] {
409 &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
410 }
411
412 pub fn update_header(&mut self) {
413 if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
415 return;
416 }
417 self.section_header.filled_size = self.used;
418
419 }
423}
424
/// Writer side of the unified log: allocates sections in memory-mapped slab files and opens
/// a new slab when the current one is full.
pub struct UnifiedLoggerWrite {
    /// Slab in which new sections are allocated.
    front_slab: SlabEntry,
    /// Previous slabs kept alive while they still have sections in flight.
    back_slabs: Vec<SlabEntry>,
    /// Base path from which the slab file names are derived.
    base_file_path: PathBuf,
    /// Size of each slab file.
    slab_size: usize,
    /// Index of the front slab, used as its file name suffix.
    front_slab_suffix: usize,
}
438
/// Builds the path of slab number `slab_index` by inserting `_<slab_index>` before the last
/// extension of the base file name: `log.bin` gives `log_0.bin`.
///
/// A base name without any `.` simply gets the suffix appended: `log` gives `log_0`.
///
/// # Panics
/// Panics if `base_file_path` has no file name or if the name is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let file_name = base_file_path
        .file_name()
        .expect("base file path must end with a file name")
        .to_str()
        .expect("base file name must be valid UTF-8");

    let slab_file_name = match file_name.rsplit_once('.') {
        Some((stem, extension)) => format!("{stem}_{slab_index}.{extension}"),
        // No extension: splitting on '.' used to treat the whole name as the extension.
        None => format!("{file_name}_{slab_index}"),
    };

    base_file_path.with_file_name(slab_file_name)
}
449
450fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
451 let file_path = build_slab_path(base_file_path, slab_suffix);
452 let file = OpenOptions::new()
453 .read(true)
454 .write(true)
455 .create(true)
456 .truncate(true)
457 .open(&file_path)
458 .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
459 file.set_len(slab_size as u64)
460 .expect("Failed to set file length");
461 file
462}
463
464impl UnifiedLoggerWrite {
465 fn next_slab(&mut self) -> File {
466 self.front_slab_suffix += 1;
467
468 make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
469 }
470
471 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
472 let file = make_slab_file(base_file_path, slab_size, 0);
473 let mut front_slab = SlabEntry::new(file, page_size);
474
475 let main_header = MainHeader {
477 magic: MAIN_MAGIC,
478 first_section_offset: page_size as u16,
479 page_size: page_size as u16,
480 };
481 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
482 .expect("Failed to encode main header");
483 assert!(nb_bytes < page_size);
484 front_slab.current_global_position = page_size; Self {
487 front_slab,
488 back_slabs: Vec::new(),
489 base_file_path: base_file_path.to_path_buf(),
490 slab_size,
491 front_slab_suffix: 0,
492 }
493 }
494
495 pub fn flush_section(&mut self, section: &mut SectionHandle) {
496 for slab in self.back_slabs.iter_mut() {
497 if slab.is_it_my_section(section) {
498 slab.flush_section(section);
499 return;
500 }
501 }
502 self.front_slab.flush_section(section);
503 }
504
505 fn garbage_collect_backslabs(&mut self) {
506 self.back_slabs
507 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
508 }
509
510 fn add_section(
512 &mut self,
513 entry_type: UnifiedLogType,
514 requested_section_size: usize,
515 ) -> SectionHandle {
516 self.garbage_collect_backslabs(); let maybe_section = self
518 .front_slab
519 .add_section(entry_type, requested_section_size);
520
521 match maybe_section {
522 AllocatedSection::NoMoreSpace => {
523 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
525 self.back_slabs
527 .push(mem::replace(&mut self.front_slab, new_slab));
528 match self
529 .front_slab
530 .add_section(entry_type, requested_section_size)
531 {
532 AllocatedSection::NoMoreSpace => {
533 panic!("Failed to allocate a section in a new slab");
534 }
535 AllocatedSection::Section(section) => section,
536 }
537 }
538 AllocatedSection::Section(section) => section,
539 }
540 }
541
542 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
543 (
544 self.front_slab.current_global_position,
545 self.front_slab.sections_offsets_in_flight.clone(),
546 self.back_slabs.len(),
547 )
548 }
549}
550
551impl Drop for UnifiedLoggerWrite {
552 fn drop(&mut self) {
553 let mut section = self.add_section(UnifiedLogType::LastEntry, 80); self.front_slab.flush_section(&mut section);
555 self.garbage_collect_backslabs();
556 }
557}
558
559fn open_slab_index(base_file_path: &Path, slab_index: usize) -> io::Result<(File, Mmap, u16)> {
560 let mut options = OpenOptions::new();
561 let options = options.read(true);
562
563 let file_path = build_slab_path(base_file_path, slab_index);
564 let file = options.open(file_path)?;
565 let mmap = unsafe { Mmap::map(&file) }?;
566 let mut prolog = 0u16;
567 if slab_index == 0 {
568 let main_header: MainHeader;
569 let _read: usize;
570 (main_header, _read) =
571 decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
572 if main_header.magic != MAIN_MAGIC {
573 return Err(io::Error::new(
574 io::ErrorKind::InvalidData,
575 "Invalid magic number in main header",
576 ));
577 }
578 prolog = main_header.first_section_offset;
579 }
580 Ok((file, mmap, prolog))
581}
582
583impl UnifiedLoggerRead {
584 pub fn new(base_file_path: &Path) -> io::Result<Self> {
585 let (file, mmap, prolog) = open_slab_index(base_file_path, 0)?;
586
587 Ok(Self {
588 base_file_path: base_file_path.to_path_buf(),
589 current_file: file,
590 current_mmap_buffer: mmap,
591 current_slab_index: 0,
592 current_reading_position: prolog as usize,
593 })
594 }
595
596 fn next_slab(&mut self) -> io::Result<()> {
597 self.current_slab_index += 1;
598 let (file, mmap, prolog) = open_slab_index(&self.base_file_path, self.current_slab_index)?;
599 self.current_file = file;
600 self.current_mmap_buffer = mmap;
601 self.current_reading_position = prolog as usize;
602 Ok(())
603 }
604
605 pub fn read_next_section_type(
606 &mut self,
607 datalogtype: UnifiedLogType,
608 ) -> CuResult<Option<Vec<u8>>> {
609 loop {
611 if self.current_reading_position >= self.current_mmap_buffer.len() {
612 self.next_slab().map_err(|e| {
613 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
614 })?;
615 }
616
617 let header_result = self.read_section_header();
618 if let Err(error) = header_result {
619 return Err(CuError::new_with_cause(
620 "Could not read a sections header",
621 error,
622 ));
623 };
624 let header = header_result.unwrap();
625
626 if header.entry_type == UnifiedLogType::LastEntry {
628 return Ok(None);
629 }
630
631 if header.entry_type == datalogtype {
633 let result = Some(self.read_section_content(&header)?);
634 self.current_reading_position += header.section_size as usize;
635 return Ok(result);
636 }
637
638 self.current_reading_position += header.section_size as usize;
640 }
641 }
642
643 pub fn read_section(&mut self) -> CuResult<Vec<u8>> {
645 let read_result = self.read_section_header();
646 if let Err(error) = read_result {
647 return Err(CuError::new_with_cause(
648 "Could not read a sections header",
649 error,
650 ));
651 };
652
653 self.read_section_content(&read_result.unwrap())
654 }
655
656 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
658 if header.filled_size == 0 {
660 eprintln!("Warning: read an empty section");
661 }
662 let mut section = vec![0; header.filled_size as usize];
663 let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
664 section.copy_from_slice(
665 &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
666 );
667
668 Ok(section)
669 }
670
671 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
672 let section_header: SectionHeader;
673 (section_header, _) = decode_from_slice(
674 &self.current_mmap_buffer[self.current_reading_position..],
675 standard(),
676 )
677 .expect("Failed to decode section header");
678 if section_header.magic != SECTION_MAGIC {
679 return Err("Invalid magic number in section header".into());
680 }
681 Ok(section_header)
682 }
683}
684
/// `std::io::Read` adapter exposing all the sections of one type as a single byte stream.
pub struct UnifiedLoggerIOReader {
    /// Underlying log reader.
    logger: UnifiedLoggerRead,
    /// Type of the sections to read.
    log_type: UnifiedLogType,
    /// Content of the section currently being served.
    buffer: Vec<u8>,
    /// Number of bytes of `buffer` already handed out.
    buffer_pos: usize,
}
692
693impl UnifiedLoggerIOReader {
694 pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
695 Self {
696 logger,
697 log_type,
698 buffer: Vec::new(),
699 buffer_pos: 0,
700 }
701 }
702
703 fn fill_buffer(&mut self) -> io::Result<bool> {
705 match self.logger.read_next_section_type(self.log_type) {
706 Ok(Some(section)) => {
707 self.buffer = section;
708 self.buffer_pos = 0;
709 Ok(true)
710 }
711 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
713 }
714 }
715}
716
717impl Read for UnifiedLoggerIOReader {
718 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
719 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
720 return Ok(0);
722 }
723
724 if self.buffer_pos >= self.buffer.len() {
726 return Ok(0);
727 }
728
729 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
731 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
732 self.buffer_pos += len;
733 Ok(len)
734 }
735}
736
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::decode_from_reader;
    use std::io::BufReader;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Slab large enough for the single-slab tests.
    const LARGE_SLAB: usize = 100 * 1024;
    /// Slab small enough to force the logger to rotate slabs.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Creates a shared write logger on `test.bin` inside `tmp_dir`.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let built = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger");
        let UnifiedLogger::Write(data_logger) = built else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Number of sections currently in flight in the front slab.
    fn in_flight_sections(logger: &Arc<Mutex<UnifiedLoggerWrite>>) -> usize {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    }

    /// Asserts that the front slab has nothing in flight and is flushed up to its end.
    fn assert_front_slab_clean(logger: &Arc<Mutex<UnifiedLoggerWrite>>) {
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    /// Reopens the log at `base_path` for reading.
    fn open_for_reading(base_path: &Path) -> UnifiedLoggerRead {
        let built = UnifiedLoggerBuilder::new()
            .file_base_name(base_path)
            .build()
            .expect("Failed to build logger");
        let UnifiedLogger::Read(dl) = built else {
            panic!("Failed to build logger");
        };
        dl
    }

    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        {
            let built = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger");
            let UnifiedLogger::Write(mut logger) = built else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get());
            // The logger is dropped here, which closes and trims the slab file.
        }

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
    }

    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(in_flight_sections(&logger), 1);
        }
        assert_eq!(in_flight_sections(&logger), 0);
        assert_front_slab_clean(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 2);
        drop(s2);
        assert_eq!(in_flight_sections(&logger), 1);
        drop(s1);
        assert_front_slab_clean(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_sections(&logger), 2);
        drop(s1);
        assert_eq!(in_flight_sections(&logger), 1);
        drop(s2);
        assert_front_slab_clean(&logger);
    }

    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            for value in [1u32, 2u32, 3u32] {
                stream.log(&value).unwrap();
            }
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Stand-in for the state of a copper list.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    /// Stand-in for a copper list carrying an arbitrary payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P,
    }

    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // Enough entries to overflow the small slab several times.
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let mut dl = open_for_reading(&f);
        let mut total_readback = 0;
        // Stop on the first error or at the end of the log.
        while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
            let mut reader = BufReader::new(&section[..]);
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_err() {
                    break;
                }
                total_readback += 1;
            }
        }
        assert_eq!(total_readback, 10000);
    }
}