1use crate::{
5 AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6 SectionStorage, UNIFIED_LOG_FORMAT_VERSION, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SliceWriter;
15use bincode::error::EncodeError;
16use bincode::{Encode, decode_from_slice, encode_into_slice};
17use core::slice::from_raw_parts_mut;
18use cu29_traits::{
19 CuError, CuResult, ObservedWriter, UnifiedLogType, abort_observed_encode,
20 begin_observed_encode, finish_observed_encode,
21};
22use memmap2::{Mmap, MmapMut};
23use std::fs::{File, OpenOptions};
24use std::io::Read;
25use std::mem::ManuallyDrop;
26use std::path::{Path, PathBuf};
27use std::{io, mem};
28
29pub struct MmapSectionStorage {
30 buffer: &'static mut [u8],
31 offset: usize,
32 block_size: usize,
33}
34
35impl MmapSectionStorage {
36 pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
37 Self {
38 buffer,
39 offset: 0,
40 block_size,
41 }
42 }
43
44 pub fn buffer_ptr(&self) -> *const u8 {
45 &self.buffer[0] as *const u8
46 }
47}
48
49impl SectionStorage for MmapSectionStorage {
50 fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
51 self.post_update_header(header)?;
52 self.offset = self.block_size;
53 Ok(self.offset)
54 }
55
56 fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
57 encode_into_slice(header, &mut self.buffer[0..], standard())
58 }
59
60 fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
61 begin_observed_encode();
62 let result = (|| {
63 let mut encoder = EncoderImpl::new(
64 ObservedWriter::new(SliceWriter::new(&mut self.buffer[self.offset..])),
65 standard(),
66 );
67 entry.encode(&mut encoder)?;
68 Ok(encoder.into_writer().into_inner().bytes_written())
69 })();
70 let size = match result {
71 Ok(size) => {
72 debug_assert_eq!(size, finish_observed_encode());
73 size
74 }
75 Err(err) => {
76 abort_observed_encode();
77 return Err(err);
78 }
79 };
80 self.offset += size;
81 Ok(size)
82 }
83
84 fn flush(&mut self) -> CuResult<usize> {
85 Ok(self.offset)
87 }
88}
89
90pub enum MmapUnifiedLogger {
93 Read(MmapUnifiedLoggerRead),
94 Write(MmapUnifiedLoggerWrite),
95}
96
97pub struct MmapUnifiedLoggerBuilder {
99 file_base_name: Option<PathBuf>,
100 preallocated_size: Option<usize>,
101 write: bool,
102 create: bool,
103}
104
105impl Default for MmapUnifiedLoggerBuilder {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl MmapUnifiedLoggerBuilder {
112 pub fn new() -> Self {
113 Self {
114 file_base_name: None,
115 preallocated_size: None,
116 write: false,
117 create: false, }
119 }
120
121 pub fn file_base_name(mut self, file_path: &Path) -> Self {
123 self.file_base_name = Some(file_path.to_path_buf());
124 self
125 }
126
127 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
128 self.preallocated_size = Some(preallocated_size);
129 self
130 }
131
132 pub fn write(mut self, write: bool) -> Self {
133 self.write = write;
134 self
135 }
136
137 pub fn create(mut self, create: bool) -> Self {
138 self.create = create;
139 self
140 }
141
142 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
143 let page_size = page_size::get();
144
145 if self.write && self.create {
146 let file_path = self.file_base_name.ok_or_else(|| {
147 io::Error::new(
148 io::ErrorKind::InvalidInput,
149 "File path is required for write mode",
150 )
151 })?;
152 let preallocated_size = self.preallocated_size.ok_or_else(|| {
153 io::Error::new(
154 io::ErrorKind::InvalidInput,
155 "Preallocated size is required for write mode",
156 )
157 })?;
158 let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
159 Ok(MmapUnifiedLogger::Write(ulw))
160 } else {
161 let file_path = self.file_base_name.ok_or_else(|| {
162 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
163 })?;
164 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
165 Ok(MmapUnifiedLogger::Read(ulr))
166 }
167 }
168}
169
170struct SlabEntry {
171 file: File,
172 mmap_buffer: ManuallyDrop<MmapMut>,
173 current_global_position: usize,
174 sections_offsets_in_flight: Vec<usize>,
175 flushed_until_offset: usize,
176 page_size: usize,
177 temporary_end_marker: Option<usize>,
178 #[cfg(test)]
179 closed_sections: Vec<(usize, usize)>,
180 #[cfg(test)]
181 flushed_ranges: Vec<(usize, usize)>,
182 #[cfg(all(test, feature = "mmap-fsync"))]
183 sync_call_count: usize,
184}
185
186impl Drop for SlabEntry {
187 fn drop(&mut self) {
188 self.flush_until(self.current_global_position);
189 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
191 if let Err(error) = self.file.set_len(self.current_global_position as u64) {
192 eprintln!("Failed to trim datalogger file: {}", error);
193 }
194 self.sync_file();
195
196 if !self.sections_offsets_in_flight.is_empty() {
197 eprintln!("Error: Slab not full flushed.");
198 }
199 }
200}
201
202impl SlabEntry {
203 fn new(file: File, page_size: usize) -> io::Result<Self> {
204 let mmap_buffer = ManuallyDrop::new(
205 unsafe { MmapMut::map_mut(&file) }
207 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
208 );
209 Ok(Self {
210 file,
211 mmap_buffer,
212 current_global_position: 0,
213 sections_offsets_in_flight: Vec::with_capacity(16),
214 flushed_until_offset: 0,
215 page_size,
216 temporary_end_marker: None,
217 #[cfg(test)]
218 closed_sections: Vec::new(),
219 #[cfg(test)]
220 flushed_ranges: Vec::new(),
221 #[cfg(all(test, feature = "mmap-fsync"))]
222 sync_call_count: 0,
223 })
224 }
225
226 fn flush_range(&mut self, start: usize, len: usize) {
227 if len == 0 {
228 return;
229 }
230 self.mmap_buffer
231 .flush_async_range(start, len)
232 .expect("Failed to flush memory map");
233 self.sync_file();
234 #[cfg(test)]
235 self.record_flushed_range(start, len);
236 }
237
238 fn sync_file(&mut self) {
239 #[cfg(feature = "mmap-fsync")]
240 {
241 self.file.sync_all().expect("Failed to fsync log file");
242 #[cfg(test)]
243 {
244 self.sync_call_count += 1;
245 }
246 }
247 }
248 fn flush_until(&mut self, until_position: usize) {
250 if (self.flushed_until_offset == until_position) || (until_position == 0) {
252 return;
253 }
254 self.flush_range(
255 self.flushed_until_offset,
256 until_position - self.flushed_until_offset,
257 );
258 self.flushed_until_offset = until_position;
259 }
260
261 fn clear_temporary_end_marker(&mut self) {
262 if let Some(marker_start) = self.temporary_end_marker.take() {
263 self.current_global_position = marker_start;
264 if self.flushed_until_offset > marker_start {
265 self.flushed_until_offset = marker_start;
266 }
267 }
268 }
269
270 fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
271 let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
272 let marker_start = self.align_to_next_page(self.current_global_position);
273 let total_marker_size = block_size; let marker_end = marker_start + total_marker_size;
275 if marker_end > self.mmap_buffer.len() {
276 return Err("Not enough space to write end-of-log marker".into());
277 }
278
279 let header = SectionHeader {
280 magic: SECTION_MAGIC,
281 block_size: SECTION_HEADER_COMPACT_SIZE,
282 entry_type: UnifiedLogType::LastEntry,
283 offset_to_next_section: total_marker_size as u32,
284 used: 0,
285 is_open: temporary,
286 };
287
288 encode_into_slice(
289 &header,
290 &mut self.mmap_buffer
291 [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
292 standard(),
293 )
294 .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
295
296 self.temporary_end_marker = Some(marker_start);
297 self.current_global_position = marker_end;
298 Ok(())
299 }
300
301 fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
302 let storage = section.get_storage();
303 let ptr = storage.buffer_ptr();
304 (ptr >= self.mmap_buffer.as_ptr())
305 && (ptr as usize)
306 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
307 }
308
309 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
312 section
313 .post_update_header()
314 .expect("Failed to update section header");
315
316 let storage = section.get_storage();
317 let ptr = storage.buffer_ptr();
318
319 if ptr < self.mmap_buffer.as_ptr()
320 || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
321 {
322 panic!("Invalid section buffer, not in the slab");
323 }
324
325 let base = self.mmap_buffer.as_ptr() as usize;
326 let section_start = ptr as usize - base;
327 let section_len = section.header.offset_to_next_section as usize;
328 #[cfg(test)]
329 self.record_closed_section(section_start, section_len);
330 self.sections_offsets_in_flight
331 .retain(|&x| x != section_start);
332
333 if self.sections_offsets_in_flight.is_empty() {
334 self.flush_until(self.current_global_position);
335 return;
336 }
337 let next_open_offset = self.sections_offsets_in_flight[0];
338 if self.flushed_until_offset < next_open_offset {
339 self.flush_until(next_open_offset);
340 }
341 if section_start + section_len > self.flushed_until_offset {
342 self.flush_range(section_start, section_len);
345 }
346 }
347
348 #[cfg(test)]
349 fn record_closed_section(&mut self, start: usize, len: usize) {
350 self.closed_sections.push((start, len));
351 }
352
353 #[cfg(test)]
354 fn record_flushed_range(&mut self, start: usize, len: usize) {
355 let mut merged_start = start;
356 let mut merged_end = start + len;
357 let mut merged_ranges = Vec::with_capacity(self.flushed_ranges.len() + 1);
358 let mut inserted = false;
359
360 for (range_start, range_len) in self.flushed_ranges.drain(..) {
361 let range_end = range_start + range_len;
362 if range_end < merged_start {
363 merged_ranges.push((range_start, range_len));
364 continue;
365 }
366 if merged_end < range_start {
367 if !inserted {
368 merged_ranges.push((merged_start, merged_end - merged_start));
369 inserted = true;
370 }
371 merged_ranges.push((range_start, range_len));
372 continue;
373 }
374
375 merged_start = merged_start.min(range_start);
376 merged_end = merged_end.max(range_end);
377 }
378
379 if !inserted {
380 merged_ranges.push((merged_start, merged_end - merged_start));
381 }
382
383 self.flushed_ranges = merged_ranges;
384 }
385
386 #[cfg(test)]
387 fn pending_closed_bytes(&self) -> usize {
388 let mut pending = 0;
389
390 for (section_start, section_len) in &self.closed_sections {
391 let section_end = section_start + section_len;
392 let mut cursor = *section_start;
393
394 for (range_start, range_len) in &self.flushed_ranges {
395 let range_end = range_start + range_len;
396 if range_end <= cursor {
397 continue;
398 }
399 if *range_start >= section_end {
400 break;
401 }
402 if *range_start > cursor {
403 pending += *range_start - cursor;
404 }
405 cursor = cursor.max(range_end);
406 if cursor >= section_end {
407 break;
408 }
409 }
410
411 if cursor < section_end {
412 pending += section_end - cursor;
413 }
414 }
415
416 pending
417 }
418
419 #[inline]
420 fn align_to_next_page(&self, ptr: usize) -> usize {
421 (ptr + self.page_size - 1) & !(self.page_size - 1)
422 }
423
424 fn add_section(
426 &mut self,
427 entry_type: UnifiedLogType,
428 requested_section_size: usize,
429 ) -> AllocatedSection<MmapSectionStorage> {
430 self.current_global_position = self.align_to_next_page(self.current_global_position);
432 let section_size = self.align_to_next_page(requested_section_size) as u32;
433
434 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
436 return AllocatedSection::NoMoreSpace;
437 }
438
439 #[cfg(feature = "compact")]
440 let block_size = SECTION_HEADER_COMPACT_SIZE;
441
442 #[cfg(not(feature = "compact"))]
443 let block_size = self.page_size as u16;
444
445 let section_header = SectionHeader {
446 magic: SECTION_MAGIC,
447 block_size,
448 entry_type,
449 offset_to_next_section: section_size,
450 used: 0u32,
451 is_open: true,
452 };
453
454 self.sections_offsets_in_flight
456 .push(self.current_global_position);
457 let end_of_section = self.current_global_position + requested_section_size;
458 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
459
460 let handle_buffer =
462 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
463 let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
464
465 self.current_global_position = end_of_section;
466
467 Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
468 }
469
470 #[cfg(test)]
471 fn used(&self) -> usize {
472 self.current_global_position
473 }
474}
475
476pub struct MmapUnifiedLoggerWrite {
478 front_slab: SlabEntry,
480 back_slabs: Vec<SlabEntry>,
482 base_file_path: PathBuf,
484 slab_size: usize,
486 front_slab_suffix: usize,
488}
489
490fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
491 let mut file_path = base_file_path.to_path_buf();
492 let stem = file_path.file_stem().ok_or_else(|| {
493 io::Error::new(
494 io::ErrorKind::InvalidInput,
495 "Base file path has no file name",
496 )
497 })?;
498 let stem = stem.to_str().ok_or_else(|| {
499 io::Error::new(
500 io::ErrorKind::InvalidInput,
501 "Base file name is not valid UTF-8",
502 )
503 })?;
504 let extension = file_path.extension().ok_or_else(|| {
505 io::Error::new(
506 io::ErrorKind::InvalidInput,
507 "Base file path has no extension",
508 )
509 })?;
510 let extension = extension.to_str().ok_or_else(|| {
511 io::Error::new(
512 io::ErrorKind::InvalidInput,
513 "Base file extension is not valid UTF-8",
514 )
515 })?;
516 if stem.is_empty() {
517 return Err(io::Error::new(
518 io::ErrorKind::InvalidInput,
519 "Base file name is empty",
520 ));
521 }
522 let file_name = format!("{stem}_{slab_index}.{extension}");
523 file_path.set_file_name(file_name);
524 Ok(file_path)
525}
526
527fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
528 let file_path = build_slab_path(base_file_path, slab_suffix)?;
529 let file = OpenOptions::new()
530 .read(true)
531 .write(true)
532 .create(true)
533 .truncate(true)
534 .open(&file_path)
535 .map_err(|e| {
536 io::Error::new(
537 e.kind(),
538 format!("Failed to open file {}: {e}", file_path.display()),
539 )
540 })?;
541 file.set_len(slab_size as u64).map_err(|e| {
542 io::Error::new(
543 e.kind(),
544 format!("Failed to set file length for {}: {e}", file_path.display()),
545 )
546 })?;
547 Ok(file)
548}
549
550fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
551 match std::fs::symlink_metadata(base_file_path) {
552 Ok(meta) => {
553 if meta.is_dir() {
554 return Err(io::Error::new(
555 io::ErrorKind::AlreadyExists,
556 format!(
557 "Cannot create base log alias at {} because a directory already exists there",
558 base_file_path.display()
559 ),
560 ));
561 }
562 std::fs::remove_file(base_file_path).map_err(|e| {
563 io::Error::new(
564 e.kind(),
565 format!(
566 "Failed to remove existing base log alias {}: {e}",
567 base_file_path.display()
568 ),
569 )
570 })
571 }
572 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
573 Err(e) => Err(io::Error::new(
574 e.kind(),
575 format!(
576 "Failed to inspect existing base log alias {}: {e}",
577 base_file_path.display()
578 ),
579 )),
580 }
581}
582
583fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
584 let first_slab_path = build_slab_path(base_file_path, 0)?;
585 remove_existing_alias(base_file_path)?;
586
587 #[cfg(unix)]
588 {
589 use std::os::unix::fs::symlink;
590 let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
591 io::Error::new(
592 io::ErrorKind::InvalidInput,
593 "First slab file has no name component",
594 )
595 })?);
596 symlink(relative_target, base_file_path).map_err(|e| {
597 io::Error::new(
598 e.kind(),
599 format!(
600 "Failed to create base log alias {} -> {}: {e}",
601 base_file_path.display(),
602 first_slab_path.display()
603 ),
604 )
605 })
606 }
607
608 #[cfg(windows)]
609 {
610 use std::os::windows::fs::symlink_file;
611 let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
612 io::Error::new(
613 io::ErrorKind::InvalidInput,
614 "First slab file has no name component",
615 )
616 })?);
617 match symlink_file(relative_target, base_file_path) {
618 Ok(()) => Ok(()),
619 Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
620 |hard_link_err| {
621 io::Error::other(format!(
622 "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
623 base_file_path.display()
624 ))
625 },
626 ),
627 }?;
628 Ok(())
629 }
630
631 #[cfg(not(any(unix, windows)))]
632 {
633 std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
634 io::Error::new(
635 e.kind(),
636 format!(
637 "Failed to create base log alias {} -> {}: {e}",
638 base_file_path.display(),
639 first_slab_path.display()
640 ),
641 )
642 })
643 }
644}
645
646impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
647 fn add_section(
649 &mut self,
650 entry_type: UnifiedLogType,
651 requested_section_size: usize,
652 ) -> CuResult<SectionHandle<MmapSectionStorage>> {
653 self.garbage_collect_backslabs(); self.front_slab.clear_temporary_end_marker();
655 let maybe_section = self
656 .front_slab
657 .add_section(entry_type, requested_section_size);
658
659 match maybe_section {
660 AllocatedSection::NoMoreSpace => {
661 let new_slab = self.create_slab()?;
663 self.back_slabs
665 .push(mem::replace(&mut self.front_slab, new_slab));
666 match self
667 .front_slab
668 .add_section(entry_type, requested_section_size)
669 {
670 AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
671 Section(section) => {
672 self.place_end_marker(true)?;
673 Ok(section)
674 }
675 }
676 }
677 Section(section) => {
678 self.place_end_marker(true)?;
679 Ok(section)
680 }
681 }
682 }
683
684 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
685 section.mark_closed();
686 for slab in self.back_slabs.iter_mut() {
687 if slab.is_it_my_section(section) {
688 slab.flush_section(section);
689 return;
690 }
691 }
692 self.front_slab.flush_section(section);
693 }
694
695 fn status(&self) -> UnifiedLogStatus {
696 UnifiedLogStatus {
697 total_used_space: self.front_slab.current_global_position,
698 total_allocated_space: self.slab_size * self.front_slab_suffix,
699 }
700 }
701}
702
703impl MmapUnifiedLoggerWrite {
704 fn next_slab(&mut self) -> io::Result<File> {
705 let next_suffix = self.front_slab_suffix + 1;
706 let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
707 self.front_slab_suffix = next_suffix;
708 Ok(file)
709 }
710
711 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
712 let file = make_slab_file(base_file_path, slab_size, 0)?;
713 create_base_alias_link(base_file_path)?;
714 let mut front_slab = SlabEntry::new(file, page_size)?;
715
716 let main_header = MainHeader {
718 magic: MAIN_MAGIC,
719 format_version: UNIFIED_LOG_FORMAT_VERSION,
720 first_section_offset: page_size as u16,
721 page_size: page_size as u16,
722 };
723 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
724 .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
725 assert!(nb_bytes < page_size);
726 front_slab.current_global_position = page_size; Ok(Self {
729 front_slab,
730 back_slabs: Vec::new(),
731 base_file_path: base_file_path.to_path_buf(),
732 slab_size,
733 front_slab_suffix: 0,
734 })
735 }
736
737 fn garbage_collect_backslabs(&mut self) {
738 self.back_slabs
739 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
740 }
741
742 fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
743 match self.front_slab.write_end_marker(temporary) {
744 Ok(_) => Ok(()),
745 Err(_) => {
746 let new_slab = self.create_slab()?;
748 self.back_slabs
749 .push(mem::replace(&mut self.front_slab, new_slab));
750 self.front_slab.write_end_marker(temporary)
751 }
752 }
753 }
754
755 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
756 (
757 self.front_slab.current_global_position,
758 self.front_slab.sections_offsets_in_flight.clone(),
759 self.back_slabs.len(),
760 )
761 }
762
763 fn create_slab(&mut self) -> CuResult<SlabEntry> {
764 let file = self
765 .next_slab()
766 .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
767 SlabEntry::new(file, self.front_slab.page_size)
768 .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
769 }
770}
771
772impl Drop for MmapUnifiedLoggerWrite {
773 fn drop(&mut self) {
774 #[cfg(debug_assertions)]
775 eprintln!("Flushing the unified Logger ... "); self.front_slab.clear_temporary_end_marker();
778 if let Err(e) = self.place_end_marker(false) {
779 panic!("Failed to flush the unified logger: {}", e);
780 }
781 self.front_slab
782 .flush_until(self.front_slab.current_global_position);
783 self.garbage_collect_backslabs();
784 #[cfg(debug_assertions)]
785 eprintln!("Unified Logger flushed."); }
787}
788
789fn open_slab_index(
790 base_file_path: &Path,
791 slab_index: usize,
792) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
793 let mut options = OpenOptions::new();
794 let options = options.read(true);
795
796 let file_path = build_slab_path(base_file_path, slab_index)?;
797 let file = options.open(&file_path).map_err(|e| {
798 io::Error::new(
799 e.kind(),
800 format!("Failed to open slab file {}: {e}", file_path.display()),
801 )
802 })?;
803 let mmap = unsafe { Mmap::map(&file) }
805 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
806 let mut prolog = 0u16;
807 let mut maybe_main_header: Option<MainHeader> = None;
808 if slab_index == 0 {
809 let main_header: MainHeader;
810 let _read: usize;
811 (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
812 io::Error::new(
813 io::ErrorKind::InvalidData,
814 format!("Failed to decode main header: {e}"),
815 )
816 })?;
817 if main_header.magic != MAIN_MAGIC {
818 return Err(io::Error::new(
819 io::ErrorKind::InvalidData,
820 "Invalid magic number in main header",
821 ));
822 }
823 if main_header.format_version != UNIFIED_LOG_FORMAT_VERSION {
824 return Err(io::Error::new(
825 io::ErrorKind::InvalidData,
826 format!(
827 "Unsupported unified log format version {} in main header; this reader supports version {}",
828 main_header.format_version, UNIFIED_LOG_FORMAT_VERSION
829 ),
830 ));
831 }
832 prolog = main_header.first_section_offset;
833 maybe_main_header = Some(main_header);
834 }
835 Ok((file, mmap, prolog, maybe_main_header))
836}
837
838pub struct MmapUnifiedLoggerRead {
840 base_file_path: PathBuf,
841 main_header: MainHeader,
842 current_mmap_buffer: Mmap,
843 current_file: File,
844 current_slab_index: usize,
845 current_reading_position: usize,
846}
847
848#[derive(Clone, Copy, Debug, PartialEq, Eq)]
850pub struct LogPosition {
851 pub slab_index: usize,
852 pub offset: usize,
853}
854
855impl UnifiedLogRead for MmapUnifiedLoggerRead {
856 fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
857 loop {
859 if self.current_reading_position >= self.current_mmap_buffer.len() {
860 self.next_slab().map_err(|e| {
861 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
862 })?;
863 }
864
865 let header_result = self.read_section_header();
866 let header = header_result.map_err(|error| {
867 CuError::new_with_cause(
868 &format!(
869 "Could not read a sections header: {}/{}:{}",
870 self.base_file_path.as_os_str().to_string_lossy(),
871 self.current_slab_index,
872 self.current_reading_position,
873 ),
874 error,
875 )
876 })?;
877
878 if header.entry_type == UnifiedLogType::LastEntry {
880 return Ok(None);
881 }
882
883 if header.entry_type == datalogtype {
885 let result = Some(self.read_section_content(&header)?);
886 self.current_reading_position += header.offset_to_next_section as usize;
887 return Ok(result);
888 }
889
890 self.current_reading_position += header.offset_to_next_section as usize;
892 }
893 }
894
895 fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
897 if self.current_reading_position >= self.current_mmap_buffer.len() {
898 self.next_slab().map_err(|e| {
899 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
900 })?;
901 }
902
903 let read_result = self.read_section_header();
904
905 match read_result {
906 Err(error) => Err(CuError::new_with_cause(
907 &format!(
908 "Could not read a sections header: {}/{}:{}",
909 self.base_file_path.as_os_str().to_string_lossy(),
910 self.current_slab_index,
911 self.current_reading_position,
912 ),
913 error,
914 )),
915 Ok(header) => {
916 let data = self.read_section_content(&header)?;
917 self.current_reading_position += header.offset_to_next_section as usize;
918 Ok((header, data))
919 }
920 }
921 }
922}
923
924impl MmapUnifiedLoggerRead {
925 pub fn new(base_file_path: &Path) -> io::Result<Self> {
926 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
927 let main_header = header.ok_or_else(|| {
928 io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
929 })?;
930
931 Ok(Self {
932 base_file_path: base_file_path.to_path_buf(),
933 main_header,
934 current_file: file,
935 current_mmap_buffer: mmap,
936 current_slab_index: 0,
937 current_reading_position: prolog as usize,
938 })
939 }
940
941 pub fn position(&self) -> LogPosition {
943 LogPosition {
944 slab_index: self.current_slab_index,
945 offset: self.current_reading_position,
946 }
947 }
948
949 pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
951 if pos.slab_index != self.current_slab_index {
952 let (file, mmap, _prolog, _header) =
953 open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
954 CuError::new_with_cause(
955 &format!("Failed to open slab {} for seek", pos.slab_index),
956 e,
957 )
958 })?;
959 self.current_file = file;
960 self.current_mmap_buffer = mmap;
961 self.current_slab_index = pos.slab_index;
962 }
963 self.current_reading_position = pos.offset;
964 Ok(())
965 }
966
967 fn next_slab(&mut self) -> io::Result<()> {
968 self.current_slab_index += 1;
969 let (file, mmap, prolog, _) =
970 open_slab_index(&self.base_file_path, self.current_slab_index)?;
971 self.current_file = file;
972 self.current_mmap_buffer = mmap;
973 self.current_reading_position = prolog as usize;
974 Ok(())
975 }
976
977 pub fn raw_main_header(&self) -> &MainHeader {
978 &self.main_header
979 }
980
981 pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
982 let mut total = 0u64;
983
984 loop {
985 if self.current_reading_position >= self.current_mmap_buffer.len() {
986 self.next_slab().map_err(|e| {
987 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
988 })?;
989 }
990
991 let header = self.read_section_header()?;
992
993 if header.entry_type == UnifiedLogType::LastEntry {
994 return Ok(total);
995 }
996
997 if header.entry_type == datalogtype {
998 total = total.saturating_add(header.used as u64);
999 }
1000
1001 self.current_reading_position += header.offset_to_next_section as usize;
1002 }
1003 }
1004
1005 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
1007 let mut section_data = vec![0; header.used as usize];
1009 let start_of_data = self.current_reading_position + header.block_size as usize;
1010 section_data.copy_from_slice(
1011 &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
1012 );
1013
1014 Ok(section_data)
1015 }
1016
1017 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
1018 let section_header: SectionHeader;
1019 (section_header, _) = decode_from_slice(
1020 &self.current_mmap_buffer[self.current_reading_position..],
1021 standard(),
1022 )
1023 .map_err(|e| {
1024 CuError::new_with_cause(
1025 &format!(
1026 "Could not read a sections header: {}/{}:{}",
1027 self.base_file_path.as_os_str().to_string_lossy(),
1028 self.current_slab_index,
1029 self.current_reading_position,
1030 ),
1031 e,
1032 )
1033 })?;
1034 if section_header.magic != SECTION_MAGIC {
1035 return Err("Invalid magic number in section header".into());
1036 }
1037
1038 Ok(section_header)
1039 }
1040}
1041
1042pub struct UnifiedLoggerIOReader {
1044 logger: MmapUnifiedLoggerRead,
1045 log_type: UnifiedLogType,
1046 buffer: Vec<u8>,
1047 buffer_pos: usize,
1048}
1049
1050impl UnifiedLoggerIOReader {
1051 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
1052 Self {
1053 logger,
1054 log_type,
1055 buffer: Vec::new(),
1056 buffer_pos: 0,
1057 }
1058 }
1059
1060 fn fill_buffer(&mut self) -> io::Result<bool> {
1062 match self.logger.read_next_section_type(self.log_type) {
1063 Ok(Some(section)) => {
1064 self.buffer = section;
1065 self.buffer_pos = 0;
1066 Ok(true)
1067 }
1068 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
1070 }
1071 }
1072}
1073
1074impl Read for UnifiedLoggerIOReader {
1075 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1076 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
1077 return Ok(0);
1079 }
1080
1081 if self.buffer_pos >= self.buffer.len() {
1083 return Ok(0);
1084 }
1085
1086 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
1088 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
1089 self.buffer_pos += len;
1090 Ok(len)
1091 }
1092}
1093
1094#[cfg(feature = "std")]
1095#[cfg(test)]
1096mod tests {
1097 use super::*;
1098 use crate::stream_write;
1099 use bincode::de::read::SliceReader;
1100 use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
1101 use cu29_traits::WriteStream;
1102 use std::io::{Seek, SeekFrom, Write};
1103 use std::path::PathBuf;
1104 use std::sync::{Arc, Mutex};
1105 use tempfile::TempDir;
1106
1107 const LARGE_SLAB: usize = 100 * 1024; const SMALL_SLAB: usize = 16 * 2 * 1024; fn make_a_logger(
1111 tmp_dir: &TempDir,
1112 slab_size: usize,
1113 ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
1114 let file_path = tmp_dir.path().join("test.bin");
1115 let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
1116 .write(true)
1117 .create(true)
1118 .file_base_name(&file_path)
1119 .preallocated_size(slab_size)
1120 .build()
1121 .expect("Failed to create logger")
1122 else {
1123 panic!("Failed to create logger")
1124 };
1125
1126 (Arc::new(Mutex::new(data_logger)), file_path)
1127 }
1128
1129 #[test]
1130 fn test_truncation_and_sections_creations() {
1131 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1132 let file_path = tmp_dir.path().join("test.bin");
1133 let _used = {
1134 let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
1135 .write(true)
1136 .create(true)
1137 .file_base_name(&file_path)
1138 .preallocated_size(100000)
1139 .build()
1140 .expect("Failed to create logger")
1141 else {
1142 panic!("Failed to create logger")
1143 };
1144 logger
1145 .add_section(UnifiedLogType::StructuredLogLine, 1024)
1146 .unwrap();
1147 logger
1148 .add_section(UnifiedLogType::CopperList, 2048)
1149 .unwrap();
1150 let used = logger.front_slab.used();
1151 assert!(used < 4 * page_size::get()); used
1155 };
1156
1157 let _file = OpenOptions::new()
1158 .read(true)
1159 .open(tmp_dir.path().join("test_0.bin"))
1160 .expect("Could not reopen the file");
1161 }
1168
1169 #[test]
1170 fn test_unsupported_main_header_format_version_is_rejected() {
1171 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1172 let file_path = tmp_dir.path().join("test.bin");
1173 {
1174 let MmapUnifiedLogger::Write(_logger) = MmapUnifiedLoggerBuilder::new()
1175 .write(true)
1176 .create(true)
1177 .file_base_name(&file_path)
1178 .preallocated_size(100000)
1179 .build()
1180 .expect("Failed to create logger")
1181 else {
1182 panic!("Failed to create logger")
1183 };
1184 }
1185
1186 let mut file = OpenOptions::new()
1187 .read(true)
1188 .write(true)
1189 .open(tmp_dir.path().join("test_0.bin"))
1190 .expect("Could not reopen the slab");
1191 let unsupported_version = UNIFIED_LOG_FORMAT_VERSION + 1;
1192 file.seek(SeekFrom::Start(MAIN_MAGIC.len() as u64))
1193 .expect("Could not seek to format version");
1194 file.write_all(&[unsupported_version])
1195 .expect("Could not write unsupported format version");
1196 drop(file);
1197
1198 let err = match MmapUnifiedLoggerBuilder::new()
1199 .file_base_name(&file_path)
1200 .build()
1201 {
1202 Ok(_) => panic!("Reader accepted unsupported unified log format version"),
1203 Err(err) => err,
1204 };
1205
1206 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1207 assert_eq!(
1208 err.to_string(),
1209 format!(
1210 "Unsupported unified log format version {unsupported_version} in main header; this reader supports version {UNIFIED_LOG_FORMAT_VERSION}"
1211 )
1212 );
1213 }
1214
1215 #[test]
1216 fn test_base_alias_exists_and_matches_first_slab() {
1217 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1218 let file_path = tmp_dir.path().join("test.bin");
1219 let _logger = MmapUnifiedLoggerBuilder::new()
1220 .write(true)
1221 .create(true)
1222 .file_base_name(&file_path)
1223 .preallocated_size(LARGE_SLAB)
1224 .build()
1225 .expect("Failed to create logger");
1226
1227 let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
1228 assert!(file_path.exists(), "base alias does not exist");
1229 assert!(first_slab.exists(), "first slab does not exist");
1230
1231 let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
1232 let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
1233 assert_eq!(alias_bytes, slab_bytes);
1234 }
1235
1236 #[test]
1237 fn test_one_section_self_cleaning() {
1238 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1239 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1240 {
1241 let _stream = stream_write::<(), MmapSectionStorage>(
1242 logger.clone(),
1243 UnifiedLogType::StructuredLogLine,
1244 1024,
1245 );
1246 assert_eq!(
1247 logger
1248 .lock()
1249 .unwrap()
1250 .front_slab
1251 .sections_offsets_in_flight
1252 .len(),
1253 1
1254 );
1255 }
1256 assert_eq!(
1257 logger
1258 .lock()
1259 .unwrap()
1260 .front_slab
1261 .sections_offsets_in_flight
1262 .len(),
1263 0
1264 );
1265 let logger = logger.lock().unwrap();
1266 assert_eq!(
1267 logger.front_slab.flushed_until_offset,
1268 logger.front_slab.current_global_position
1269 );
1270 }
1271
1272 #[test]
1273 fn test_temporary_end_marker_is_created() {
1274 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1275 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1276 {
1277 let mut stream = stream_write::<u32, MmapSectionStorage>(
1278 logger.clone(),
1279 UnifiedLogType::StructuredLogLine,
1280 1024,
1281 )
1282 .unwrap();
1283 stream.log(&42u32).unwrap();
1284 }
1285
1286 let logger_guard = logger.lock().unwrap();
1287 let slab = &logger_guard.front_slab;
1288 let marker_start = slab
1289 .temporary_end_marker
1290 .expect("temporary end-of-log marker missing");
1291 let (eof_header, _) =
1292 decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
1293 .expect("Could not decode end-of-log marker header");
1294 assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
1295 assert!(eof_header.is_open);
1296 assert_eq!(eof_header.used, 0);
1297 }
1298
1299 #[test]
1300 fn test_final_end_marker_is_not_temporary() {
1301 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1302 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1303 {
1304 let mut stream = stream_write::<u32, MmapSectionStorage>(
1305 logger.clone(),
1306 UnifiedLogType::CopperList,
1307 1024,
1308 )
1309 .unwrap();
1310 stream.log(&1u32).unwrap();
1311 }
1312 drop(logger);
1313
1314 let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1315 .file_base_name(&f)
1316 .build()
1317 .expect("Failed to build reader")
1318 else {
1319 panic!("Failed to create reader");
1320 };
1321
1322 loop {
1323 let (header, _data) = reader
1324 .raw_read_section()
1325 .expect("Failed to read section while searching for EOF");
1326 if header.entry_type == UnifiedLogType::LastEntry {
1327 assert!(!header.is_open);
1328 break;
1329 }
1330 }
1331 }
1332
1333 #[test]
1334 fn test_two_sections_self_cleaning_in_order() {
1335 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1336 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1337 let s1 = stream_write::<(), MmapSectionStorage>(
1338 logger.clone(),
1339 UnifiedLogType::StructuredLogLine,
1340 1024,
1341 );
1342 assert_eq!(
1343 logger
1344 .lock()
1345 .unwrap()
1346 .front_slab
1347 .sections_offsets_in_flight
1348 .len(),
1349 1
1350 );
1351 let s2 = stream_write::<(), MmapSectionStorage>(
1352 logger.clone(),
1353 UnifiedLogType::StructuredLogLine,
1354 1024,
1355 );
1356 assert_eq!(
1357 logger
1358 .lock()
1359 .unwrap()
1360 .front_slab
1361 .sections_offsets_in_flight
1362 .len(),
1363 2
1364 );
1365 drop(s2);
1366 assert_eq!(
1367 logger
1368 .lock()
1369 .unwrap()
1370 .front_slab
1371 .sections_offsets_in_flight
1372 .len(),
1373 1
1374 );
1375 drop(s1);
1376 let lg = logger.lock().unwrap();
1377 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1378 assert_eq!(
1379 lg.front_slab.flushed_until_offset,
1380 lg.front_slab.current_global_position
1381 );
1382 }
1383
1384 #[test]
1385 fn test_two_sections_self_cleaning_out_of_order() {
1386 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1387 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1388 let s1 = stream_write::<(), MmapSectionStorage>(
1389 logger.clone(),
1390 UnifiedLogType::StructuredLogLine,
1391 1024,
1392 );
1393 assert_eq!(
1394 logger
1395 .lock()
1396 .unwrap()
1397 .front_slab
1398 .sections_offsets_in_flight
1399 .len(),
1400 1
1401 );
1402 let s2 = stream_write::<(), MmapSectionStorage>(
1403 logger.clone(),
1404 UnifiedLogType::StructuredLogLine,
1405 1024,
1406 );
1407 assert_eq!(
1408 logger
1409 .lock()
1410 .unwrap()
1411 .front_slab
1412 .sections_offsets_in_flight
1413 .len(),
1414 2
1415 );
1416 drop(s1);
1417 assert_eq!(
1418 logger
1419 .lock()
1420 .unwrap()
1421 .front_slab
1422 .sections_offsets_in_flight
1423 .len(),
1424 1
1425 );
1426 drop(s2);
1427 let lg = logger.lock().unwrap();
1428 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1429 assert_eq!(
1430 lg.front_slab.flushed_until_offset,
1431 lg.front_slab.current_global_position
1432 );
1433 }
1434
1435 #[test]
1436 fn test_closed_section_flushes_behind_open_earlier_section() {
1437 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1438 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1439 let s1 = stream_write::<(), MmapSectionStorage>(
1440 logger.clone(),
1441 UnifiedLogType::StructuredLogLine,
1442 1024,
1443 )
1444 .unwrap();
1445 {
1446 let mut s2 = stream_write::<u32, MmapSectionStorage>(
1447 logger.clone(),
1448 UnifiedLogType::CopperList,
1449 1024,
1450 )
1451 .unwrap();
1452 s2.log(&42u32).unwrap();
1453 }
1454
1455 let logger_guard = logger.lock().unwrap();
1456 assert_eq!(logger_guard.front_slab.sections_offsets_in_flight.len(), 1);
1457 assert!(
1458 logger_guard.front_slab.flushed_until_offset
1459 < logger_guard.front_slab.current_global_position
1460 );
1461 assert_eq!(logger_guard.front_slab.pending_closed_bytes(), 0);
1462 drop(logger_guard);
1463 drop(s1);
1464 }
1465
1466 #[test]
1467 fn test_write_then_read_one_section() {
1468 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1469 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1470 {
1471 let mut stream =
1472 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1473 stream.log(&1u32).unwrap();
1474 stream.log(&2u32).unwrap();
1475 stream.log(&3u32).unwrap();
1476 }
1477 drop(logger);
1478 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1479 .file_base_name(&f)
1480 .build()
1481 .expect("Failed to build logger")
1482 else {
1483 panic!("Failed to build logger");
1484 };
1485 let section = dl
1486 .read_next_section_type(UnifiedLogType::StructuredLogLine)
1487 .expect("Failed to read section");
1488 assert!(section.is_some());
1489 let section = section.unwrap();
1490 let mut reader = SliceReader::new(§ion[..]);
1491 let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1492 let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1493 let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1494 assert_eq!(v1, 1);
1495 assert_eq!(v2, 2);
1496 assert_eq!(v3, 3);
1497 }
1498
1499 #[cfg(feature = "mmap-fsync")]
1500 #[test]
1501 fn test_fsync_feature_syncs_on_section_flush() {
1502 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1503 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1504 {
1505 let mut stream =
1506 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1507 stream.log(&1u32).unwrap();
1508 }
1509
1510 let logger = logger.lock().unwrap();
1511 assert!(
1512 logger.front_slab.sync_call_count > 0,
1513 "expected mmap-fsync to issue at least one sync_all call"
1514 );
1515 }
1516
1517 #[derive(Debug, Encode, Decode)]
1520 enum CopperListStateMock {
1521 Free,
1522 ProcessingTasks,
1523 BeingSerialized,
1524 }
1525
1526 #[derive(Encode, Decode)]
1527 struct CopperList<P: bincode::enc::Encode> {
1528 state: CopperListStateMock,
1529 payload: P, }
1531
1532 #[test]
1533 fn test_copperlist_list_like_logging() {
1534 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1535 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1536 {
1537 let mut stream =
1538 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1539 let cl0 = CopperList {
1540 state: CopperListStateMock::Free,
1541 payload: (1u32, 2u32, 3u32),
1542 };
1543 let cl1 = CopperList {
1544 state: CopperListStateMock::ProcessingTasks,
1545 payload: (4u32, 5u32, 6u32),
1546 };
1547 stream.log(&cl0).unwrap();
1548 stream.log(&cl1).unwrap();
1549 }
1550 drop(logger);
1551
1552 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1553 .file_base_name(&f)
1554 .build()
1555 .expect("Failed to build logger")
1556 else {
1557 panic!("Failed to build logger");
1558 };
1559 let section = dl
1560 .read_next_section_type(UnifiedLogType::CopperList)
1561 .expect("Failed to read section");
1562 assert!(section.is_some());
1563 let section = section.unwrap();
1564
1565 let mut reader = SliceReader::new(§ion[..]);
1566 let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1567 let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1568 assert_eq!(cl0.payload.1, 2);
1569 assert_eq!(cl1.payload.2, 6);
1570 }
1571
1572 #[test]
1573 fn test_multi_slab_end2end() {
1574 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1575 let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1576 {
1577 let mut stream =
1578 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1579 let cl0 = CopperList {
1580 state: CopperListStateMock::Free,
1581 payload: (1u32, 2u32, 3u32),
1582 };
1583 for _ in 0..10000 {
1585 stream.log(&cl0).unwrap();
1586 }
1587 }
1588 drop(logger);
1589
1590 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1591 .file_base_name(&f)
1592 .build()
1593 .expect("Failed to build logger")
1594 else {
1595 panic!("Failed to build logger");
1596 };
1597 let mut total_readback = 0;
1598 loop {
1599 let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1600 if section.is_err() {
1601 break;
1602 }
1603 let section = section.unwrap();
1604 if section.is_none() {
1605 break;
1606 }
1607 let section = section.unwrap();
1608
1609 let mut reader = SliceReader::new(§ion[..]);
1610 loop {
1611 let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1612 decode_from_reader(&mut reader, standard());
1613 if maybe_cl.is_ok() {
1614 total_readback += 1;
1615 } else {
1616 break;
1617 }
1618 }
1619 }
1620 assert_eq!(total_readback, 10000);
1621 }
1622}