1use crate::{
5 AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6 SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SliceWriter;
15use bincode::error::EncodeError;
16use bincode::{Encode, decode_from_slice, encode_into_slice};
17use core::slice::from_raw_parts_mut;
18use cu29_traits::{
19 CuError, CuResult, ObservedWriter, UnifiedLogType, abort_observed_encode,
20 begin_observed_encode, finish_observed_encode,
21};
22use memmap2::{Mmap, MmapMut};
23use std::fs::{File, OpenOptions};
24use std::io::Read;
25use std::mem::ManuallyDrop;
26use std::path::{Path, PathBuf};
27use std::{io, mem};
28
/// Section-backed storage that writes bincode-encoded entries directly into a
/// sub-slice of a memory-mapped slab.
///
/// The `'static` lifetime is a promise made by the slab owner (`SlabEntry`),
/// which keeps the underlying mmap alive (via `ManuallyDrop`) for as long as
/// handles over this buffer exist.
pub struct MmapSectionStorage {
    // Raw window into the mmap reserved for this section (header + payload).
    buffer: &'static mut [u8],
    // Next write position within `buffer`.
    offset: usize,
    // Bytes reserved at the start of `buffer` for the section header.
    block_size: usize,
}
34
35impl MmapSectionStorage {
36 pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
37 Self {
38 buffer,
39 offset: 0,
40 block_size,
41 }
42 }
43
44 pub fn buffer_ptr(&self) -> *const u8 {
45 &self.buffer[0] as *const u8
46 }
47}
48
impl SectionStorage for MmapSectionStorage {
    /// Encodes the section header at the start of the buffer and positions
    /// the write cursor just past the reserved header block.
    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        self.post_update_header(header)?;
        self.offset = self.block_size;
        Ok(self.offset)
    }

    /// Re-encodes the (possibly updated) header in place at offset 0 without
    /// moving the payload cursor.
    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        encode_into_slice(header, &mut self.buffer[0..], standard())
    }

    /// Appends one bincode-encoded entry at the current offset.
    ///
    /// The encode is bracketed by the begin/finish/abort observed-encode
    /// hooks so external byte accounting stays consistent even on failure.
    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
        begin_observed_encode();
        let result = (|| {
            let mut encoder = EncoderImpl::new(
                ObservedWriter::new(SliceWriter::new(&mut self.buffer[self.offset..])),
                standard(),
            );
            entry.encode(&mut encoder)?;
            Ok(encoder.into_writer().into_inner().bytes_written())
        })();
        let size = match result {
            Ok(size) => {
                // The observed counter must agree with the writer's own count.
                debug_assert_eq!(size, finish_observed_encode());
                size
            }
            Err(err) => {
                // Roll back the observation so a failed encode leaves no trace.
                abort_observed_encode();
                return Err(err);
            }
        };
        self.offset += size;
        Ok(size)
    }

    /// No-op at the storage level (the owning slab performs the actual mmap
    /// flush); reports how many bytes have been written so far.
    fn flush(&mut self) -> CuResult<usize> {
        Ok(self.offset)
    }
}
89
/// A unified logger opened either for reading back an existing log or for
/// writing a new one, as selected by `MmapUnifiedLoggerBuilder`.
pub enum MmapUnifiedLogger {
    Read(MmapUnifiedLoggerRead),
    Write(MmapUnifiedLoggerWrite),
}
96
/// Builder for [`MmapUnifiedLogger`].
///
/// `write(true)` together with `create(true)` selects the writer path (which
/// also requires `preallocated_size`); any other combination opens the log
/// for reading.
pub struct MmapUnifiedLoggerBuilder {
    // Base path; actual slab files get a numeric suffix derived from it.
    file_base_name: Option<PathBuf>,
    // Size of each slab file in bytes; required in write mode.
    preallocated_size: Option<usize>,
    write: bool,
    create: bool,
}
104
105impl Default for MmapUnifiedLoggerBuilder {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl MmapUnifiedLoggerBuilder {
112 pub fn new() -> Self {
113 Self {
114 file_base_name: None,
115 preallocated_size: None,
116 write: false,
117 create: false, }
119 }
120
121 pub fn file_base_name(mut self, file_path: &Path) -> Self {
123 self.file_base_name = Some(file_path.to_path_buf());
124 self
125 }
126
127 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
128 self.preallocated_size = Some(preallocated_size);
129 self
130 }
131
132 pub fn write(mut self, write: bool) -> Self {
133 self.write = write;
134 self
135 }
136
137 pub fn create(mut self, create: bool) -> Self {
138 self.create = create;
139 self
140 }
141
142 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
143 let page_size = page_size::get();
144
145 if self.write && self.create {
146 let file_path = self.file_base_name.ok_or_else(|| {
147 io::Error::new(
148 io::ErrorKind::InvalidInput,
149 "File path is required for write mode",
150 )
151 })?;
152 let preallocated_size = self.preallocated_size.ok_or_else(|| {
153 io::Error::new(
154 io::ErrorKind::InvalidInput,
155 "Preallocated size is required for write mode",
156 )
157 })?;
158 let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
159 Ok(MmapUnifiedLogger::Write(ulw))
160 } else {
161 let file_path = self.file_base_name.ok_or_else(|| {
162 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
163 })?;
164 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
165 Ok(MmapUnifiedLogger::Read(ulr))
166 }
167 }
168}
169
/// One memory-mapped slab file of the write-side log.
struct SlabEntry {
    file: File,
    // ManuallyDrop because section handles hold `'static` slices into this
    // map; it is only unmapped in `Drop`, after flushing.
    mmap_buffer: ManuallyDrop<MmapMut>,
    // Next free byte in the slab.
    current_global_position: usize,
    // Start offsets of sections handed out but not yet closed/flushed.
    sections_offsets_in_flight: Vec<usize>,
    // Everything before this offset has already been flushed.
    flushed_until_offset: usize,
    page_size: usize,
    // Start offset of the provisional LastEntry marker, if one is written.
    temporary_end_marker: Option<usize>,
    // Test-only bookkeeping of closed sections and flushed byte ranges.
    #[cfg(test)]
    closed_sections: Vec<(usize, usize)>,
    #[cfg(test)]
    flushed_ranges: Vec<(usize, usize)>,
    #[cfg(all(test, feature = "mmap-fsync"))]
    sync_call_count: usize,
}
185
186impl Drop for SlabEntry {
187 fn drop(&mut self) {
188 self.flush_until(self.current_global_position);
189 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
191 if let Err(error) = self.file.set_len(self.current_global_position as u64) {
192 eprintln!("Failed to trim datalogger file: {}", error);
193 }
194 self.sync_file();
195
196 if !self.sections_offsets_in_flight.is_empty() {
197 eprintln!("Error: Slab not full flushed.");
198 }
199 }
200}
201
impl SlabEntry {
    /// Maps `file` writable and wraps it as an empty slab.
    ///
    /// The mapping is held in `ManuallyDrop` because section handles keep
    /// `'static` slices into it; it is only unmapped in this slab's `Drop`.
    fn new(file: File, page_size: usize) -> io::Result<Self> {
        let mmap_buffer = ManuallyDrop::new(
            // SAFETY: `file` is stored in the returned struct and kept alive
            // for the lifetime of the mapping; it must not be truncated while
            // mapped.
            unsafe { MmapMut::map_mut(&file) }
                .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
        );
        Ok(Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
            temporary_end_marker: None,
            #[cfg(test)]
            closed_sections: Vec::new(),
            #[cfg(test)]
            flushed_ranges: Vec::new(),
            #[cfg(all(test, feature = "mmap-fsync"))]
            sync_call_count: 0,
        })
    }

    /// Asynchronously flushes `len` bytes of the mapping starting at `start`,
    /// then fsyncs the file when the `mmap-fsync` feature is on.
    fn flush_range(&mut self, start: usize, len: usize) {
        if len == 0 {
            return;
        }
        self.mmap_buffer
            .flush_async_range(start, len)
            .expect("Failed to flush memory map");
        self.sync_file();
        #[cfg(test)]
        self.record_flushed_range(start, len);
    }

    /// fsyncs the backing file; a no-op unless the `mmap-fsync` feature is
    /// enabled.
    fn sync_file(&mut self) {
        #[cfg(feature = "mmap-fsync")]
        {
            self.file.sync_all().expect("Failed to fsync log file");
            #[cfg(test)]
            {
                self.sync_call_count += 1;
            }
        }
    }

    /// Flushes everything between the flush high-water mark and
    /// `until_position`, then advances the mark.
    fn flush_until(&mut self, until_position: usize) {
        // Already caught up, or nothing written yet.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.flush_range(
            self.flushed_until_offset,
            until_position - self.flushed_until_offset,
        );
        self.flushed_until_offset = until_position;
    }

    /// Retracts a previously written temporary end-of-log marker so its space
    /// can be reused by the next section; rewinds the flush mark if it had
    /// advanced past the marker.
    fn clear_temporary_end_marker(&mut self) {
        if let Some(marker_start) = self.temporary_end_marker.take() {
            self.current_global_position = marker_start;
            if self.flushed_until_offset > marker_start {
                self.flushed_until_offset = marker_start;
            }
        }
    }

    /// Writes a `LastEntry` section header at the next page boundary.
    ///
    /// A `temporary` marker (`is_open == true`) is meant to be overwritten by
    /// the next section; the final marker has `is_open == false`.
    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
        let marker_start = self.align_to_next_page(self.current_global_position);
        let total_marker_size = block_size;
        let marker_end = marker_start + total_marker_size;
        if marker_end > self.mmap_buffer.len() {
            return Err("Not enough space to write end-of-log marker".into());
        }

        let header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size: SECTION_HEADER_COMPACT_SIZE,
            entry_type: UnifiedLogType::LastEntry,
            offset_to_next_section: total_marker_size as u32,
            used: 0,
            is_open: temporary,
        };

        encode_into_slice(
            &header,
            &mut self.mmap_buffer
                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
            standard(),
        )
        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;

        // NOTE(review): the marker start is remembered even when `temporary`
        // is false — confirm that recording final markers here is intended.
        self.temporary_end_marker = Some(marker_start);
        self.current_global_position = marker_end;
        Ok(())
    }

    /// True if `section`'s buffer lies inside this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();
        (ptr >= self.mmap_buffer.as_ptr())
            && (ptr as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Finalizes a closed section: rewrites its header in place, removes it
    /// from the in-flight list, and flushes as much contiguous data as is
    /// safe. Panics if the section does not belong to this slab.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        section
            .post_update_header()
            .expect("Failed to update section header");

        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();

        if ptr < self.mmap_buffer.as_ptr()
            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        let base = self.mmap_buffer.as_ptr() as usize;
        let section_start = ptr as usize - base;
        let section_len = section.header.offset_to_next_section as usize;
        #[cfg(test)]
        self.record_closed_section(section_start, section_len);
        self.sections_offsets_in_flight
            .retain(|&x| x != section_start);

        // No sections still open: everything up to the write position is safe
        // to flush in one contiguous sweep.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        // Otherwise flush contiguously up to the oldest still-open section…
        let next_open_offset = self.sections_offsets_in_flight[0];
        if self.flushed_until_offset < next_open_offset {
            self.flush_until(next_open_offset);
        }
        // …and flush this section's own range if it lies beyond that point.
        if section_start + section_len > self.flushed_until_offset {
            self.flush_range(section_start, section_len);
        }
    }

    /// Test bookkeeping: remembers a closed section's (start, len) range.
    #[cfg(test)]
    fn record_closed_section(&mut self, start: usize, len: usize) {
        self.closed_sections.push((start, len));
    }

    /// Test bookkeeping: inserts `[start, start+len)` into the ordered list
    /// of flushed ranges, merging any overlapping or adjacent entries.
    #[cfg(test)]
    fn record_flushed_range(&mut self, start: usize, len: usize) {
        let mut merged_start = start;
        let mut merged_end = start + len;
        let mut merged_ranges = Vec::with_capacity(self.flushed_ranges.len() + 1);
        let mut inserted = false;

        for (range_start, range_len) in self.flushed_ranges.drain(..) {
            let range_end = range_start + range_len;
            // Entirely before the new range: keep as-is.
            if range_end < merged_start {
                merged_ranges.push((range_start, range_len));
                continue;
            }
            // Entirely after: emit the (now complete) merged range first.
            if merged_end < range_start {
                if !inserted {
                    merged_ranges.push((merged_start, merged_end - merged_start));
                    inserted = true;
                }
                merged_ranges.push((range_start, range_len));
                continue;
            }

            // Overlapping/touching: absorb into the merged range.
            merged_start = merged_start.min(range_start);
            merged_end = merged_end.max(range_end);
        }

        if !inserted {
            merged_ranges.push((merged_start, merged_end - merged_start));
        }

        self.flushed_ranges = merged_ranges;
    }

    /// Test helper: bytes belonging to closed sections that no flushed range
    /// covers yet.
    #[cfg(test)]
    fn pending_closed_bytes(&self) -> usize {
        let mut pending = 0;

        for (section_start, section_len) in &self.closed_sections {
            let section_end = section_start + section_len;
            let mut cursor = *section_start;

            // Walk the (ordered) flushed ranges, counting the gaps that fall
            // inside this section.
            for (range_start, range_len) in &self.flushed_ranges {
                let range_end = range_start + range_len;
                if range_end <= cursor {
                    continue;
                }
                if *range_start >= section_end {
                    break;
                }
                if *range_start > cursor {
                    pending += *range_start - cursor;
                }
                cursor = cursor.max(range_end);
                if cursor >= section_end {
                    break;
                }
            }

            if cursor < section_end {
                pending += section_end - cursor;
            }
        }

        pending
    }

    /// Rounds `ptr` up to the next multiple of the page size (a power of two,
    /// which is what makes the mask trick valid).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// Carves the next page-aligned section out of the slab and returns a
    /// handle whose storage points straight into the mapping, or
    /// `NoMoreSpace` when the (page-rounded) request does not fit.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection<MmapSectionStorage> {
        // Sections start on a page boundary; the advertised size is also
        // page-rounded so the next section starts aligned too.
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        #[cfg(feature = "compact")]
        let block_size = SECTION_HEADER_COMPACT_SIZE;

        #[cfg(not(feature = "compact"))]
        let block_size = self.page_size as u16;

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size,
            entry_type,
            offset_to_next_section: section_size,
            used: 0u32,
            is_open: true,
        };

        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: promotes the borrow to 'static. Sound only because the mmap
        // is kept alive until the slab drops (ManuallyDrop) and the section
        // stays tracked in `sections_offsets_in_flight` until flushed.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);

        self.current_global_position = end_of_section;

        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
    }

    /// Test helper: bytes consumed in this slab so far.
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
475
/// Write-side of the unified logger: one active (front) slab plus rotated-out
/// back slabs that still hold unflushed sections.
pub struct MmapUnifiedLoggerWrite {
    // Slab currently receiving new sections.
    front_slab: SlabEntry,
    // Full slabs kept alive until all their sections are flushed.
    back_slabs: Vec<SlabEntry>,
    // User-facing base path; slab files carry a numeric suffix.
    base_file_path: PathBuf,
    // Preallocated size of every slab file, in bytes.
    slab_size: usize,
    // Numeric suffix of the front slab file (starts at 0).
    front_slab_suffix: usize,
}
489
/// Derives the on-disk path of slab number `slab_index` from the user-facing
/// base path, turning `dir/name.ext` into `dir/name_<index>.ext`.
///
/// # Errors
/// `InvalidInput` when the base path lacks a file name or extension, either
/// component is not valid UTF-8, or the stem is empty.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let bad_input = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg);

    let mut slab_path = base_file_path.to_path_buf();
    let stem = slab_path
        .file_stem()
        .ok_or_else(|| bad_input("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| bad_input("Base file name is not valid UTF-8"))?;
    let extension = slab_path
        .extension()
        .ok_or_else(|| bad_input("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| bad_input("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(bad_input("Base file name is empty"));
    }

    let numbered_name = format!("{stem}_{slab_index}.{extension}");
    slab_path.set_file_name(numbered_name);
    Ok(slab_path)
}
526
527fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
528 let file_path = build_slab_path(base_file_path, slab_suffix)?;
529 let file = OpenOptions::new()
530 .read(true)
531 .write(true)
532 .create(true)
533 .truncate(true)
534 .open(&file_path)
535 .map_err(|e| {
536 io::Error::new(
537 e.kind(),
538 format!("Failed to open file {}: {e}", file_path.display()),
539 )
540 })?;
541 file.set_len(slab_size as u64).map_err(|e| {
542 io::Error::new(
543 e.kind(),
544 format!("Failed to set file length for {}: {e}", file_path.display()),
545 )
546 })?;
547 Ok(file)
548}
549
/// Clears the way for a fresh alias at `base_file_path`.
///
/// A missing entry is fine; a plain file (or symlink — `symlink_metadata`
/// does not follow links, so even a dangling one is detected) is removed; a
/// directory is refused with `AlreadyExists`.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    let meta = match std::fs::symlink_metadata(base_file_path) {
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
        Ok(meta) => meta,
    };

    if meta.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }

    std::fs::remove_file(base_file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )
    })
}
582
/// Creates `base_file_path` as an alias of the first slab (`<base>_0.<ext>`):
/// a relative symlink on Unix, a symlink with a hard-link fallback on
/// Windows, and a hard link on any other platform.
fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
    let first_slab_path = build_slab_path(base_file_path, 0)?;
    // Replace any stale alias left over from a previous run.
    remove_existing_alias(base_file_path)?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::symlink;
        // Link to the file name only (not the full path) so the alias still
        // resolves if the containing directory is moved.
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        symlink(relative_target, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }

    #[cfg(windows)]
    {
        use std::os::windows::fs::symlink_file;
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        match symlink_file(relative_target, base_file_path) {
            Ok(()) => Ok(()),
            // Symlink creation may fail (e.g. missing privilege); fall back
            // to a hard link and report both errors if that fails too.
            Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
                |hard_link_err| {
                    io::Error::other(format!(
                        "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
                        base_file_path.display()
                    ))
                },
            ),
        }?;
        Ok(())
    }

    #[cfg(not(any(unix, windows)))]
    {
        std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }
}
645
impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
    /// Reserves a new section of at least `requested_section_size` bytes in
    /// the front slab, rolling over to a fresh slab file when the current one
    /// is full. A temporary end-of-log marker is (re)placed after the new
    /// section so a crashed process still leaves a parseable log.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
        // Drop fully-flushed back slabs and retract the previous temporary
        // marker so the new section can reuse its space.
        self.garbage_collect_backslabs();
        self.front_slab.clear_temporary_end_marker();
        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // Front slab is full: rotate it to the back list and retry in
                // a brand-new slab.
                let new_slab = self.create_slab()?;
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    // A fresh slab that still can't fit the request is fatal.
                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
                    Section(section) => {
                        self.place_end_marker(true)?;
                        Ok(section)
                    }
                }
            }
            Section(section) => {
                self.place_end_marker(true)?;
                Ok(section)
            }
        }
    }

    /// Marks the section closed and flushes it through whichever slab owns
    /// its buffer (a rotated back slab, or else the front slab).
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        section.mark_closed();
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Reports space usage.
    ///
    /// NOTE(review): `total_used_space` only counts the front slab, and
    /// `total_allocated_space` is `slab_size * front_slab_suffix`, which is 0
    /// while the first slab (suffix 0) is active — presumably this should use
    /// `front_slab_suffix + 1`; confirm the intended semantics.
    fn status(&self) -> UnifiedLogStatus {
        UnifiedLogStatus {
            total_used_space: self.front_slab.current_global_position,
            total_allocated_space: self.slab_size * self.front_slab_suffix,
        }
    }
}
702
impl MmapUnifiedLoggerWrite {
    /// Creates the next numbered slab file and records the new suffix.
    fn next_slab(&mut self) -> io::Result<File> {
        let next_suffix = self.front_slab_suffix + 1;
        let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
        self.front_slab_suffix = next_suffix;
        Ok(file)
    }

    /// Builds slab 0, points the base-path alias at it, and writes the main
    /// header into the first page of the mapping.
    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
        let file = make_slab_file(base_file_path, slab_size, 0)?;
        create_base_alias_link(base_file_path)?;
        let mut front_slab = SlabEntry::new(file, page_size)?;

        let main_header = MainHeader {
            magic: MAIN_MAGIC,
            first_section_offset: page_size as u16,
            page_size: page_size as u16,
        };
        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
            .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
        // The header must fit inside the first page: sections start at page 1.
        assert!(nb_bytes < page_size);
        front_slab.current_global_position = page_size;
        Ok(Self {
            front_slab,
            back_slabs: Vec::new(),
            base_file_path: base_file_path.to_path_buf(),
            slab_size,
            front_slab_suffix: 0,
        })
    }

    /// Drops back slabs with no sections left in flight (their `Drop` trims
    /// and syncs the backing file).
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }

    /// Writes an end-of-log marker in the front slab, rotating to a new slab
    /// first when the marker no longer fits.
    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        match self.front_slab.write_end_marker(temporary) {
            Ok(_) => Ok(()),
            Err(_) => {
                let new_slab = self.create_slab()?;
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                self.front_slab.write_end_marker(temporary)
            }
        }
    }

    /// Debug snapshot: (front-slab write position, in-flight section offsets,
    /// number of back slabs still alive).
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }

    /// Creates and maps the next slab file, wrapping I/O errors as `CuError`.
    fn create_slab(&mut self) -> CuResult<SlabEntry> {
        let file = self
            .next_slab()
            .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
        SlabEntry::new(file, self.front_slab.page_size)
            .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
    }
}
770
impl Drop for MmapUnifiedLoggerWrite {
    /// Replaces the temporary end marker with a final (closed) one and
    /// flushes the front slab; the individual `SlabEntry` drops then unmap,
    /// trim, and sync their files.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... ");
        self.front_slab.clear_temporary_end_marker();
        if let Err(e) = self.place_end_marker(false) {
            // Without a final end marker the log is unreadable, so failing
            // loudly here is deliberate.
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed.");
    }
}
787
788fn open_slab_index(
789 base_file_path: &Path,
790 slab_index: usize,
791) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
792 let mut options = OpenOptions::new();
793 let options = options.read(true);
794
795 let file_path = build_slab_path(base_file_path, slab_index)?;
796 let file = options.open(&file_path).map_err(|e| {
797 io::Error::new(
798 e.kind(),
799 format!("Failed to open slab file {}: {e}", file_path.display()),
800 )
801 })?;
802 let mmap = unsafe { Mmap::map(&file) }
804 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
805 let mut prolog = 0u16;
806 let mut maybe_main_header: Option<MainHeader> = None;
807 if slab_index == 0 {
808 let main_header: MainHeader;
809 let _read: usize;
810 (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
811 io::Error::new(
812 io::ErrorKind::InvalidData,
813 format!("Failed to decode main header: {e}"),
814 )
815 })?;
816 if main_header.magic != MAIN_MAGIC {
817 return Err(io::Error::new(
818 io::ErrorKind::InvalidData,
819 "Invalid magic number in main header",
820 ));
821 }
822 prolog = main_header.first_section_offset;
823 maybe_main_header = Some(main_header);
824 }
825 Ok((file, mmap, prolog, maybe_main_header))
826}
827
/// Read-side of the unified logger: walks the numbered slab files in order,
/// decoding section headers out of a read-only mmap.
pub struct MmapUnifiedLoggerRead {
    base_file_path: PathBuf,
    // Header decoded from slab 0.
    main_header: MainHeader,
    current_mmap_buffer: Mmap,
    // Kept open so the current mapping stays valid.
    current_file: File,
    current_slab_index: usize,
    // Byte offset of the next section header within the current slab.
    current_reading_position: usize,
}
837
/// A resumable read position in the log: which slab, and which byte offset
/// within it. Obtained from `MmapUnifiedLoggerRead::position` and replayed
/// via `seek`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    pub slab_index: usize,
    pub offset: usize,
}
844
impl UnifiedLogRead for MmapUnifiedLoggerRead {
    /// Scans forward for the next section of `datalogtype`, skipping sections
    /// of other types and crossing slab boundaries as needed.
    ///
    /// Returns `Ok(None)` once the end-of-log (`LastEntry`) marker is reached.
    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
        loop {
            // Past the end of the current mapping: move on to the next slab.
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header_result = self.read_section_header();
            let header = header_result.map_err(|error| {
                CuError::new_with_cause(
                    &format!(
                        "Could not read a sections header: {}/{}:{}",
                        self.base_file_path.as_os_str().to_string_lossy(),
                        self.current_slab_index,
                        self.current_reading_position,
                    ),
                    error,
                )
            })?;

            // LastEntry marks the logical end of the log.
            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(None);
            }

            if header.entry_type == datalogtype {
                let result = Some(self.read_section_content(&header)?);
                self.current_reading_position += header.offset_to_next_section as usize;
                return Ok(result);
            }

            // Not the requested type: skip over the whole section.
            self.current_reading_position += header.offset_to_next_section as usize;
        }
    }

    /// Reads whatever section sits at the current position, regardless of
    /// type, returning its header and payload and advancing past it.
    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }

        let read_result = self.read_section_header();

        match read_result {
            Err(error) => Err(CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                error,
            )),
            Ok(header) => {
                let data = self.read_section_content(&header)?;
                self.current_reading_position += header.offset_to_next_section as usize;
                Ok((header, data))
            }
        }
    }
}
913
impl MmapUnifiedLoggerRead {
    /// Opens slab 0 of the log at `base_file_path` and decodes the main
    /// header; reading starts right after the header prolog.
    pub fn new(base_file_path: &Path) -> io::Result<Self> {
        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
        let main_header = header.ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
        })?;

        Ok(Self {
            base_file_path: base_file_path.to_path_buf(),
            main_header,
            current_file: file,
            current_mmap_buffer: mmap,
            current_slab_index: 0,
            current_reading_position: prolog as usize,
        })
    }

    /// Current slab/offset, suitable for a later [`Self::seek`].
    pub fn position(&self) -> LogPosition {
        LogPosition {
            slab_index: self.current_slab_index,
            offset: self.current_reading_position,
        }
    }

    /// Jumps to a previously recorded position, remapping the slab file when
    /// the target lives in a different slab.
    pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
        if pos.slab_index != self.current_slab_index {
            let (file, mmap, _prolog, _header) =
                open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
                    CuError::new_with_cause(
                        &format!("Failed to open slab {} for seek", pos.slab_index),
                        e,
                    )
                })?;
            self.current_file = file;
            self.current_mmap_buffer = mmap;
            self.current_slab_index = pos.slab_index;
        }
        // NOTE(review): `pos.offset` is trusted as-is; an out-of-range offset
        // only surfaces later as a header decode error.
        self.current_reading_position = pos.offset;
        Ok(())
    }

    /// Opens and maps the next numbered slab, resetting the read position to
    /// its prolog (0 for all slabs after the first).
    fn next_slab(&mut self) -> io::Result<()> {
        self.current_slab_index += 1;
        let (file, mmap, prolog, _) =
            open_slab_index(&self.base_file_path, self.current_slab_index)?;
        self.current_file = file;
        self.current_mmap_buffer = mmap;
        self.current_reading_position = prolog as usize;
        Ok(())
    }

    /// The main header decoded from slab 0.
    pub fn raw_main_header(&self) -> &MainHeader {
        &self.main_header
    }

    /// Sums the `used` byte counts of every section of `datalogtype` from the
    /// current position up to the end-of-log marker. Note: this advances the
    /// read position as a side effect.
    pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
        let mut total = 0u64;

        loop {
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header = self.read_section_header()?;

            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(total);
            }

            if header.entry_type == datalogtype {
                total = total.saturating_add(header.used as u64);
            }

            self.current_reading_position += header.offset_to_next_section as usize;
        }
    }

    /// Copies the `used` payload bytes of the section at the current position
    /// (which start `block_size` bytes past the header) into a fresh vector.
    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
        let mut section_data = vec![0; header.used as usize];
        let start_of_data = self.current_reading_position + header.block_size as usize;
        section_data.copy_from_slice(
            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
        );

        Ok(section_data)
    }

    /// Decodes and magic-validates the section header at the current position
    /// without advancing it.
    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
        let section_header: SectionHeader;
        (section_header, _) = decode_from_slice(
            &self.current_mmap_buffer[self.current_reading_position..],
            standard(),
        )
        .map_err(|e| {
            CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                e,
            )
        })?;
        if section_header.magic != SECTION_MAGIC {
            return Err("Invalid magic number in section header".into());
        }

        Ok(section_header)
    }
}
1031
/// Adapter that exposes the concatenated payload bytes of one section type as
/// a `std::io::Read` stream.
pub struct UnifiedLoggerIOReader {
    logger: MmapUnifiedLoggerRead,
    // Only sections of this type are surfaced.
    log_type: UnifiedLogType,
    // Payload of the section currently being served.
    buffer: Vec<u8>,
    // Read cursor within `buffer`.
    buffer_pos: usize,
}
1039
1040impl UnifiedLoggerIOReader {
1041 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
1042 Self {
1043 logger,
1044 log_type,
1045 buffer: Vec::new(),
1046 buffer_pos: 0,
1047 }
1048 }
1049
1050 fn fill_buffer(&mut self) -> io::Result<bool> {
1052 match self.logger.read_next_section_type(self.log_type) {
1053 Ok(Some(section)) => {
1054 self.buffer = section;
1055 self.buffer_pos = 0;
1056 Ok(true)
1057 }
1058 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
1060 }
1061 }
1062}
1063
1064impl Read for UnifiedLoggerIOReader {
1065 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1066 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
1067 return Ok(0);
1069 }
1070
1071 if self.buffer_pos >= self.buffer.len() {
1073 return Ok(0);
1074 }
1075
1076 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
1078 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
1079 self.buffer_pos += len;
1080 Ok(len)
1081 }
1082}
1083
1084#[cfg(feature = "std")]
1085#[cfg(test)]
1086mod tests {
1087 use super::*;
1088 use crate::stream_write;
1089 use bincode::de::read::SliceReader;
1090 use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
1091 use cu29_traits::WriteStream;
1092 use std::path::PathBuf;
1093 use std::sync::{Arc, Mutex};
1094 use tempfile::TempDir;
1095
1096 const LARGE_SLAB: usize = 100 * 1024; const SMALL_SLAB: usize = 16 * 2 * 1024; fn make_a_logger(
1100 tmp_dir: &TempDir,
1101 slab_size: usize,
1102 ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
1103 let file_path = tmp_dir.path().join("test.bin");
1104 let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
1105 .write(true)
1106 .create(true)
1107 .file_base_name(&file_path)
1108 .preallocated_size(slab_size)
1109 .build()
1110 .expect("Failed to create logger")
1111 else {
1112 panic!("Failed to create logger")
1113 };
1114
1115 (Arc::new(Mutex::new(data_logger)), file_path)
1116 }
1117
1118 #[test]
1119 fn test_truncation_and_sections_creations() {
1120 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1121 let file_path = tmp_dir.path().join("test.bin");
1122 let _used = {
1123 let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
1124 .write(true)
1125 .create(true)
1126 .file_base_name(&file_path)
1127 .preallocated_size(100000)
1128 .build()
1129 .expect("Failed to create logger")
1130 else {
1131 panic!("Failed to create logger")
1132 };
1133 logger
1134 .add_section(UnifiedLogType::StructuredLogLine, 1024)
1135 .unwrap();
1136 logger
1137 .add_section(UnifiedLogType::CopperList, 2048)
1138 .unwrap();
1139 let used = logger.front_slab.used();
1140 assert!(used < 4 * page_size::get()); used
1144 };
1145
1146 let _file = OpenOptions::new()
1147 .read(true)
1148 .open(tmp_dir.path().join("test_0.bin"))
1149 .expect("Could not reopen the file");
1150 }
1157
1158 #[test]
1159 fn test_base_alias_exists_and_matches_first_slab() {
1160 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1161 let file_path = tmp_dir.path().join("test.bin");
1162 let _logger = MmapUnifiedLoggerBuilder::new()
1163 .write(true)
1164 .create(true)
1165 .file_base_name(&file_path)
1166 .preallocated_size(LARGE_SLAB)
1167 .build()
1168 .expect("Failed to create logger");
1169
1170 let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
1171 assert!(file_path.exists(), "base alias does not exist");
1172 assert!(first_slab.exists(), "first slab does not exist");
1173
1174 let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
1175 let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
1176 assert_eq!(alias_bytes, slab_bytes);
1177 }
1178
1179 #[test]
1180 fn test_one_section_self_cleaning() {
1181 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1182 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1183 {
1184 let _stream = stream_write::<(), MmapSectionStorage>(
1185 logger.clone(),
1186 UnifiedLogType::StructuredLogLine,
1187 1024,
1188 );
1189 assert_eq!(
1190 logger
1191 .lock()
1192 .unwrap()
1193 .front_slab
1194 .sections_offsets_in_flight
1195 .len(),
1196 1
1197 );
1198 }
1199 assert_eq!(
1200 logger
1201 .lock()
1202 .unwrap()
1203 .front_slab
1204 .sections_offsets_in_flight
1205 .len(),
1206 0
1207 );
1208 let logger = logger.lock().unwrap();
1209 assert_eq!(
1210 logger.front_slab.flushed_until_offset,
1211 logger.front_slab.current_global_position
1212 );
1213 }
1214
1215 #[test]
1216 fn test_temporary_end_marker_is_created() {
1217 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1218 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1219 {
1220 let mut stream = stream_write::<u32, MmapSectionStorage>(
1221 logger.clone(),
1222 UnifiedLogType::StructuredLogLine,
1223 1024,
1224 )
1225 .unwrap();
1226 stream.log(&42u32).unwrap();
1227 }
1228
1229 let logger_guard = logger.lock().unwrap();
1230 let slab = &logger_guard.front_slab;
1231 let marker_start = slab
1232 .temporary_end_marker
1233 .expect("temporary end-of-log marker missing");
1234 let (eof_header, _) =
1235 decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
1236 .expect("Could not decode end-of-log marker header");
1237 assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
1238 assert!(eof_header.is_open);
1239 assert_eq!(eof_header.used, 0);
1240 }
1241
1242 #[test]
1243 fn test_final_end_marker_is_not_temporary() {
1244 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1245 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1246 {
1247 let mut stream = stream_write::<u32, MmapSectionStorage>(
1248 logger.clone(),
1249 UnifiedLogType::CopperList,
1250 1024,
1251 )
1252 .unwrap();
1253 stream.log(&1u32).unwrap();
1254 }
1255 drop(logger);
1256
1257 let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1258 .file_base_name(&f)
1259 .build()
1260 .expect("Failed to build reader")
1261 else {
1262 panic!("Failed to create reader");
1263 };
1264
1265 loop {
1266 let (header, _data) = reader
1267 .raw_read_section()
1268 .expect("Failed to read section while searching for EOF");
1269 if header.entry_type == UnifiedLogType::LastEntry {
1270 assert!(!header.is_open);
1271 break;
1272 }
1273 }
1274 }
1275
1276 #[test]
1277 fn test_two_sections_self_cleaning_in_order() {
1278 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1279 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1280 let s1 = stream_write::<(), MmapSectionStorage>(
1281 logger.clone(),
1282 UnifiedLogType::StructuredLogLine,
1283 1024,
1284 );
1285 assert_eq!(
1286 logger
1287 .lock()
1288 .unwrap()
1289 .front_slab
1290 .sections_offsets_in_flight
1291 .len(),
1292 1
1293 );
1294 let s2 = stream_write::<(), MmapSectionStorage>(
1295 logger.clone(),
1296 UnifiedLogType::StructuredLogLine,
1297 1024,
1298 );
1299 assert_eq!(
1300 logger
1301 .lock()
1302 .unwrap()
1303 .front_slab
1304 .sections_offsets_in_flight
1305 .len(),
1306 2
1307 );
1308 drop(s2);
1309 assert_eq!(
1310 logger
1311 .lock()
1312 .unwrap()
1313 .front_slab
1314 .sections_offsets_in_flight
1315 .len(),
1316 1
1317 );
1318 drop(s1);
1319 let lg = logger.lock().unwrap();
1320 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1321 assert_eq!(
1322 lg.front_slab.flushed_until_offset,
1323 lg.front_slab.current_global_position
1324 );
1325 }
1326
1327 #[test]
1328 fn test_two_sections_self_cleaning_out_of_order() {
1329 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1330 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1331 let s1 = stream_write::<(), MmapSectionStorage>(
1332 logger.clone(),
1333 UnifiedLogType::StructuredLogLine,
1334 1024,
1335 );
1336 assert_eq!(
1337 logger
1338 .lock()
1339 .unwrap()
1340 .front_slab
1341 .sections_offsets_in_flight
1342 .len(),
1343 1
1344 );
1345 let s2 = stream_write::<(), MmapSectionStorage>(
1346 logger.clone(),
1347 UnifiedLogType::StructuredLogLine,
1348 1024,
1349 );
1350 assert_eq!(
1351 logger
1352 .lock()
1353 .unwrap()
1354 .front_slab
1355 .sections_offsets_in_flight
1356 .len(),
1357 2
1358 );
1359 drop(s1);
1360 assert_eq!(
1361 logger
1362 .lock()
1363 .unwrap()
1364 .front_slab
1365 .sections_offsets_in_flight
1366 .len(),
1367 1
1368 );
1369 drop(s2);
1370 let lg = logger.lock().unwrap();
1371 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1372 assert_eq!(
1373 lg.front_slab.flushed_until_offset,
1374 lg.front_slab.current_global_position
1375 );
1376 }
1377
1378 #[test]
1379 fn test_closed_section_flushes_behind_open_earlier_section() {
1380 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1381 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1382 let s1 = stream_write::<(), MmapSectionStorage>(
1383 logger.clone(),
1384 UnifiedLogType::StructuredLogLine,
1385 1024,
1386 )
1387 .unwrap();
1388 {
1389 let mut s2 = stream_write::<u32, MmapSectionStorage>(
1390 logger.clone(),
1391 UnifiedLogType::CopperList,
1392 1024,
1393 )
1394 .unwrap();
1395 s2.log(&42u32).unwrap();
1396 }
1397
1398 let logger_guard = logger.lock().unwrap();
1399 assert_eq!(logger_guard.front_slab.sections_offsets_in_flight.len(), 1);
1400 assert!(
1401 logger_guard.front_slab.flushed_until_offset
1402 < logger_guard.front_slab.current_global_position
1403 );
1404 assert_eq!(logger_guard.front_slab.pending_closed_bytes(), 0);
1405 drop(logger_guard);
1406 drop(s1);
1407 }
1408
1409 #[test]
1410 fn test_write_then_read_one_section() {
1411 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1412 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1413 {
1414 let mut stream =
1415 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1416 stream.log(&1u32).unwrap();
1417 stream.log(&2u32).unwrap();
1418 stream.log(&3u32).unwrap();
1419 }
1420 drop(logger);
1421 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1422 .file_base_name(&f)
1423 .build()
1424 .expect("Failed to build logger")
1425 else {
1426 panic!("Failed to build logger");
1427 };
1428 let section = dl
1429 .read_next_section_type(UnifiedLogType::StructuredLogLine)
1430 .expect("Failed to read section");
1431 assert!(section.is_some());
1432 let section = section.unwrap();
1433 let mut reader = SliceReader::new(§ion[..]);
1434 let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1435 let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1436 let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1437 assert_eq!(v1, 1);
1438 assert_eq!(v2, 2);
1439 assert_eq!(v3, 3);
1440 }
1441
1442 #[cfg(feature = "mmap-fsync")]
1443 #[test]
1444 fn test_fsync_feature_syncs_on_section_flush() {
1445 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1446 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1447 {
1448 let mut stream =
1449 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1450 stream.log(&1u32).unwrap();
1451 }
1452
1453 let logger = logger.lock().unwrap();
1454 assert!(
1455 logger.front_slab.sync_call_count > 0,
1456 "expected mmap-fsync to issue at least one sync_all call"
1457 );
1458 }
1459
    /// Minimal stand-in for the copper-list lifecycle states used by the
    /// runtime, just enough to exercise encode/decode round-trips in tests.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }
1468
    /// Minimal copper-list record used to test list-like logging: a lifecycle
    /// state plus an arbitrary bincode-encodable payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        // Current lifecycle state of this entry.
        state: CopperListStateMock,
        // Arbitrary payload carried alongside the state.
        payload: P,
    }
1474
1475 #[test]
1476 fn test_copperlist_list_like_logging() {
1477 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1478 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1479 {
1480 let mut stream =
1481 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1482 let cl0 = CopperList {
1483 state: CopperListStateMock::Free,
1484 payload: (1u32, 2u32, 3u32),
1485 };
1486 let cl1 = CopperList {
1487 state: CopperListStateMock::ProcessingTasks,
1488 payload: (4u32, 5u32, 6u32),
1489 };
1490 stream.log(&cl0).unwrap();
1491 stream.log(&cl1).unwrap();
1492 }
1493 drop(logger);
1494
1495 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1496 .file_base_name(&f)
1497 .build()
1498 .expect("Failed to build logger")
1499 else {
1500 panic!("Failed to build logger");
1501 };
1502 let section = dl
1503 .read_next_section_type(UnifiedLogType::CopperList)
1504 .expect("Failed to read section");
1505 assert!(section.is_some());
1506 let section = section.unwrap();
1507
1508 let mut reader = SliceReader::new(§ion[..]);
1509 let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1510 let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1511 assert_eq!(cl0.payload.1, 2);
1512 assert_eq!(cl1.payload.2, 6);
1513 }
1514
1515 #[test]
1516 fn test_multi_slab_end2end() {
1517 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1518 let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1519 {
1520 let mut stream =
1521 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1522 let cl0 = CopperList {
1523 state: CopperListStateMock::Free,
1524 payload: (1u32, 2u32, 3u32),
1525 };
1526 for _ in 0..10000 {
1528 stream.log(&cl0).unwrap();
1529 }
1530 }
1531 drop(logger);
1532
1533 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1534 .file_base_name(&f)
1535 .build()
1536 .expect("Failed to build logger")
1537 else {
1538 panic!("Failed to build logger");
1539 };
1540 let mut total_readback = 0;
1541 loop {
1542 let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1543 if section.is_err() {
1544 break;
1545 }
1546 let section = section.unwrap();
1547 if section.is_none() {
1548 break;
1549 }
1550 let section = section.unwrap();
1551
1552 let mut reader = SliceReader::new(§ion[..]);
1553 loop {
1554 let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1555 decode_from_reader(&mut reader, standard());
1556 if maybe_cl.is_ok() {
1557 total_readback += 1;
1558 } else {
1559 break;
1560 }
1561 }
1562 }
1563 assert_eq!(total_readback, 10000);
1564 }
1565}