cu29_unifiedlog/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(not(feature = "std"))]
4extern crate alloc;
5extern crate core;
6
7#[cfg(feature = "std")]
8pub mod memmap;
9
10#[cfg(feature = "std")]
11mod compat {
12    // backward compatibility for the std implementation
13    pub use crate::memmap::MmapUnifiedLogger as UnifiedLogger;
14    pub use crate::memmap::MmapUnifiedLoggerBuilder as UnifiedLoggerBuilder;
15    pub use crate::memmap::MmapUnifiedLoggerRead as UnifiedLoggerRead;
16    pub use crate::memmap::MmapUnifiedLoggerWrite as UnifiedLoggerWrite;
17    pub use crate::memmap::UnifiedLoggerIOReader;
18}
19
20#[cfg(feature = "std")]
21pub use compat::*;
22
23#[cfg(not(feature = "std"))]
24mod imp {
25    pub use alloc::string::ToString;
26    pub use alloc::sync::Arc;
27    pub use alloc::vec::Vec;
28    pub use core::fmt::Debug;
29    pub use core::fmt::Display;
30    pub use core::fmt::Formatter;
31    pub use core::fmt::Result as FmtResult;
32    pub use spin::Mutex;
33}
34
35#[cfg(feature = "std")]
36mod imp {
37    pub use std::fmt::Debug;
38    pub use std::fmt::Display;
39    pub use std::fmt::Formatter;
40    pub use std::fmt::Result as FmtResult;
41    pub use std::sync::Arc;
42    pub use std::sync::Mutex;
43}
44
45use imp::*;
46
47use bincode::error::EncodeError;
48use bincode::{Decode, Encode};
49use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
50
/// Magic bytes used to spot the beginning of a Copper Log (reads as "BRASS OFF").
#[allow(dead_code)]
pub const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes used to spot a section of a Copper Log (reads as "FAST").
pub const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];

/// Section header compact size: 512, the usual minimum size for a disk sector.
pub const SECTION_HEADER_COMPACT_SIZE: u16 = 512;
59
60/// The main file header of the datalogger.
61#[derive(Encode, Decode, Debug)]
62pub struct MainHeader {
63    pub magic: [u8; 4],            // Magic number to identify the file.
64    pub first_section_offset: u16, // This is to align with a page at write time.
65    pub page_size: u16,
66}
67
68impl Display for MainHeader {
69    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
70        writeln!(
71            f,
72            "  Magic -> {:2x}{:2x}{:2x}{:2x}",
73            self.magic[0], self.magic[1], self.magic[2], self.magic[3]
74        )?;
75        writeln!(f, "  first_section_offset -> {}", self.first_section_offset)?;
76        writeln!(f, "  page_size -> {}", self.page_size)
77    }
78}
79
80/// Each concurrent sublogger is tracked through a section header.
81/// They form a linked list of sections.
82/// The entry type is used to identify the type of data in the section.
83#[derive(Encode, Decode, Debug)]
84pub struct SectionHeader {
85    pub magic: [u8; 2],  // Magic number to identify the section.
86    pub block_size: u16, // IMPORTANT: we assume this header fits in this block size.
87    pub entry_type: UnifiedLogType,
88    pub offset_to_next_section: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
89    pub used: u32,                   // how much of the section is filled.
90    pub is_open: bool,               // true while being written, false once closed.
91}
92
93impl Display for SectionHeader {
94    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
95        writeln!(f, "    Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
96        writeln!(f, "    type -> {:?}", self.entry_type)?;
97        write!(
98            f,
99            "    use  -> {} / {} (open: {})",
100            self.used, self.offset_to_next_section, self.is_open
101        )
102    }
103}
104
105impl Default for SectionHeader {
106    fn default() -> Self {
107        Self {
108            magic: SECTION_MAGIC,
109            block_size: 512,
110            entry_type: UnifiedLogType::Empty,
111            offset_to_next_section: 0,
112            used: 0,
113            is_open: true,
114        }
115    }
116}
117
118pub enum AllocatedSection<S: SectionStorage> {
119    NoMoreSpace,
120    Section(SectionHandle<S>),
121}
122
123/// A Storage is an append-only structure that can update a header section.
124pub trait SectionStorage: Send + Sync {
125    /// This rewinds the storage, serialize the header and jumps to the beginning of the user data storage.
126    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
127    /// This updates the header leaving the position to the end of the user data storage.
128    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
129    /// Appends the entry to the user data storage.
130    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError>;
131    /// Flushes the section to the underlying storage
132    fn flush(&mut self) -> CuResult<usize>;
133}
134
135/// A SectionHandle is a handle to a section in the datalogger.
136/// It allows tracking the lifecycle of the section.
137#[derive(Default)]
138pub struct SectionHandle<S: SectionStorage> {
139    header: SectionHeader, // keep a copy of the header as metadata
140    storage: S,
141}
142
143impl<S: SectionStorage> SectionHandle<S> {
144    pub fn create(header: SectionHeader, mut storage: S) -> CuResult<Self> {
145        // Write the first version of the header.
146        let _ = storage.initialize(&header).map_err(|e| e.to_string())?;
147        Ok(Self { header, storage })
148    }
149
150    pub fn mark_closed(&mut self) {
151        self.header.is_open = false;
152    }
153    pub fn append<E: Encode>(&mut self, entry: E) -> Result<usize, EncodeError> {
154        self.storage.append(&entry)
155    }
156
157    pub fn get_storage(&self) -> &S {
158        &self.storage
159    }
160
161    pub fn get_storage_mut(&mut self) -> &mut S {
162        &mut self.storage
163    }
164
165    pub fn post_update_header(&mut self) -> Result<usize, EncodeError> {
166        self.storage.post_update_header(&self.header)
167    }
168}
169
/// Basic statistics for the unified logger.
///
/// Note: `total_allocated_space` might grow for the std implementation.
pub struct UnifiedLogStatus {
    /// Total space used.
    pub total_used_space: usize,
    /// Total space allocated.
    pub total_allocated_space: usize,
}
176
177/// Payload stored in the end-of-log section to signal whether the log was cleanly closed.
178#[derive(Encode, Decode, Debug, Clone)]
179pub struct EndOfLogMarker {
180    pub temporary: bool,
181}
182
183/// The writing interface to the unified logger.
184/// Writing is "almost" linear as various streams can allocate sections and track them until
185/// they drop them.
186pub trait UnifiedLogWrite<S: SectionStorage>: Send + Sync {
187    /// A section is a contiguous chunk of memory that can be used to write data.
188    /// It can store various types of data as specified by the entry_type.
189    /// The requested_section_size is the size of the section to allocate.
190    /// It returns a handle to the section that can be used to write data until
191    /// it is flushed with flush_section, it is then considered unmutable.
192    fn add_section(
193        &mut self,
194        entry_type: UnifiedLogType,
195        requested_section_size: usize,
196    ) -> CuResult<SectionHandle<S>>;
197
198    /// Flush the given section to the underlying storage.
199    fn flush_section(&mut self, section: &mut SectionHandle<S>);
200
201    /// Returns the current status of the unified logger.
202    fn status(&self) -> UnifiedLogStatus;
203}
204
205/// Read back a unified log linearly.
206pub trait UnifiedLogRead {
207    /// Read through the unified logger until it reaches the UnifiedLogType given in datalogtype.
208    /// It will return the byte array of the section if found.
209    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>>;
210
211    /// Read through the next section entry regardless of its type.
212    /// It will return the header and the byte array of the section.
213    /// Note the last Entry should be of UnifiedLogType::LastEntry if the log is not corrupted.
214    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)>;
215}
216
217/// Create a new stream to write to the unifiedlogger.
218pub fn stream_write<E: Encode, S: SectionStorage>(
219    logger: Arc<Mutex<impl UnifiedLogWrite<S>>>,
220    entry_type: UnifiedLogType,
221    minimum_allocation_amount: usize,
222) -> CuResult<impl WriteStream<E>> {
223    LogStream::new(entry_type, logger, minimum_allocation_amount)
224}
225
226/// A wrapper around the unifiedlogger that implements the Write trait.
227struct LogStream<S: SectionStorage, L: UnifiedLogWrite<S>> {
228    entry_type: UnifiedLogType,
229    parent_logger: Arc<Mutex<L>>,
230    current_section: SectionHandle<S>,
231    current_position: usize,
232    minimum_allocation_amount: usize,
233}
234
235impl<S: SectionStorage, L: UnifiedLogWrite<S>> LogStream<S, L> {
236    fn new(
237        entry_type: UnifiedLogType,
238        parent_logger: Arc<Mutex<L>>,
239        minimum_allocation_amount: usize,
240    ) -> CuResult<Self> {
241        #[cfg(feature = "std")]
242        let section = parent_logger
243            .lock()
244            .expect("Could not lock a section at MmapStream creation")
245            .add_section(entry_type, minimum_allocation_amount)?;
246
247        #[cfg(not(feature = "std"))]
248        let section = parent_logger
249            .lock()
250            .add_section(entry_type, minimum_allocation_amount)?;
251
252        Ok(Self {
253            entry_type,
254            parent_logger,
255            current_section: section,
256            current_position: 0,
257            minimum_allocation_amount,
258        })
259    }
260}
261
262impl<S: SectionStorage, L: UnifiedLogWrite<S>> Debug for LogStream<S, L> {
263    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
264        write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
265    }
266}
267
268impl<E: Encode, S: SectionStorage, L: UnifiedLogWrite<S>> WriteStream<E> for LogStream<S, L> {
269    fn log(&mut self, obj: &E) -> CuResult<()> {
270        //let dst = self.current_section.get_user_buffer();
271        // let result = encode_into_slice(obj, dst, standard());
272        let result = self.current_section.append(obj);
273        match result {
274            Ok(nb_bytes) => {
275                self.current_position += nb_bytes;
276                self.current_section.header.used += nb_bytes as u32;
277                Ok(())
278            }
279            Err(e) => match e {
280                EncodeError::UnexpectedEnd => {
281                    #[cfg(feature = "std")]
282                    let logger_guard = self.parent_logger.lock();
283
284                    #[cfg(not(feature = "std"))]
285                    let mut logger_guard = self.parent_logger.lock();
286
287                    #[cfg(feature = "std")]
288                    let mut logger_guard =
289                        match logger_guard {
290                            Ok(g) => g,
291                            Err(_) => return Err(
292                                "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
293                                    .into(),
294                            ), // It will retry but at least not completely crash.
295                        };
296
297                    logger_guard.flush_section(&mut self.current_section);
298                    self.current_section = logger_guard
299                        .add_section(self.entry_type, self.minimum_allocation_amount)?;
300
301                    let result = self.current_section.append(obj).expect(
302                        "Failed to encode object in a newly minted section. Unrecoverable failure.",
303                    ); // If we fail just after creating a section, there is not much we can do, we need to bail.
304
305                    self.current_position += result;
306                    self.current_section.header.used += result as u32;
307                    Ok(())
308                }
309                _ => {
310                    let err =
311                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
312                            .add_cause(e.to_string().as_str());
313                    Err(err)
314                }
315            },
316        }
317    }
318}
319
320impl<S: SectionStorage, L: UnifiedLogWrite<S>> Drop for LogStream<S, L> {
321    fn drop(&mut self) {
322        #[cfg(feature = "std")]
323        let logger_guard = self.parent_logger.lock();
324
325        #[cfg(not(feature = "std"))]
326        let mut logger_guard = self.parent_logger.lock();
327
328        #[cfg(feature = "std")]
329        let mut logger_guard = match logger_guard {
330            Ok(g) => g,
331            Err(_) => return,
332        };
333        logger_guard.flush_section(&mut self.current_section);
334        #[cfg(feature = "std")]
335        if !std::thread::panicking() {
336            eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
337        }
338    }
339}