1use crate::{
5 AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6 SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
24pub struct MmapSectionStorage {
25 buffer: &'static mut [u8],
26 offset: usize,
27 block_size: usize,
28}
29
30impl MmapSectionStorage {
31 pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
32 Self {
33 buffer,
34 offset: 0,
35 block_size,
36 }
37 }
38
39 pub fn buffer_ptr(&self) -> *const u8 {
40 &self.buffer[0] as *const u8
41 }
42}
43
44impl SectionStorage for MmapSectionStorage {
45 fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
46 self.post_update_header(header)?;
47 self.offset = self.block_size;
48 Ok(self.offset)
49 }
50
51 fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
52 encode_into_slice(header, &mut self.buffer[0..], standard())
53 }
54
55 fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
56 let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
57 self.offset += size;
58 Ok(size)
59 }
60
61 fn flush(&mut self) -> CuResult<usize> {
62 Ok(self.offset)
64 }
65}
66
67pub enum MmapUnifiedLogger {
70 Read(MmapUnifiedLoggerRead),
71 Write(MmapUnifiedLoggerWrite),
72}
73
74pub struct MmapUnifiedLoggerBuilder {
76 file_base_name: Option<PathBuf>,
77 preallocated_size: Option<usize>,
78 write: bool,
79 create: bool,
80}
81
82impl Default for MmapUnifiedLoggerBuilder {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl MmapUnifiedLoggerBuilder {
89 pub fn new() -> Self {
90 Self {
91 file_base_name: None,
92 preallocated_size: None,
93 write: false,
94 create: false, }
96 }
97
98 pub fn file_base_name(mut self, file_path: &Path) -> Self {
100 self.file_base_name = Some(file_path.to_path_buf());
101 self
102 }
103
104 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105 self.preallocated_size = Some(preallocated_size);
106 self
107 }
108
109 pub fn write(mut self, write: bool) -> Self {
110 self.write = write;
111 self
112 }
113
114 pub fn create(mut self, create: bool) -> Self {
115 self.create = create;
116 self
117 }
118
119 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120 let page_size = page_size::get();
121
122 if self.write && self.create {
123 let file_path = self.file_base_name.ok_or_else(|| {
124 io::Error::new(
125 io::ErrorKind::InvalidInput,
126 "File path is required for write mode",
127 )
128 })?;
129 let preallocated_size = self.preallocated_size.ok_or_else(|| {
130 io::Error::new(
131 io::ErrorKind::InvalidInput,
132 "Preallocated size is required for write mode",
133 )
134 })?;
135 let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
136 Ok(MmapUnifiedLogger::Write(ulw))
137 } else {
138 let file_path = self.file_base_name.ok_or_else(|| {
139 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
140 })?;
141 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
142 Ok(MmapUnifiedLogger::Read(ulr))
143 }
144 }
145}
146
147struct SlabEntry {
148 file: File,
149 mmap_buffer: ManuallyDrop<MmapMut>,
150 current_global_position: usize,
151 sections_offsets_in_flight: Vec<usize>,
152 flushed_until_offset: usize,
153 page_size: usize,
154 temporary_end_marker: Option<usize>,
155}
156
157impl Drop for SlabEntry {
158 fn drop(&mut self) {
159 self.flush_until(self.current_global_position);
160 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
162 if let Err(error) = self.file.set_len(self.current_global_position as u64) {
163 eprintln!("Failed to trim datalogger file: {}", error);
164 }
165
166 if !self.sections_offsets_in_flight.is_empty() {
167 eprintln!("Error: Slab not full flushed.");
168 }
169 }
170}
171
172impl SlabEntry {
173 fn new(file: File, page_size: usize) -> io::Result<Self> {
174 let mmap_buffer = ManuallyDrop::new(
175 unsafe { MmapMut::map_mut(&file) }
177 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
178 );
179 Ok(Self {
180 file,
181 mmap_buffer,
182 current_global_position: 0,
183 sections_offsets_in_flight: Vec::with_capacity(16),
184 flushed_until_offset: 0,
185 page_size,
186 temporary_end_marker: None,
187 })
188 }
189
190 fn flush_until(&mut self, until_position: usize) {
192 if (self.flushed_until_offset == until_position) || (until_position == 0) {
194 return;
195 }
196 self.mmap_buffer
197 .flush_async_range(
198 self.flushed_until_offset,
199 until_position - self.flushed_until_offset,
200 )
201 .expect("Failed to flush memory map");
202 self.flushed_until_offset = until_position;
203 }
204
205 fn clear_temporary_end_marker(&mut self) {
206 if let Some(marker_start) = self.temporary_end_marker.take() {
207 self.current_global_position = marker_start;
208 if self.flushed_until_offset > marker_start {
209 self.flushed_until_offset = marker_start;
210 }
211 }
212 }
213
214 fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
215 let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
216 let marker_start = self.align_to_next_page(self.current_global_position);
217 let total_marker_size = block_size; let marker_end = marker_start + total_marker_size;
219 if marker_end > self.mmap_buffer.len() {
220 return Err("Not enough space to write end-of-log marker".into());
221 }
222
223 let header = SectionHeader {
224 magic: SECTION_MAGIC,
225 block_size: SECTION_HEADER_COMPACT_SIZE,
226 entry_type: UnifiedLogType::LastEntry,
227 offset_to_next_section: total_marker_size as u32,
228 used: 0,
229 is_open: temporary,
230 };
231
232 encode_into_slice(
233 &header,
234 &mut self.mmap_buffer
235 [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
236 standard(),
237 )
238 .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
239
240 self.temporary_end_marker = Some(marker_start);
241 self.current_global_position = marker_end;
242 Ok(())
243 }
244
245 fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
246 let storage = section.get_storage();
247 let ptr = storage.buffer_ptr();
248 (ptr >= self.mmap_buffer.as_ptr())
249 && (ptr as usize)
250 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
251 }
252
253 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
256 section
257 .post_update_header()
258 .expect("Failed to update section header");
259
260 let storage = section.get_storage();
261 let ptr = storage.buffer_ptr();
262
263 if ptr < self.mmap_buffer.as_ptr()
264 || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
265 {
266 panic!("Invalid section buffer, not in the slab");
267 }
268
269 let base = self.mmap_buffer.as_ptr() as usize;
270 self.sections_offsets_in_flight
271 .retain(|&x| x != ptr as usize - base);
272
273 if self.sections_offsets_in_flight.is_empty() {
274 self.flush_until(self.current_global_position);
275 return;
276 }
277 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
278 self.flush_until(self.sections_offsets_in_flight[0]);
279 }
280 }
281
282 #[inline]
283 fn align_to_next_page(&self, ptr: usize) -> usize {
284 (ptr + self.page_size - 1) & !(self.page_size - 1)
285 }
286
287 fn add_section(
289 &mut self,
290 entry_type: UnifiedLogType,
291 requested_section_size: usize,
292 ) -> AllocatedSection<MmapSectionStorage> {
293 self.current_global_position = self.align_to_next_page(self.current_global_position);
295 let section_size = self.align_to_next_page(requested_section_size) as u32;
296
297 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
299 return AllocatedSection::NoMoreSpace;
300 }
301
302 #[cfg(feature = "compact")]
303 let block_size = SECTION_HEADER_COMPACT_SIZE;
304
305 #[cfg(not(feature = "compact"))]
306 let block_size = self.page_size as u16;
307
308 let section_header = SectionHeader {
309 magic: SECTION_MAGIC,
310 block_size,
311 entry_type,
312 offset_to_next_section: section_size,
313 used: 0u32,
314 is_open: true,
315 };
316
317 self.sections_offsets_in_flight
319 .push(self.current_global_position);
320 let end_of_section = self.current_global_position + requested_section_size;
321 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
322
323 let handle_buffer =
325 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
326 let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
327
328 self.current_global_position = end_of_section;
329
330 Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
331 }
332
333 #[cfg(test)]
334 fn used(&self) -> usize {
335 self.current_global_position
336 }
337}
338
339pub struct MmapUnifiedLoggerWrite {
341 front_slab: SlabEntry,
343 back_slabs: Vec<SlabEntry>,
345 base_file_path: PathBuf,
347 slab_size: usize,
349 front_slab_suffix: usize,
351}
352
353fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
354 let mut file_path = base_file_path.to_path_buf();
355 let stem = file_path.file_stem().ok_or_else(|| {
356 io::Error::new(
357 io::ErrorKind::InvalidInput,
358 "Base file path has no file name",
359 )
360 })?;
361 let stem = stem.to_str().ok_or_else(|| {
362 io::Error::new(
363 io::ErrorKind::InvalidInput,
364 "Base file name is not valid UTF-8",
365 )
366 })?;
367 let extension = file_path.extension().ok_or_else(|| {
368 io::Error::new(
369 io::ErrorKind::InvalidInput,
370 "Base file path has no extension",
371 )
372 })?;
373 let extension = extension.to_str().ok_or_else(|| {
374 io::Error::new(
375 io::ErrorKind::InvalidInput,
376 "Base file extension is not valid UTF-8",
377 )
378 })?;
379 if stem.is_empty() {
380 return Err(io::Error::new(
381 io::ErrorKind::InvalidInput,
382 "Base file name is empty",
383 ));
384 }
385 let file_name = format!("{stem}_{slab_index}.{extension}");
386 file_path.set_file_name(file_name);
387 Ok(file_path)
388}
389
390fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
391 let file_path = build_slab_path(base_file_path, slab_suffix)?;
392 let file = OpenOptions::new()
393 .read(true)
394 .write(true)
395 .create(true)
396 .truncate(true)
397 .open(&file_path)
398 .map_err(|e| {
399 io::Error::new(
400 e.kind(),
401 format!("Failed to open file {}: {e}", file_path.display()),
402 )
403 })?;
404 file.set_len(slab_size as u64).map_err(|e| {
405 io::Error::new(
406 e.kind(),
407 format!("Failed to set file length for {}: {e}", file_path.display()),
408 )
409 })?;
410 Ok(file)
411}
412
413impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
414 fn add_section(
416 &mut self,
417 entry_type: UnifiedLogType,
418 requested_section_size: usize,
419 ) -> CuResult<SectionHandle<MmapSectionStorage>> {
420 self.garbage_collect_backslabs(); self.front_slab.clear_temporary_end_marker();
422 let maybe_section = self
423 .front_slab
424 .add_section(entry_type, requested_section_size);
425
426 match maybe_section {
427 AllocatedSection::NoMoreSpace => {
428 let new_slab = self.create_slab()?;
430 self.back_slabs
432 .push(mem::replace(&mut self.front_slab, new_slab));
433 match self
434 .front_slab
435 .add_section(entry_type, requested_section_size)
436 {
437 AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
438 Section(section) => {
439 self.place_end_marker(true)?;
440 Ok(section)
441 }
442 }
443 }
444 Section(section) => {
445 self.place_end_marker(true)?;
446 Ok(section)
447 }
448 }
449 }
450
451 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
452 section.mark_closed();
453 for slab in self.back_slabs.iter_mut() {
454 if slab.is_it_my_section(section) {
455 slab.flush_section(section);
456 return;
457 }
458 }
459 self.front_slab.flush_section(section);
460 }
461
462 fn status(&self) -> UnifiedLogStatus {
463 UnifiedLogStatus {
464 total_used_space: self.front_slab.current_global_position,
465 total_allocated_space: self.slab_size * self.front_slab_suffix,
466 }
467 }
468}
469
470impl MmapUnifiedLoggerWrite {
471 fn next_slab(&mut self) -> io::Result<File> {
472 let next_suffix = self.front_slab_suffix + 1;
473 let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
474 self.front_slab_suffix = next_suffix;
475 Ok(file)
476 }
477
478 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
479 let file = make_slab_file(base_file_path, slab_size, 0)?;
480 let mut front_slab = SlabEntry::new(file, page_size)?;
481
482 let main_header = MainHeader {
484 magic: MAIN_MAGIC,
485 first_section_offset: page_size as u16,
486 page_size: page_size as u16,
487 };
488 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
489 .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
490 assert!(nb_bytes < page_size);
491 front_slab.current_global_position = page_size; Ok(Self {
494 front_slab,
495 back_slabs: Vec::new(),
496 base_file_path: base_file_path.to_path_buf(),
497 slab_size,
498 front_slab_suffix: 0,
499 })
500 }
501
502 fn garbage_collect_backslabs(&mut self) {
503 self.back_slabs
504 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
505 }
506
507 fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
508 match self.front_slab.write_end_marker(temporary) {
509 Ok(_) => Ok(()),
510 Err(_) => {
511 let new_slab = self.create_slab()?;
513 self.back_slabs
514 .push(mem::replace(&mut self.front_slab, new_slab));
515 self.front_slab.write_end_marker(temporary)
516 }
517 }
518 }
519
520 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
521 (
522 self.front_slab.current_global_position,
523 self.front_slab.sections_offsets_in_flight.clone(),
524 self.back_slabs.len(),
525 )
526 }
527
528 fn create_slab(&mut self) -> CuResult<SlabEntry> {
529 let file = self
530 .next_slab()
531 .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
532 SlabEntry::new(file, self.front_slab.page_size)
533 .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
534 }
535}
536
537impl Drop for MmapUnifiedLoggerWrite {
538 fn drop(&mut self) {
539 #[cfg(debug_assertions)]
540 eprintln!("Flushing the unified Logger ... "); self.front_slab.clear_temporary_end_marker();
543 if let Err(e) = self.place_end_marker(false) {
544 panic!("Failed to flush the unified logger: {}", e);
545 }
546 self.front_slab
547 .flush_until(self.front_slab.current_global_position);
548 self.garbage_collect_backslabs();
549 #[cfg(debug_assertions)]
550 eprintln!("Unified Logger flushed."); }
552}
553
554fn open_slab_index(
555 base_file_path: &Path,
556 slab_index: usize,
557) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
558 let mut options = OpenOptions::new();
559 let options = options.read(true);
560
561 let file_path = build_slab_path(base_file_path, slab_index)?;
562 let file = options.open(&file_path).map_err(|e| {
563 io::Error::new(
564 e.kind(),
565 format!("Failed to open slab file {}: {e}", file_path.display()),
566 )
567 })?;
568 let mmap = unsafe { Mmap::map(&file) }
570 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
571 let mut prolog = 0u16;
572 let mut maybe_main_header: Option<MainHeader> = None;
573 if slab_index == 0 {
574 let main_header: MainHeader;
575 let _read: usize;
576 (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
577 io::Error::new(
578 io::ErrorKind::InvalidData,
579 format!("Failed to decode main header: {e}"),
580 )
581 })?;
582 if main_header.magic != MAIN_MAGIC {
583 return Err(io::Error::new(
584 io::ErrorKind::InvalidData,
585 "Invalid magic number in main header",
586 ));
587 }
588 prolog = main_header.first_section_offset;
589 maybe_main_header = Some(main_header);
590 }
591 Ok((file, mmap, prolog, maybe_main_header))
592}
593
594pub struct MmapUnifiedLoggerRead {
596 base_file_path: PathBuf,
597 main_header: MainHeader,
598 current_mmap_buffer: Mmap,
599 current_file: File,
600 current_slab_index: usize,
601 current_reading_position: usize,
602}
603
604#[derive(Clone, Copy, Debug, PartialEq, Eq)]
606pub struct LogPosition {
607 pub slab_index: usize,
608 pub offset: usize,
609}
610
611impl UnifiedLogRead for MmapUnifiedLoggerRead {
612 fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
613 loop {
615 if self.current_reading_position >= self.current_mmap_buffer.len() {
616 self.next_slab().map_err(|e| {
617 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
618 })?;
619 }
620
621 let header_result = self.read_section_header();
622 let header = header_result.map_err(|error| {
623 CuError::new_with_cause(
624 &format!(
625 "Could not read a sections header: {}/{}:{}",
626 self.base_file_path.as_os_str().to_string_lossy(),
627 self.current_slab_index,
628 self.current_reading_position,
629 ),
630 error,
631 )
632 })?;
633
634 if header.entry_type == UnifiedLogType::LastEntry {
636 return Ok(None);
637 }
638
639 if header.entry_type == datalogtype {
641 let result = Some(self.read_section_content(&header)?);
642 self.current_reading_position += header.offset_to_next_section as usize;
643 return Ok(result);
644 }
645
646 self.current_reading_position += header.offset_to_next_section as usize;
648 }
649 }
650
651 fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
653 if self.current_reading_position >= self.current_mmap_buffer.len() {
654 self.next_slab().map_err(|e| {
655 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
656 })?;
657 }
658
659 let read_result = self.read_section_header();
660
661 match read_result {
662 Err(error) => Err(CuError::new_with_cause(
663 &format!(
664 "Could not read a sections header: {}/{}:{}",
665 self.base_file_path.as_os_str().to_string_lossy(),
666 self.current_slab_index,
667 self.current_reading_position,
668 ),
669 error,
670 )),
671 Ok(header) => {
672 let data = self.read_section_content(&header)?;
673 self.current_reading_position += header.offset_to_next_section as usize;
674 Ok((header, data))
675 }
676 }
677 }
678}
679
680impl MmapUnifiedLoggerRead {
681 pub fn new(base_file_path: &Path) -> io::Result<Self> {
682 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
683 let main_header = header.ok_or_else(|| {
684 io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
685 })?;
686
687 Ok(Self {
688 base_file_path: base_file_path.to_path_buf(),
689 main_header,
690 current_file: file,
691 current_mmap_buffer: mmap,
692 current_slab_index: 0,
693 current_reading_position: prolog as usize,
694 })
695 }
696
697 pub fn position(&self) -> LogPosition {
699 LogPosition {
700 slab_index: self.current_slab_index,
701 offset: self.current_reading_position,
702 }
703 }
704
705 pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
707 if pos.slab_index != self.current_slab_index {
708 let (file, mmap, _prolog, _header) =
709 open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
710 CuError::new_with_cause(
711 &format!("Failed to open slab {} for seek", pos.slab_index),
712 e,
713 )
714 })?;
715 self.current_file = file;
716 self.current_mmap_buffer = mmap;
717 self.current_slab_index = pos.slab_index;
718 }
719 self.current_reading_position = pos.offset;
720 Ok(())
721 }
722
723 fn next_slab(&mut self) -> io::Result<()> {
724 self.current_slab_index += 1;
725 let (file, mmap, prolog, _) =
726 open_slab_index(&self.base_file_path, self.current_slab_index)?;
727 self.current_file = file;
728 self.current_mmap_buffer = mmap;
729 self.current_reading_position = prolog as usize;
730 Ok(())
731 }
732
733 pub fn raw_main_header(&self) -> &MainHeader {
734 &self.main_header
735 }
736
737 pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
738 let mut total = 0u64;
739
740 loop {
741 if self.current_reading_position >= self.current_mmap_buffer.len() {
742 self.next_slab().map_err(|e| {
743 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
744 })?;
745 }
746
747 let header = self.read_section_header()?;
748
749 if header.entry_type == UnifiedLogType::LastEntry {
750 return Ok(total);
751 }
752
753 if header.entry_type == datalogtype {
754 total = total.saturating_add(header.used as u64);
755 }
756
757 self.current_reading_position += header.offset_to_next_section as usize;
758 }
759 }
760
761 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
763 let mut section_data = vec![0; header.used as usize];
765 let start_of_data = self.current_reading_position + header.block_size as usize;
766 section_data.copy_from_slice(
767 &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
768 );
769
770 Ok(section_data)
771 }
772
773 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
774 let section_header: SectionHeader;
775 (section_header, _) = decode_from_slice(
776 &self.current_mmap_buffer[self.current_reading_position..],
777 standard(),
778 )
779 .map_err(|e| {
780 CuError::new_with_cause(
781 &format!(
782 "Could not read a sections header: {}/{}:{}",
783 self.base_file_path.as_os_str().to_string_lossy(),
784 self.current_slab_index,
785 self.current_reading_position,
786 ),
787 e,
788 )
789 })?;
790 if section_header.magic != SECTION_MAGIC {
791 return Err("Invalid magic number in section header".into());
792 }
793
794 Ok(section_header)
795 }
796}
797
798pub struct UnifiedLoggerIOReader {
800 logger: MmapUnifiedLoggerRead,
801 log_type: UnifiedLogType,
802 buffer: Vec<u8>,
803 buffer_pos: usize,
804}
805
806impl UnifiedLoggerIOReader {
807 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
808 Self {
809 logger,
810 log_type,
811 buffer: Vec::new(),
812 buffer_pos: 0,
813 }
814 }
815
816 fn fill_buffer(&mut self) -> io::Result<bool> {
818 match self.logger.read_next_section_type(self.log_type) {
819 Ok(Some(section)) => {
820 self.buffer = section;
821 self.buffer_pos = 0;
822 Ok(true)
823 }
824 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
826 }
827 }
828}
829
830impl Read for UnifiedLoggerIOReader {
831 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
832 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
833 return Ok(0);
835 }
836
837 if self.buffer_pos >= self.buffer.len() {
839 return Ok(0);
840 }
841
842 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
844 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
845 self.buffer_pos += len;
846 Ok(len)
847 }
848}
849
850#[cfg(feature = "std")]
851#[cfg(test)]
852mod tests {
853 use super::*;
854 use crate::stream_write;
855 use bincode::de::read::SliceReader;
856 use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
857 use cu29_traits::WriteStream;
858 use std::path::PathBuf;
859 use std::sync::{Arc, Mutex};
860 use tempfile::TempDir;
861
862 const LARGE_SLAB: usize = 100 * 1024; const SMALL_SLAB: usize = 16 * 2 * 1024; fn make_a_logger(
866 tmp_dir: &TempDir,
867 slab_size: usize,
868 ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
869 let file_path = tmp_dir.path().join("test.bin");
870 let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
871 .write(true)
872 .create(true)
873 .file_base_name(&file_path)
874 .preallocated_size(slab_size)
875 .build()
876 .expect("Failed to create logger")
877 else {
878 panic!("Failed to create logger")
879 };
880
881 (Arc::new(Mutex::new(data_logger)), file_path)
882 }
883
884 #[test]
885 fn test_truncation_and_sections_creations() {
886 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
887 let file_path = tmp_dir.path().join("test.bin");
888 let _used = {
889 let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
890 .write(true)
891 .create(true)
892 .file_base_name(&file_path)
893 .preallocated_size(100000)
894 .build()
895 .expect("Failed to create logger")
896 else {
897 panic!("Failed to create logger")
898 };
899 logger
900 .add_section(UnifiedLogType::StructuredLogLine, 1024)
901 .unwrap();
902 logger
903 .add_section(UnifiedLogType::CopperList, 2048)
904 .unwrap();
905 let used = logger.front_slab.used();
906 assert!(used < 4 * page_size::get()); used
910 };
911
912 let _file = OpenOptions::new()
913 .read(true)
914 .open(tmp_dir.path().join("test_0.bin"))
915 .expect("Could not reopen the file");
916 }
923
924 #[test]
925 fn test_one_section_self_cleaning() {
926 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
927 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
928 {
929 let _stream = stream_write::<(), MmapSectionStorage>(
930 logger.clone(),
931 UnifiedLogType::StructuredLogLine,
932 1024,
933 );
934 assert_eq!(
935 logger
936 .lock()
937 .unwrap()
938 .front_slab
939 .sections_offsets_in_flight
940 .len(),
941 1
942 );
943 }
944 assert_eq!(
945 logger
946 .lock()
947 .unwrap()
948 .front_slab
949 .sections_offsets_in_flight
950 .len(),
951 0
952 );
953 let logger = logger.lock().unwrap();
954 assert_eq!(
955 logger.front_slab.flushed_until_offset,
956 logger.front_slab.current_global_position
957 );
958 }
959
960 #[test]
961 fn test_temporary_end_marker_is_created() {
962 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
963 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
964 {
965 let mut stream = stream_write::<u32, MmapSectionStorage>(
966 logger.clone(),
967 UnifiedLogType::StructuredLogLine,
968 1024,
969 )
970 .unwrap();
971 stream.log(&42u32).unwrap();
972 }
973
974 let logger_guard = logger.lock().unwrap();
975 let slab = &logger_guard.front_slab;
976 let marker_start = slab
977 .temporary_end_marker
978 .expect("temporary end-of-log marker missing");
979 let (eof_header, _) =
980 decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
981 .expect("Could not decode end-of-log marker header");
982 assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
983 assert!(eof_header.is_open);
984 assert_eq!(eof_header.used, 0);
985 }
986
987 #[test]
988 fn test_final_end_marker_is_not_temporary() {
989 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
990 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
991 {
992 let mut stream = stream_write::<u32, MmapSectionStorage>(
993 logger.clone(),
994 UnifiedLogType::CopperList,
995 1024,
996 )
997 .unwrap();
998 stream.log(&1u32).unwrap();
999 }
1000 drop(logger);
1001
1002 let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1003 .file_base_name(&f)
1004 .build()
1005 .expect("Failed to build reader")
1006 else {
1007 panic!("Failed to create reader");
1008 };
1009
1010 loop {
1011 let (header, _data) = reader
1012 .raw_read_section()
1013 .expect("Failed to read section while searching for EOF");
1014 if header.entry_type == UnifiedLogType::LastEntry {
1015 assert!(!header.is_open);
1016 break;
1017 }
1018 }
1019 }
1020
1021 #[test]
1022 fn test_two_sections_self_cleaning_in_order() {
1023 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1024 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1025 let s1 = stream_write::<(), MmapSectionStorage>(
1026 logger.clone(),
1027 UnifiedLogType::StructuredLogLine,
1028 1024,
1029 );
1030 assert_eq!(
1031 logger
1032 .lock()
1033 .unwrap()
1034 .front_slab
1035 .sections_offsets_in_flight
1036 .len(),
1037 1
1038 );
1039 let s2 = stream_write::<(), MmapSectionStorage>(
1040 logger.clone(),
1041 UnifiedLogType::StructuredLogLine,
1042 1024,
1043 );
1044 assert_eq!(
1045 logger
1046 .lock()
1047 .unwrap()
1048 .front_slab
1049 .sections_offsets_in_flight
1050 .len(),
1051 2
1052 );
1053 drop(s2);
1054 assert_eq!(
1055 logger
1056 .lock()
1057 .unwrap()
1058 .front_slab
1059 .sections_offsets_in_flight
1060 .len(),
1061 1
1062 );
1063 drop(s1);
1064 let lg = logger.lock().unwrap();
1065 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1066 assert_eq!(
1067 lg.front_slab.flushed_until_offset,
1068 lg.front_slab.current_global_position
1069 );
1070 }
1071
1072 #[test]
1073 fn test_two_sections_self_cleaning_out_of_order() {
1074 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1075 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1076 let s1 = stream_write::<(), MmapSectionStorage>(
1077 logger.clone(),
1078 UnifiedLogType::StructuredLogLine,
1079 1024,
1080 );
1081 assert_eq!(
1082 logger
1083 .lock()
1084 .unwrap()
1085 .front_slab
1086 .sections_offsets_in_flight
1087 .len(),
1088 1
1089 );
1090 let s2 = stream_write::<(), MmapSectionStorage>(
1091 logger.clone(),
1092 UnifiedLogType::StructuredLogLine,
1093 1024,
1094 );
1095 assert_eq!(
1096 logger
1097 .lock()
1098 .unwrap()
1099 .front_slab
1100 .sections_offsets_in_flight
1101 .len(),
1102 2
1103 );
1104 drop(s1);
1105 assert_eq!(
1106 logger
1107 .lock()
1108 .unwrap()
1109 .front_slab
1110 .sections_offsets_in_flight
1111 .len(),
1112 1
1113 );
1114 drop(s2);
1115 let lg = logger.lock().unwrap();
1116 assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
1117 assert_eq!(
1118 lg.front_slab.flushed_until_offset,
1119 lg.front_slab.current_global_position
1120 );
1121 }
1122
1123 #[test]
1124 fn test_write_then_read_one_section() {
1125 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1126 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1127 {
1128 let mut stream =
1129 stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
1130 stream.log(&1u32).unwrap();
1131 stream.log(&2u32).unwrap();
1132 stream.log(&3u32).unwrap();
1133 }
1134 drop(logger);
1135 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1136 .file_base_name(&f)
1137 .build()
1138 .expect("Failed to build logger")
1139 else {
1140 panic!("Failed to build logger");
1141 };
1142 let section = dl
1143 .read_next_section_type(UnifiedLogType::StructuredLogLine)
1144 .expect("Failed to read section");
1145 assert!(section.is_some());
1146 let section = section.unwrap();
1147 let mut reader = SliceReader::new(§ion[..]);
1148 let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1149 let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1150 let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
1151 assert_eq!(v1, 1);
1152 assert_eq!(v2, 2);
1153 assert_eq!(v3, 3);
1154 }
1155
1156 #[derive(Debug, Encode, Decode)]
1159 enum CopperListStateMock {
1160 Free,
1161 ProcessingTasks,
1162 BeingSerialized,
1163 }
1164
1165 #[derive(Encode, Decode)]
1166 struct CopperList<P: bincode::enc::Encode> {
1167 state: CopperListStateMock,
1168 payload: P, }
1170
1171 #[test]
1172 fn test_copperlist_list_like_logging() {
1173 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1174 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1175 {
1176 let mut stream =
1177 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1178 let cl0 = CopperList {
1179 state: CopperListStateMock::Free,
1180 payload: (1u32, 2u32, 3u32),
1181 };
1182 let cl1 = CopperList {
1183 state: CopperListStateMock::ProcessingTasks,
1184 payload: (4u32, 5u32, 6u32),
1185 };
1186 stream.log(&cl0).unwrap();
1187 stream.log(&cl1).unwrap();
1188 }
1189 drop(logger);
1190
1191 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1192 .file_base_name(&f)
1193 .build()
1194 .expect("Failed to build logger")
1195 else {
1196 panic!("Failed to build logger");
1197 };
1198 let section = dl
1199 .read_next_section_type(UnifiedLogType::CopperList)
1200 .expect("Failed to read section");
1201 assert!(section.is_some());
1202 let section = section.unwrap();
1203
1204 let mut reader = SliceReader::new(§ion[..]);
1205 let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1206 let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1207 assert_eq!(cl0.payload.1, 2);
1208 assert_eq!(cl1.payload.2, 6);
1209 }
1210
1211 #[test]
1212 fn test_multi_slab_end2end() {
1213 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1214 let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1215 {
1216 let mut stream =
1217 stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
1218 let cl0 = CopperList {
1219 state: CopperListStateMock::Free,
1220 payload: (1u32, 2u32, 3u32),
1221 };
1222 for _ in 0..10000 {
1224 stream.log(&cl0).unwrap();
1225 }
1226 }
1227 drop(logger);
1228
1229 let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
1230 .file_base_name(&f)
1231 .build()
1232 .expect("Failed to build logger")
1233 else {
1234 panic!("Failed to build logger");
1235 };
1236 let mut total_readback = 0;
1237 loop {
1238 let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1239 if section.is_err() {
1240 break;
1241 }
1242 let section = section.unwrap();
1243 if section.is_none() {
1244 break;
1245 }
1246 let section = section.unwrap();
1247
1248 let mut reader = SliceReader::new(§ion[..]);
1249 loop {
1250 let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1251 decode_from_reader(&mut reader, standard());
1252 if maybe_cl.is_ok() {
1253 total_readback += 1;
1254 } else {
1255 break;
1256 }
1257 }
1258 }
1259 assert_eq!(total_readback, 10000);
1260 }
1261}