cu29_unifiedlog/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4extern crate core;
5
6#[cfg(feature = "std")]
7pub mod memmap;
8
9#[cfg(feature = "std")]
10mod compat {
11    // backward compatibility for the std implementation
12    pub use crate::memmap::MmapUnifiedLogger as UnifiedLogger;
13    pub use crate::memmap::MmapUnifiedLoggerBuilder as UnifiedLoggerBuilder;
14    pub use crate::memmap::MmapUnifiedLoggerRead as UnifiedLoggerRead;
15    pub use crate::memmap::MmapUnifiedLoggerWrite as UnifiedLoggerWrite;
16    pub use crate::memmap::UnifiedLoggerIOReader;
17}
18
19#[cfg(feature = "std")]
20pub use compat::*;
21
22use alloc::string::ToString;
23use alloc::sync::Arc;
24use alloc::vec::Vec;
25use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
26#[cfg(not(feature = "std"))]
27use spin::Mutex;
28#[cfg(feature = "std")]
29use std::sync::Mutex;
30
31use bincode::error::EncodeError;
32use bincode::{Decode, Encode};
33use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
34
35/// ID to spot the beginning of a Copper Log
36#[allow(dead_code)]
37pub const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF]; // BRASS OFF
38
39/// ID to spot a section of Copper Log
40pub const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57]; // FAST
41
42pub const SECTION_HEADER_COMPACT_SIZE: u16 = 512; // Usual minimum size for a disk sector.
43
44/// The main file header of the datalogger.
45#[derive(Encode, Decode, Debug)]
46pub struct MainHeader {
47    pub magic: [u8; 4],            // Magic number to identify the file.
48    pub first_section_offset: u16, // This is to align with a page at write time.
49    pub page_size: u16,
50}
51
52impl Display for MainHeader {
53    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
54        writeln!(
55            f,
56            "  Magic -> {:2x}{:2x}{:2x}{:2x}",
57            self.magic[0], self.magic[1], self.magic[2], self.magic[3]
58        )?;
59        writeln!(f, "  first_section_offset -> {}", self.first_section_offset)?;
60        writeln!(f, "  page_size -> {}", self.page_size)
61    }
62}
63
64/// Each concurrent sublogger is tracked through a section header.
65/// They form a linked list of sections.
66/// The entry type is used to identify the type of data in the section.
67#[derive(Encode, Decode, Debug)]
68pub struct SectionHeader {
69    pub magic: [u8; 2],  // Magic number to identify the section.
70    pub block_size: u16, // IMPORTANT: we assume this header fits in this block size.
71    pub entry_type: UnifiedLogType,
72    pub offset_to_next_section: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
73    pub used: u32,                   // how much of the section is filled.
74    pub is_open: bool,               // true while being written, false once closed.
75}
76
77impl Display for SectionHeader {
78    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
79        writeln!(f, "    Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
80        writeln!(f, "    type -> {:?}", self.entry_type)?;
81        write!(
82            f,
83            "    use  -> {} / {} (open: {})",
84            self.used, self.offset_to_next_section, self.is_open
85        )
86    }
87}
88
89impl Default for SectionHeader {
90    fn default() -> Self {
91        Self {
92            magic: SECTION_MAGIC,
93            block_size: 512,
94            entry_type: UnifiedLogType::Empty,
95            offset_to_next_section: 0,
96            used: 0,
97            is_open: true,
98        }
99    }
100}
101
102pub enum AllocatedSection<S: SectionStorage> {
103    NoMoreSpace,
104    Section(SectionHandle<S>),
105}
106
107/// A Storage is an append-only structure that can update a header section.
108pub trait SectionStorage: Send + Sync {
109    /// This rewinds the storage, serialize the header and jumps to the beginning of the user data storage.
110    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
111    /// This updates the header leaving the position to the end of the user data storage.
112    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
113    /// Appends the entry to the user data storage.
114    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError>;
115    /// Flushes the section to the underlying storage
116    fn flush(&mut self) -> CuResult<usize>;
117}
118
119/// A SectionHandle is a handle to a section in the datalogger.
120/// It allows tracking the lifecycle of the section.
121#[derive(Default)]
122pub struct SectionHandle<S: SectionStorage> {
123    header: SectionHeader, // keep a copy of the header as metadata
124    storage: S,
125}
126
127impl<S: SectionStorage> SectionHandle<S> {
128    pub fn create(header: SectionHeader, mut storage: S) -> CuResult<Self> {
129        // Write the first version of the header.
130        let _ = storage.initialize(&header).map_err(|e| e.to_string())?;
131        Ok(Self { header, storage })
132    }
133
134    pub fn mark_closed(&mut self) {
135        self.header.is_open = false;
136    }
137    pub fn append<E: Encode>(&mut self, entry: E) -> Result<usize, EncodeError> {
138        self.storage.append(&entry)
139    }
140
141    pub fn get_storage(&self) -> &S {
142        &self.storage
143    }
144
145    pub fn get_storage_mut(&mut self) -> &mut S {
146        &mut self.storage
147    }
148
149    pub fn post_update_header(&mut self) -> Result<usize, EncodeError> {
150        self.storage.post_update_header(&self.header)
151    }
152}
153
154/// Basic statistics for the unified logger.
155/// Note: the total_allocated_space might grow for the std implementation
156pub struct UnifiedLogStatus {
157    pub total_used_space: usize,
158    pub total_allocated_space: usize,
159}
160
161/// Payload stored in the end-of-log section to signal whether the log was cleanly closed.
162#[derive(Encode, Decode, Debug, Clone)]
163pub struct EndOfLogMarker {
164    pub temporary: bool,
165}
166
167/// The writing interface to the unified logger.
168/// Writing is "almost" linear as various streams can allocate sections and track them until
169/// they drop them.
170pub trait UnifiedLogWrite<S: SectionStorage>: Send + Sync {
171    /// A section is a contiguous chunk of memory that can be used to write data.
172    /// It can store various types of data as specified by the entry_type.
173    /// The requested_section_size is the size of the section to allocate.
174    /// It returns a handle to the section that can be used to write data until
175    /// it is flushed with flush_section, it is then considered unmutable.
176    fn add_section(
177        &mut self,
178        entry_type: UnifiedLogType,
179        requested_section_size: usize,
180    ) -> CuResult<SectionHandle<S>>;
181
182    /// Flush the given section to the underlying storage.
183    fn flush_section(&mut self, section: &mut SectionHandle<S>);
184
185    /// Returns the current status of the unified logger.
186    fn status(&self) -> UnifiedLogStatus;
187}
188
189/// Read back a unified log linearly.
190pub trait UnifiedLogRead {
191    /// Read through the unified logger until it reaches the UnifiedLogType given in datalogtype.
192    /// It will return the byte array of the section if found.
193    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>>;
194
195    /// Read through the next section entry regardless of its type.
196    /// It will return the header and the byte array of the section.
197    /// Note the last Entry should be of UnifiedLogType::LastEntry if the log is not corrupted.
198    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)>;
199}
200
201/// Create a new stream to write to the unifiedlogger.
202pub fn stream_write<E: Encode, S: SectionStorage>(
203    logger: Arc<Mutex<impl UnifiedLogWrite<S>>>,
204    entry_type: UnifiedLogType,
205    minimum_allocation_amount: usize,
206) -> CuResult<impl WriteStream<E>> {
207    LogStream::new(entry_type, logger, minimum_allocation_amount)
208}
209
210/// A wrapper around the unifiedlogger that implements the Write trait.
211struct LogStream<S: SectionStorage, L: UnifiedLogWrite<S>> {
212    entry_type: UnifiedLogType,
213    parent_logger: Arc<Mutex<L>>,
214    current_section: SectionHandle<S>,
215    current_position: usize,
216    minimum_allocation_amount: usize,
217}
218
219impl<S: SectionStorage, L: UnifiedLogWrite<S>> LogStream<S, L> {
220    fn new(
221        entry_type: UnifiedLogType,
222        parent_logger: Arc<Mutex<L>>,
223        minimum_allocation_amount: usize,
224    ) -> CuResult<Self> {
225        #[cfg(feature = "std")]
226        let section = parent_logger
227            .lock()
228            .expect("Could not lock a section at MmapStream creation")
229            .add_section(entry_type, minimum_allocation_amount)?;
230
231        #[cfg(not(feature = "std"))]
232        let section = parent_logger
233            .lock()
234            .add_section(entry_type, minimum_allocation_amount)?;
235
236        Ok(Self {
237            entry_type,
238            parent_logger,
239            current_section: section,
240            current_position: 0,
241            minimum_allocation_amount,
242        })
243    }
244}
245
246impl<S: SectionStorage, L: UnifiedLogWrite<S>> Debug for LogStream<S, L> {
247    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
248        write!(
249            f,
250            "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}",
251            self.entry_type, self.current_position, self.minimum_allocation_amount
252        )
253    }
254}
255
256impl<E: Encode, S: SectionStorage, L: UnifiedLogWrite<S>> WriteStream<E> for LogStream<S, L> {
257    fn log(&mut self, obj: &E) -> CuResult<()> {
258        //let dst = self.current_section.get_user_buffer();
259        // let result = encode_into_slice(obj, dst, standard());
260        let result = self.current_section.append(obj);
261        match result {
262            Ok(nb_bytes) => {
263                self.current_position += nb_bytes;
264                self.current_section.header.used += nb_bytes as u32;
265                Ok(())
266            }
267            Err(e) => match e {
268                EncodeError::UnexpectedEnd => {
269                    #[cfg(feature = "std")]
270                    let logger_guard = self.parent_logger.lock();
271
272                    #[cfg(not(feature = "std"))]
273                    let mut logger_guard = self.parent_logger.lock();
274
275                    #[cfg(feature = "std")]
276                    let mut logger_guard =
277                        match logger_guard {
278                            Ok(g) => g,
279                            Err(_) => return Err(
280                                "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
281                                    .into(),
282                            ), // It will retry but at least not completely crash.
283                        };
284
285                    logger_guard.flush_section(&mut self.current_section);
286                    self.current_section = logger_guard
287                        .add_section(self.entry_type, self.minimum_allocation_amount)?;
288
289                    let result = self.current_section.append(obj).expect(
290                        "Failed to encode object in a newly minted section. Unrecoverable failure.",
291                    ); // If we fail just after creating a section, there is not much we can do, we need to bail.
292
293                    self.current_position += result;
294                    self.current_section.header.used += result as u32;
295                    Ok(())
296                }
297                _ => {
298                    let err =
299                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
300                            .add_cause(e.to_string().as_str());
301                    Err(err)
302                }
303            },
304        }
305    }
306}
307
308impl<S: SectionStorage, L: UnifiedLogWrite<S>> Drop for LogStream<S, L> {
309    fn drop(&mut self) {
310        #[cfg(feature = "std")]
311        let logger_guard = self.parent_logger.lock();
312
313        #[cfg(not(feature = "std"))]
314        let mut logger_guard = self.parent_logger.lock();
315
316        #[cfg(feature = "std")]
317        let mut logger_guard = match logger_guard {
318            Ok(g) => g,
319            Err(_) => return,
320        };
321        logger_guard.flush_section(&mut self.current_section);
322        #[cfg(feature = "std")]
323        if !std::thread::panicking() {
324            eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
325        }
326    }
327}