Streaming
Once ordering has been defined, consumers can use FnGraph::iter to iterate through the graph and invoke each function sequentially. When the "async" feature is enabled (it is on by default), FnGraph::stream produces each function once all of its predecessors have been streamed and have returned.
To know when a function is available to be streamed, fn_graph uses the following algorithm:
- Track the number of predecessors of each node.
- When streaming, each time a function is streamed and the closure returns, subtract 1 from the predecessor counts of each of that function's successors.
- When a function's predecessor count reaches 0, it becomes available to be streamed.
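These steps amount to Kahn's algorithm for topological sorting. The following is a minimal, std-only sketch. It assumes the graph is a hypothetical adjacency list (`successors[i]` holds the functions that depend on function `i`) and that each function returns immediately. It illustrates the counting scheme and is not fn_graph's actual implementation.

```rust
use std::collections::VecDeque;

/// Returns the order in which nodes become available to be streamed.
/// `successors[i]` lists the nodes that depend on node `i` (hypothetical layout).
fn stream_order(successors: &[Vec<usize>]) -> Vec<usize> {
    // Track the number of predecessors of each node.
    let mut predecessor_counts = vec![0usize; successors.len()];
    for succs in successors {
        for &s in succs {
            predecessor_counts[s] += 1;
        }
    }

    // Nodes without predecessors are available straight away.
    let mut ready: VecDeque<usize> = (0..successors.len())
        .filter(|&n| predecessor_counts[n] == 0)
        .collect();

    let mut order = Vec::with_capacity(successors.len());
    while let Some(node) = ready.pop_front() {
        // "Stream" the node; in this sketch its closure returns immediately.
        order.push(node);
        // Subtract 1 from the predecessor count of each successor.
        for &s in &successors[node] {
            predecessor_counts[s] -= 1;
            // A count of 0 means every predecessor has returned.
            if predecessor_counts[s] == 0 {
                ready.push_back(s);
            }
        }
    }
    order
}

fn main() {
    // Logic edges of the example below, with fn1..fn5 as indices 0..4:
    // fn1 -> fn2, fn1 -> fn3, fn2 -> fn4, fn2 -> fn5, fn3 -> fn4, fn3 -> fn5.
    let successors = vec![vec![1, 2], vec![3, 4], vec![3, 4], vec![], vec![]];
    println!("{:?}", stream_order(&successors)); // prints [0, 1, 2, 3, 4]
}
```

A function caught in a cycle never reaches a count of 0, so this sketch never emits it.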
Visualized
In the example scenario, each function starts with a predecessor count equal to its number of incoming edges.
When a function completes, subtract 1 from the predecessor count of each of its successors.
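Applying that step to every function that finishes in the same round shows which functions become available together, and therefore which ones may run concurrently. This is a minimal sketch that again assumes a hypothetical `successors` adjacency list and treats each round as all currently available functions completing at once. fn_graph itself streams each function as soon as its own predecessors return, rather than in rounds.

```rust
/// Groups nodes into waves: a node lands in a wave once every node in the
/// earlier waves has completed, so nodes in the same wave may run concurrently.
/// `successors[i]` lists the nodes that depend on node `i` (hypothetical layout).
fn waves(successors: &[Vec<usize>]) -> Vec<Vec<usize>> {
    let mut predecessor_counts = vec![0usize; successors.len()];
    for succs in successors {
        for &s in succs {
            predecessor_counts[s] += 1;
        }
    }

    let mut current: Vec<usize> = (0..successors.len())
        .filter(|&n| predecessor_counts[n] == 0)
        .collect();
    let mut result = Vec::new();
    while !current.is_empty() {
        let mut next = Vec::new();
        for &node in &current {
            // The node completed: subtract 1 from each successor's count.
            for &s in &successors[node] {
                predecessor_counts[s] -= 1;
                if predecessor_counts[s] == 0 {
                    next.push(s);
                }
            }
        }
        result.push(current);
        current = next;
    }
    result
}

fn main() {
    // Logic edges of the example below, fn1..fn5 as indices 0..4.
    let successors = vec![vec![1, 2], vec![3, 4], vec![3, 4], vec![], vec![]];
    for (i, wave) in waves(&successors).iter().enumerate() {
        println!("wave {i}: {wave:?}");
    }
    // prints:
    // wave 0: [0]
    // wave 1: [1, 2]
    // wave 2: [3, 4]
}
```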
This applies to both logical and data dependencies.
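A data dependency exists when two functions cannot safely borrow the same resource at the same time: under Rust's borrowing rule (many `&T` or one `&mut T`), two functions conflict when either one takes `&mut` to a resource that the other also borrows. This sketch assumes each function's borrows are listed by hand in a hypothetical `Access` struct, and that each conflicting pair is ordered by insertion order. How fn_graph actually records and orients data edges may differ.

```rust
/// Hypothetical record of the resources a function borrows.
struct Access {
    reads: &'static [&'static str],  // parameters taken as `&T`
    writes: &'static [&'static str], // parameters taken as `&mut T`
}

/// Whether `acc` borrows resource `r` at all, shared or mutably.
fn touches(acc: &Access, r: &'static str) -> bool {
    acc.reads.contains(&r) || acc.writes.contains(&r)
}

/// Two functions conflict when either mutably borrows a resource the other borrows.
fn conflicts(x: &Access, y: &Access) -> bool {
    x.writes.iter().any(|&r| touches(y, r)) || y.writes.iter().any(|&r| touches(x, r))
}

/// Every conflicting pair `(i, j)` with `i < j`, i.e. oriented by insertion order
/// (an assumption of this sketch).
fn data_edges(accesses: &[Access]) -> Vec<(usize, usize)> {
    let mut edges = Vec::new();
    for i in 0..accesses.len() {
        for j in (i + 1)..accesses.len() {
            if conflicts(&accesses[i], &accesses[j]) {
                edges.push((i, j));
            }
        }
    }
    edges
}

fn main() {
    // Parameters of fn1..fn5 from the example below, as indices 0..4.
    let accesses = [
        Access { reads: &[], writes: &["A", "B", "C", "D"] }, // fn1
        Access { reads: &["A"], writes: &["C"] },             // fn2
        Access { reads: &["B"], writes: &["C"] },             // fn3
        Access { reads: &["C"], writes: &["D"] },             // fn4
        Access { reads: &["A", "B", "C"], writes: &[] },      // fn5
    ];
    println!("{:?}", data_edges(&accesses));
    // prints [(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]
}
```

Most of these pairs are already ordered by logic edges. The exception is `(1, 2)`: fn2 and fn3 both take `&mut C`, so they must not run at the same time. fn4 and fn5 (`(3, 4)`) do not conflict, because the only resource they share is `C`, and both merely read it.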
Performance improves when multiple functions can execute concurrently, as timing the sequential and concurrent executions shows:
```rust
// [dependencies]
// fn_graph = { version = "0.5.4", features = ["fn_meta", "resman"] }
// futures = "0.3.21"
// resman = { version = "0.15.0", features = ["fn_meta", "fn_res"] }
// tokio = { version = "1.20.0", features = ["rt", "macros", "time"] }

use std::{
    fmt,
    ops::{AddAssign, Deref, DerefMut},
};

use fn_graph::FnGraphBuilder;
use futures::stream::StreamExt;
use resman::{IntoFnRes, Resources};

// Define newtypes for each parameter.
#[derive(Debug)]
struct A(u32);
#[derive(Debug)]
struct B(u32);
#[derive(Debug)]
struct C(u32);
#[derive(Debug)]
struct D(u32);

fn main() {
    // Initialize data.
    let mut resources = Resources::new();
    resources.insert(A(0));
    resources.insert(B(0));
    resources.insert(C(0));
    resources.insert(D(0));
    let resources = &resources; // Now the map is compile time immutable.

    // Define the logic and insert it into the graph structure.
    type Fn1 = fn(&mut A, &mut B, &mut C, &mut D);
    type Fn2 = fn(&A, &mut C);
    type Fn3 = fn(&B, &mut C);
    type Fn4 = fn(&C, &mut D);
    type Fn5 = fn(&A, &B, &C);

    let fn_graph = {
        let fn1: Fn1 = |a, b, c, d| {
            a.0 = 1;
            b.0 = 2;
            c.0 = 0;
            d.0 = 0;
        };
        let fn2: Fn2 = |a, c| *c += a.0;
        let fn3: Fn3 = |b, c| *c += b.0;
        let fn4: Fn4 = |c, d| *d += c.0;
        let fn5: Fn5 = |a, b, c| println!("{a} + {b} = {c}");

        let mut fn_graph_builder = FnGraphBuilder::new();

        // Store functions in graph.
        let [fn_id1, fn_id2, fn_id3, fn_id4, fn_id5] = fn_graph_builder.add_fns([
            fn1.into_fn_res(),
            fn2.into_fn_res(),
            fn3.into_fn_res(),
            fn4.into_fn_res(),
            fn5.into_fn_res(),
        ]);

        // Define dependencies to control ordering.
        fn_graph_builder
            .add_logic_edges([
                (fn_id1, fn_id2),
                (fn_id1, fn_id3),
                (fn_id2, fn_id4),
                (fn_id2, fn_id5),
                (fn_id3, fn_id4),
                (fn_id3, fn_id5),
            ])
            .unwrap();

        fn_graph_builder.build()
    };

    // Invoke logic over data, one function at a time.
    let sequential_start = tokio::time::Instant::now();
    fn_graph.iter().for_each(|fun| {
        fun.call(resources);
        // Simulate 10 ms of work per function.
        std::thread::sleep(std::time::Duration::from_millis(10));
    });
    let sequential_elapsed = sequential_start.elapsed();
    println!("sequential_elapsed: {sequential_elapsed:?}");
    // prints:
    // 1 + 2 = 3
    // sequential_elapsed: 50.683709ms

    // Invoke logic over data, running available functions concurrently.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    rt.block_on(async move {
        let concurrent_start = tokio::time::Instant::now();
        fn_graph
            .stream()
            .for_each_concurrent(None, |fun| async move {
                fun.call(resources);
                // Simulate 10 ms of work without blocking the runtime.
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            })
            .await;
        let concurrent_elapsed = concurrent_start.elapsed();
        println!("concurrent_elapsed: {concurrent_elapsed:?}");
    });
    // prints:
    // 1 + 2 = 3
    // concurrent_elapsed: 44.740638ms
}

macro_rules! u32_newtype {
    ($name:ident) => {
        impl fmt::Display for $name {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                // Call `Display::fmt` explicitly, since the `Display` trait is not in scope.
                fmt::Display::fmt(&self.0, f)
            }
        }

        impl Deref for $name {
            type Target = u32;

            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        impl DerefMut for $name {
            fn deref_mut(&mut self) -> &mut Self::Target {
                &mut self.0
            }
        }

        impl AddAssign<u32> for $name {
            fn add_assign(&mut self, other: u32) {
                self.0 += other;
            }
        }
    };
}

u32_newtype!(A);
u32_newtype!(B);
u32_newtype!(C);
u32_newtype!(D);
```
Notably, asynchronous execution carries some overhead. However, as the number of functions that do not depend on each other grows, so does the available concurrency, and the performance gains should grow accordingly.
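To make the closing claim concrete, the sketch below compares two quantities. The sequential time is the sum of all durations. The ideal concurrent time is the longest path through the graph, which is the best any schedule can achieve while ignoring overhead. This is a minimal sketch. The adjacency-list layout, the 10 ms per-function duration, the assumption that fn2 and fn3 are serialized (an fn2 -> fn3 edge, since both take `&mut C`), and the `wide` graph are all hypothetical inputs chosen for illustration.

```rust
use std::collections::VecDeque;

/// Ideal wall-clock time if each function starts as soon as all of its
/// predecessors have finished, i.e. the length of the critical path.
/// `successors[i]` lists the nodes that depend on node `i` (hypothetical layout).
fn critical_path_ms(successors: &[Vec<usize>], durations_ms: &[u64]) -> u64 {
    let n = successors.len();
    let mut predecessor_counts = vec![0usize; n];
    for succs in successors {
        for &s in succs {
            predecessor_counts[s] += 1;
        }
    }
    // Earliest start time of each function.
    let mut start = vec![0u64; n];
    let mut ready: VecDeque<usize> = (0..n).filter(|&i| predecessor_counts[i] == 0).collect();
    let mut makespan = 0u64;
    while let Some(node) = ready.pop_front() {
        let finish = start[node] + durations_ms[node];
        makespan = makespan.max(finish);
        for &s in &successors[node] {
            // A successor cannot start before its latest predecessor finishes.
            start[s] = start[s].max(finish);
            predecessor_counts[s] -= 1;
            if predecessor_counts[s] == 0 {
                ready.push_back(s);
            }
        }
    }
    makespan
}

fn main() {
    // Example graph: logic edges plus the assumed fn2 -> fn3 data edge.
    let example = vec![vec![1, 2], vec![2, 3, 4], vec![3, 4], vec![], vec![]];
    let durations = [10u64; 5];
    println!(
        "example: sequential {} ms, ideal concurrent {} ms",
        durations.iter().sum::<u64>(),
        critical_path_ms(&example, &durations)
    ); // prints "example: sequential 50 ms, ideal concurrent 40 ms"

    // Hypothetical wider graph: one root followed by 8 independent functions.
    let mut wide: Vec<Vec<usize>> = vec![(1..=8).collect()];
    wide.resize(9, Vec::new());
    let durations = [10u64; 9];
    println!(
        "wide: sequential {} ms, ideal concurrent {} ms",
        durations.iter().sum::<u64>(),
        critical_path_ms(&wide, &durations)
    ); // prints "wide: sequential 90 ms, ideal concurrent 20 ms"
}
```

Under these assumptions, the ratio between sequential and ideal concurrent time rises from 50:40 to 90:20 as more of the functions become independent of each other.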