use crate::config::{Cnx, CuConfig, NodeId};
use crate::config::{ComponentConfig, Node};
use crate::copperlist::{CopperList, CopperListState, CuListsManager};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, RobotClock};
use cu29_log_runtime::LoggerRuntime;
use cu29_traits::CopperListTuple;
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_unifiedlog::UnifiedLoggerWrite;
use std::sync::{Arc, Mutex};
use petgraph::prelude::*;
use std::fmt::Debug;
pub struct CopperContext {
pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
pub logger_runtime: LoggerRuntime,
pub clock: RobotClock,
}
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
pub tasks: CT,
pub monitor: M,
pub copper_lists_manager: CuListsManager<P, NBCL>,
pub clock: RobotClock, logger: Box<dyn WriteStream<CopperList<P>>>,
}
impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
for CuRuntime<CT, P, M, NBCL>
{
fn get_clock(&self) -> RobotClock {
self.clock.clone()
}
}
impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
pub fn new(
clock: RobotClock,
config: &CuConfig,
tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
monitor_instanciator: impl Fn(&CuConfig) -> M,
logger: impl WriteStream<CopperList<P>> + 'static,
) -> CuResult<Self> {
let all_instances_configs: Vec<Option<&ComponentConfig>> = config
.get_all_nodes()
.iter()
.map(|(_, node)| node.get_instance_config())
.collect();
let tasks = tasks_instanciator(all_instances_configs)?;
let monitor = monitor_instanciator(config);
let runtime = Self {
tasks,
monitor,
copper_lists_manager: CuListsManager::new(), clock,
logger: Box::new(logger),
};
Ok(runtime)
}
pub fn available_copper_lists(&self) -> usize {
NBCL - self.copper_lists_manager.len()
}
pub fn end_of_processing(&mut self, culistid: u32) {
let mut is_top = true;
let mut nb_done = 0;
self.copper_lists_manager.iter_mut().for_each(|cl| {
if cl.id == culistid && cl.get_state() == CopperListState::Processing {
cl.change_state(CopperListState::DoneProcessing);
}
if is_top && cl.get_state() == CopperListState::DoneProcessing {
cl.change_state(CopperListState::BeingSerialized);
self.logger.log(cl).unwrap();
cl.change_state(CopperListState::Free);
nb_done += 1;
} else {
is_top = false;
}
});
for _ in 0..nb_done {
let _ = self.copper_lists_manager.pop();
}
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
Source,
Regular,
Sink,
}
pub struct CuExecutionStep {
pub node_id: NodeId,
pub node: Node,
pub task_type: CuTaskType,
pub input_msg_indices_types: Vec<(u32, String)>,
pub output_msg_index_type: Option<(u32, String)>,
}
impl Debug for CuExecutionStep {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(format!(" CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
f.write_str(format!(" task_type: {:?}\n", self.node.get_type()).as_str())?;
f.write_str(format!(" task: {:?}\n", self.task_type).as_str())?;
f.write_str(
format!(
" input_msg_types: {:?}\n",
self.input_msg_indices_types
)
.as_str(),
)?;
f.write_str(
format!(" output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
)?;
Ok(())
}
}
pub struct CuExecutionLoop {
pub steps: Vec<CuExecutionUnit>,
pub loop_count: Option<u32>,
}
impl Debug for CuExecutionLoop {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("CuExecutionLoop:\n")?;
for step in &self.steps {
match step {
CuExecutionUnit::Step(step) => {
step.fmt(f)?;
}
CuExecutionUnit::Loop(l) => {
l.fmt(f)?;
}
}
}
f.write_str(format!(" count: {:?}", self.loop_count).as_str())?;
Ok(())
}
}
#[derive(Debug)]
pub enum CuExecutionUnit {
Step(CuExecutionStep),
Loop(CuExecutionLoop),
}
fn find_output_index_type_from_nodeid(
node_id: NodeId,
steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
for step in steps {
match step {
CuExecutionUnit::Loop(loop_unit) => {
if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
return Some(index);
}
}
CuExecutionUnit::Step(step) => {
if step.node_id == node_id {
return step.output_msg_index_type.clone();
}
}
}
}
None
}
pub fn find_task_type_for_id(
graph: &StableDiGraph<Node, Cnx, NodeId>,
node_id: NodeId,
) -> CuTaskType {
if graph.neighbors_directed(node_id.into(), Incoming).count() == 0 {
CuTaskType::Source
} else if graph.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
CuTaskType::Sink
} else {
CuTaskType::Regular
}
}
fn plan_tasks_tree_branch(
config: &CuConfig,
mut next_culist_output_index: u32,
starting_point: NodeId,
plan: &mut Vec<CuExecutionUnit>,
) -> u32 {
let mut visitor = Bfs::new(&config.graph, starting_point.into());
while let Some(node) = visitor.next(&config.graph) {
let id = node.index() as NodeId;
let node = config.get_node(id).unwrap();
let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
let output_msg_index_type: Option<(u32, String)>;
let task_type = find_task_type_for_id(&config.graph, id);
match task_type {
CuTaskType::Source => {
output_msg_index_type = Some((
next_culist_output_index,
config
.graph
.edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
.unwrap()
.msg
.clone(),
));
next_culist_output_index += 1;
}
CuTaskType::Sink => {
let parents: Vec<NodeIndex> = config
.graph
.neighbors_directed(id.into(), Incoming)
.collect();
for parent in parents {
let index_type =
find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
if let Some(index_type) = index_type {
input_msg_indices_types.push(index_type);
} else {
return next_culist_output_index;
}
}
output_msg_index_type = Some((
next_culist_output_index,
"()".to_string(), ));
next_culist_output_index += 1;
}
CuTaskType::Regular => {
let parents: Vec<NodeIndex> = config
.graph
.neighbors_directed(id.into(), Incoming)
.collect();
for parent in parents {
let index_type =
find_output_index_type_from_nodeid(parent.index() as NodeId, plan);
if let Some(index_type) = index_type {
input_msg_indices_types.push(index_type);
} else {
return next_culist_output_index;
}
}
output_msg_index_type = Some((
next_culist_output_index,
config
.graph
.edge_weight(EdgeIndex::new(config.get_src_edges(id)[0]))
.unwrap()
.msg
.clone(),
));
next_culist_output_index += 1;
}
}
input_msg_indices_types.sort_by(|a, b| a.0.cmp(&b.0));
if let Some(pos) = plan.iter().position(|step| {
if let CuExecutionUnit::Step(ref s) = step {
s.node_id == id
} else {
false
}
}) {
let mut step = plan.remove(pos);
if let CuExecutionUnit::Step(ref mut s) = step {
s.input_msg_indices_types = input_msg_indices_types;
}
plan.push(step);
} else {
let step = CuExecutionStep {
node_id: id,
node: node.clone(),
task_type,
input_msg_indices_types,
output_msg_index_type,
};
plan.push(CuExecutionUnit::Step(step));
}
}
next_culist_output_index
}
pub fn compute_runtime_plan(config: &CuConfig) -> CuResult<CuExecutionLoop> {
let nodes_to_visit = config
.graph
.node_indices()
.filter(|node_id| {
let id = node_id.index() as NodeId;
let task_type = find_task_type_for_id(&config.graph, id);
task_type == CuTaskType::Source
})
.collect::<Vec<NodeIndex>>();
let mut next_culist_output_index = 0u32;
let mut plan: Vec<CuExecutionUnit> = Vec::new();
for node_index in &nodes_to_visit {
next_culist_output_index = plan_tasks_tree_branch(
config,
next_culist_output_index,
node_index.index() as NodeId,
&mut plan,
);
}
Ok(CuExecutionLoop {
steps: plan,
loop_count: None, })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Node;
use crate::cutask::CuSinkTask;
use crate::cutask::{CuSrcTask, Freezable};
use crate::monitoring::NoMonitor;
use bincode::Encode;
pub struct TestSource {}
impl Freezable for TestSource {}
impl CuSrcTask<'_> for TestSource {
type Output = ();
fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
Ok(Self {})
}
fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
Ok(())
}
}
pub struct TestSink {}
impl Freezable for TestSink {}
impl CuSinkTask<'_> for TestSink {
type Input = ();
fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
where
Self: Sized,
{
Ok(Self {})
}
fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
Ok(())
}
}
type Tasks = (TestSource, TestSink);
type Msgs = ((),);
fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
Ok((
TestSource::new(all_instances_configs[0])?,
TestSink::new(all_instances_configs[1])?,
))
}
fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
NoMonitor {}
}
#[derive(Debug)]
struct FakeWriter {}
impl<E: Encode> WriteStream<E> for FakeWriter {
fn log(&mut self, _obj: &E) -> CuResult<()> {
Ok(())
}
}
#[test]
fn test_runtime_instanciation() {
let mut config = CuConfig::default();
config.add_node(Node::new("a", "TestSource"));
config.add_node(Node::new("b", "TestSink"));
config.connect(0, 1, "()");
let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
RobotClock::default(),
&config,
tasks_instanciator,
monitor_instanciator,
FakeWriter {},
);
assert!(runtime.is_ok());
}
#[test]
fn test_copperlists_manager_lifecycle() {
let mut config = CuConfig::default();
config.add_node(Node::new("a", "TestSource"));
config.add_node(Node::new("b", "TestSink"));
config.connect(0, 1, "()");
let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
RobotClock::default(),
&config,
tasks_instanciator,
monitor_instanciator,
FakeWriter {},
)
.unwrap();
{
let copperlists = &mut runtime.copper_lists_manager;
let culist0 = copperlists
.create()
.expect("Ran out of space for copper lists");
let id = culist0.id;
assert_eq!(id, 0);
culist0.change_state(CopperListState::Processing);
assert_eq!(runtime.available_copper_lists(), 1);
}
{
let copperlists = &mut runtime.copper_lists_manager;
let culist1 = copperlists
.create()
.expect("Ran out of space for copper lists"); let id = culist1.id;
assert_eq!(id, 1);
culist1.change_state(CopperListState::Processing);
assert_eq!(runtime.available_copper_lists(), 0);
}
{
let copperlists = &mut runtime.copper_lists_manager;
let culist2 = copperlists.create();
assert!(culist2.is_none());
assert_eq!(runtime.available_copper_lists(), 0);
}
runtime.end_of_processing(1);
assert_eq!(runtime.available_copper_lists(), 1);
{
let copperlists = &mut runtime.copper_lists_manager;
let culist2 = copperlists
.create()
.expect("Ran out of space for copper lists"); let id = culist2.id;
assert_eq!(id, 2);
culist2.change_state(CopperListState::Processing);
assert_eq!(runtime.available_copper_lists(), 0);
}
runtime.end_of_processing(0);
assert_eq!(runtime.available_copper_lists(), 0);
runtime.end_of_processing(2);
assert_eq!(runtime.available_copper_lists(), 2);
}
}