Skip to main content

cu29_unifiedlog/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4extern crate core;
5
6#[cfg(feature = "std")]
7pub mod memmap;
8
9#[cfg(feature = "std")]
10mod compat {
11    // backward compatibility for the std implementation
12    pub use crate::memmap::LogPosition;
13    pub use crate::memmap::MmapUnifiedLogger as UnifiedLogger;
14    pub use crate::memmap::MmapUnifiedLoggerBuilder as UnifiedLoggerBuilder;
15    pub use crate::memmap::MmapUnifiedLoggerRead as UnifiedLoggerRead;
16    pub use crate::memmap::MmapUnifiedLoggerWrite as UnifiedLoggerWrite;
17    pub use crate::memmap::UnifiedLoggerIOReader;
18}
19
20#[cfg(feature = "std")]
21pub use compat::*;
22
23use alloc::string::ToString;
24use alloc::sync::Arc;
25use alloc::vec::Vec;
26use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
27#[cfg(not(feature = "std"))]
28use spin::Mutex;
29#[cfg(feature = "std")]
30use std::sync::Mutex;
31
32use bincode::error::EncodeError;
33use bincode::{Decode, Encode};
34use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
35
36/// ID to spot the beginning of a Copper Log
37#[allow(dead_code)]
38pub const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF]; // BRASS OFF
39
40/// ID to spot a section of Copper Log
41pub const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57]; // FAST
42
43pub const SECTION_HEADER_COMPACT_SIZE: u16 = 512; // Usual minimum size for a disk sector.
44
45/// The main file header of the datalogger.
46#[derive(Encode, Decode, Debug)]
47pub struct MainHeader {
48    pub magic: [u8; 4],            // Magic number to identify the file.
49    pub first_section_offset: u16, // This is to align with a page at write time.
50    pub page_size: u16,
51}
52
53impl Display for MainHeader {
54    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
55        writeln!(
56            f,
57            "  Magic -> {:2x}{:2x}{:2x}{:2x}",
58            self.magic[0], self.magic[1], self.magic[2], self.magic[3]
59        )?;
60        writeln!(f, "  first_section_offset -> {}", self.first_section_offset)?;
61        writeln!(f, "  page_size -> {}", self.page_size)
62    }
63}
64
65/// Each concurrent sublogger is tracked through a section header.
66/// They form a linked list of sections.
67/// The entry type is used to identify the type of data in the section.
68#[derive(Encode, Decode, Debug)]
69pub struct SectionHeader {
70    pub magic: [u8; 2],  // Magic number to identify the section.
71    pub block_size: u16, // IMPORTANT: we assume this header fits in this block size.
72    pub entry_type: UnifiedLogType,
73    pub offset_to_next_section: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
74    pub used: u32,                   // how much of the section is filled.
75    pub is_open: bool,               // true while being written, false once closed.
76}
77
78impl Display for SectionHeader {
79    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
80        writeln!(f, "    Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
81        writeln!(f, "    type -> {:?}", self.entry_type)?;
82        write!(
83            f,
84            "    use  -> {} / {} (open: {})",
85            self.used, self.offset_to_next_section, self.is_open
86        )
87    }
88}
89
90impl Default for SectionHeader {
91    fn default() -> Self {
92        Self {
93            magic: SECTION_MAGIC,
94            block_size: 512,
95            entry_type: UnifiedLogType::Empty,
96            offset_to_next_section: 0,
97            used: 0,
98            is_open: true,
99        }
100    }
101}
102
103pub enum AllocatedSection<S: SectionStorage> {
104    NoMoreSpace,
105    Section(SectionHandle<S>),
106}
107
108/// A Storage is an append-only structure that can update a header section.
109pub trait SectionStorage: Send + Sync {
110    /// This rewinds the storage, serialize the header and jumps to the beginning of the user data storage.
111    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
112    /// This updates the header leaving the position to the end of the user data storage.
113    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
114    /// Appends the entry to the user data storage.
115    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError>;
116    /// Flushes the section to the underlying storage
117    fn flush(&mut self) -> CuResult<usize>;
118}
119
120/// A SectionHandle is a handle to a section in the datalogger.
121/// It allows tracking the lifecycle of the section.
122#[derive(Default)]
123pub struct SectionHandle<S: SectionStorage> {
124    header: SectionHeader, // keep a copy of the header as metadata
125    storage: S,
126}
127
128impl<S: SectionStorage> SectionHandle<S> {
129    pub fn create(header: SectionHeader, mut storage: S) -> CuResult<Self> {
130        // Write the first version of the header.
131        let _ = storage.initialize(&header).map_err(|e| e.to_string())?;
132        Ok(Self { header, storage })
133    }
134
135    pub fn mark_closed(&mut self) {
136        self.header.is_open = false;
137    }
138    pub fn append<E: Encode>(&mut self, entry: E) -> Result<usize, EncodeError> {
139        self.storage.append(&entry)
140    }
141
142    pub fn get_storage(&self) -> &S {
143        &self.storage
144    }
145
146    pub fn get_storage_mut(&mut self) -> &mut S {
147        &mut self.storage
148    }
149
150    pub fn post_update_header(&mut self) -> Result<usize, EncodeError> {
151        self.storage.post_update_header(&self.header)
152    }
153}
154
155/// Basic statistics for the unified logger.
156/// Note: the total_allocated_space might grow for the std implementation
157pub struct UnifiedLogStatus {
158    pub total_used_space: usize,
159    pub total_allocated_space: usize,
160}
161
162/// Payload stored in the end-of-log section to signal whether the log was cleanly closed.
163#[derive(Encode, Decode, Debug, Clone)]
164pub struct EndOfLogMarker {
165    pub temporary: bool,
166}
167
168/// The writing interface to the unified logger.
169/// Writing is "almost" linear as various streams can allocate sections and track them until
170/// they drop them.
171pub trait UnifiedLogWrite<S: SectionStorage>: Send + Sync {
172    /// A section is a contiguous chunk of memory that can be used to write data.
173    /// It can store various types of data as specified by the entry_type.
174    /// The requested_section_size is the size of the section to allocate.
175    /// It returns a handle to the section that can be used to write data until
176    /// it is flushed with flush_section, it is then considered unmutable.
177    fn add_section(
178        &mut self,
179        entry_type: UnifiedLogType,
180        requested_section_size: usize,
181    ) -> CuResult<SectionHandle<S>>;
182
183    /// Flush the given section to the underlying storage.
184    fn flush_section(&mut self, section: &mut SectionHandle<S>);
185
186    /// Returns the current status of the unified logger.
187    fn status(&self) -> UnifiedLogStatus;
188}
189
190/// Read back a unified log linearly.
191pub trait UnifiedLogRead {
192    /// Read through the unified logger until it reaches the UnifiedLogType given in datalogtype.
193    /// It will return the byte array of the section if found.
194    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>>;
195
196    /// Read through the next section entry regardless of its type.
197    /// It will return the header and the byte array of the section.
198    /// Note the last Entry should be of UnifiedLogType::LastEntry if the log is not corrupted.
199    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)>;
200}
201
202/// Create a new stream to write to the unifiedlogger.
203pub fn stream_write<E: Encode, S: SectionStorage>(
204    logger: Arc<Mutex<impl UnifiedLogWrite<S>>>,
205    entry_type: UnifiedLogType,
206    minimum_allocation_amount: usize,
207) -> CuResult<impl WriteStream<E>> {
208    LogStream::new(entry_type, logger, minimum_allocation_amount)
209}
210
211/// A wrapper around the unifiedlogger that implements the Write trait.
212pub struct LogStream<S: SectionStorage, L: UnifiedLogWrite<S>> {
213    entry_type: UnifiedLogType,
214    parent_logger: Arc<Mutex<L>>,
215    current_section: SectionHandle<S>,
216    current_position: usize,
217    minimum_allocation_amount: usize,
218    last_log_bytes: usize,
219}
220
221impl<S: SectionStorage, L: UnifiedLogWrite<S>> LogStream<S, L> {
222    fn new(
223        entry_type: UnifiedLogType,
224        parent_logger: Arc<Mutex<L>>,
225        minimum_allocation_amount: usize,
226    ) -> CuResult<Self> {
227        #[cfg(feature = "std")]
228        let section = parent_logger
229            .lock()
230            .map_err(|e| {
231                CuError::from("Could not lock a section at LogStream creation")
232                    .add_cause(e.to_string().as_str())
233            })?
234            .add_section(entry_type, minimum_allocation_amount)?;
235
236        #[cfg(not(feature = "std"))]
237        let section = parent_logger
238            .lock()
239            .add_section(entry_type, minimum_allocation_amount)?;
240
241        Ok(Self {
242            entry_type,
243            parent_logger,
244            current_section: section,
245            current_position: 0,
246            minimum_allocation_amount,
247            last_log_bytes: 0,
248        })
249    }
250}
251
252impl<S: SectionStorage, L: UnifiedLogWrite<S>> Debug for LogStream<S, L> {
253    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
254        write!(
255            f,
256            "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}",
257            self.entry_type, self.current_position, self.minimum_allocation_amount
258        )
259    }
260}
261
262impl<E: Encode, S: SectionStorage, L: UnifiedLogWrite<S>> WriteStream<E> for LogStream<S, L> {
263    fn log(&mut self, obj: &E) -> CuResult<()> {
264        //let dst = self.current_section.get_user_buffer();
265        // let result = encode_into_slice(obj, dst, standard());
266        let result = self.current_section.append(obj);
267        match result {
268            Ok(nb_bytes) => {
269                self.current_position += nb_bytes;
270                self.current_section.header.used += nb_bytes as u32;
271                self.last_log_bytes = nb_bytes;
272                // Track encoded bytes so monitoring can compute actual bytes written.
273                Ok(())
274            }
275            Err(e) => match e {
276                EncodeError::UnexpectedEnd => {
277                    #[cfg(feature = "std")]
278                    let logger_guard = self.parent_logger.lock();
279
280                    #[cfg(not(feature = "std"))]
281                    let mut logger_guard = self.parent_logger.lock();
282
283                    #[cfg(feature = "std")]
284                    let mut logger_guard =
285                        match logger_guard {
286                            Ok(g) => g,
287                            Err(_) => return Err(
288                                "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
289                                    .into(),
290                            ), // It will retry but at least not completely crash.
291                        };
292
293                    logger_guard.flush_section(&mut self.current_section);
294                    self.current_section = logger_guard
295                        .add_section(self.entry_type, self.minimum_allocation_amount)?;
296
297                    let result = self
298                        .current_section
299                        .append(obj)
300                        .map_err(|e| {
301                            CuError::from(
302                                "Failed to encode object in a newly minted section. Unrecoverable failure.",
303                            )
304                            .add_cause(e.to_string().as_str())
305                        })?; // If we fail just after creating a section, there is not much we can do.
306
307                    self.current_position += result;
308                    self.current_section.header.used += result as u32;
309                    self.last_log_bytes = result;
310                    Ok(())
311                }
312                _ => {
313                    let err =
314                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
315                            .add_cause(e.to_string().as_str());
316                    Err(err)
317                }
318            },
319        }
320    }
321
322    fn last_log_bytes(&self) -> Option<usize> {
323        Some(self.last_log_bytes)
324    }
325}
326
327impl<S: SectionStorage, L: UnifiedLogWrite<S>> Drop for LogStream<S, L> {
328    fn drop(&mut self) {
329        #[cfg(feature = "std")]
330        match self.parent_logger.lock() {
331            Ok(mut logger_guard) => {
332                logger_guard.flush_section(&mut self.current_section);
333            }
334            Err(_) => {
335                // Only surface the warning when a real poisoning occurred.
336                if !std::thread::panicking() {
337                    eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
338                }
339            }
340        }
341
342        #[cfg(not(feature = "std"))]
343        {
344            let mut logger_guard = self.parent_logger.lock();
345            logger_guard.flush_section(&mut self.current_section);
346        }
347    }
348}