Streaming

Once the ordering has been defined, consumers can use FnGraph::iter to iterate through the graph and invoke each function sequentially. When the "async" feature is enabled (it is on by default), FnGraph::stream instead yields each function as soon as all of its predecessors have been streamed and their closures have returned, so independent functions can run concurrently.

To determine when a function is available to be streamed, fn_graph uses the following algorithm:

  • Track the number of predecessors of each node.
  • When streaming, each time a function is streamed and the closure returns, subtract 1 from the predecessor counts of each of that function's successors.
  • If the predecessor count of a function is 0, it is now available to be streamed.
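The counting scheme above is essentially Kahn's algorithm for topological sorting. The following is a minimal std-only sketch, assuming the graph is given as a plain adjacency list of node indices (a hypothetical representation, not fn_graph's internals). It runs one function at a time and records the order in which functions become available:

```rust
use std::collections::VecDeque;

/// Hypothetical helper, not part of fn_graph.
/// `successors[i]` lists the nodes that depend on node `i`.
/// Returns node indices in the order they become available to run.
fn availability_order(successors: &[Vec<usize>]) -> Vec<usize> {
    // Track the number of predecessors of each node.
    let mut predecessor_counts = vec![0usize; successors.len()];
    for succs in successors {
        for &s in succs {
            predecessor_counts[s] += 1;
        }
    }

    // Nodes with no predecessors are available immediately.
    let mut ready: VecDeque<usize> = (0..successors.len())
        .filter(|&i| predecessor_counts[i] == 0)
        .collect();

    let mut order = Vec::with_capacity(successors.len());
    while let Some(node) = ready.pop_front() {
        // "Run" the function, then subtract 1 from each successor's count.
        order.push(node);
        for &s in &successors[node] {
            predecessor_counts[s] -= 1;
            // A count of 0 means the successor is now available.
            if predecessor_counts[s] == 0 {
                ready.push_back(s);
            }
        }
    }
    order
}

fn main() {
    // The example graph, 0-indexed (fn1 = 0, ..., fn5 = 4),
    // including the fn2 -> fn3 data dependency.
    let successors = vec![
        vec![1, 2],    // fn1 -> fn2, fn3
        vec![2, 3, 4], // fn2 -> fn3, fn4, fn5
        vec![3, 4],    // fn3 -> fn4, fn5
        vec![],        // fn4
        vec![],        // fn5
    ];
    let order = availability_order(&successors);
    println!("{order:?}"); // prints "[0, 1, 2, 3, 4]"
}
```

If the graph contains a cycle, the nodes on it never reach a count of 0, so the returned order is shorter than the number of nodes; a real implementation would reject such graphs up front.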

Visualized

  1. In the example scenario, each function has a number of predecessors:

    [Graph] Edges: fn1 -> fn2, fn1 -> fn3, fn2 -> fn3, fn2 -> fn4, fn2 -> fn5, fn3 -> fn4, fn3 -> fn5. Predecessor counts: fn1: 0, fn2: 1, fn3: 2, fn4: 2, fn5: 2.
  2. When a function completes, subtract 1 from each of its successors' predecessor counts:

    [Graph, after fn1 completes] Predecessor counts: fn1: 0, fn2: 0, fn3: 1, fn4: 2, fn5: 2.
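  3. This applies to both logical and data dependencies. The fn2 -> fn3 edge is not passed to add_edges; it is a data dependency, because fn2 and fn3 both take &mut C. So fn3 only becomes available once fn2 has completed as well: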

    [Graph, after fn2 completes] Predecessor counts: fn1: 0, fn2: 0, fn3: 0, fn4: 1, fn5: 1.
  4. Performance is gained when multiple functions can be executed concurrently. Once fn3 completes, fn4 and fn5 are both available and can run at the same time:

    [Graph, after fn3 completes] Predecessor counts: fn1: 0, fn2: 0, fn3: 0, fn4: 0, fn5: 0.
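The fn2 -> fn3 edge in the graph above does not appear in the add_edges call in the example below; it comes from both functions mutably borrowing C. The sketch below illustrates that idea with a simplified model of our own, not fn_graph's implementation: each function is described by hypothetical `reads`/`writes` sets of resource names, two functions conflict when either writes a resource the other touches, and a conflicting pair needs an extra edge only when the logical edges do not already order it.

```rust
use std::collections::HashSet;

/// Hypothetical description of the resources a function borrows.
struct Access {
    reads: HashSet<&'static str>,
    writes: HashSet<&'static str>,
}

impl Access {
    fn new(reads: &[&'static str], writes: &[&'static str]) -> Self {
        Access {
            reads: reads.iter().copied().collect(),
            writes: writes.iter().copied().collect(),
        }
    }

    /// Whether this function reads or writes `resource`.
    fn touches(&self, resource: &str) -> bool {
        self.reads.contains(resource) || self.writes.contains(resource)
    }

    /// Two functions conflict if either writes a resource the other touches.
    fn conflicts_with(&self, other: &Access) -> bool {
        self.writes.iter().any(|&r| other.touches(r))
            || other.writes.iter().any(|&r| self.touches(r))
    }
}

/// Depth-first search: is `to` reachable from `from` along `edges`?
fn reachable(edges: &[(usize, usize)], from: usize, to: usize) -> bool {
    let mut stack = vec![from];
    let mut seen = HashSet::new();
    while let Some(n) = stack.pop() {
        if n == to {
            return true;
        }
        if seen.insert(n) {
            stack.extend(edges.iter().filter(|&&(a, _)| a == n).map(|&(_, b)| b));
        }
    }
    false
}

/// Conflicting pairs (in insertion order) that the logical edges leave unordered.
fn data_edges(accesses: &[Access], edges: &[(usize, usize)]) -> Vec<(usize, usize)> {
    let mut extra = Vec::new();
    for i in 0..accesses.len() {
        for j in (i + 1)..accesses.len() {
            let ordered = reachable(edges, i, j) || reachable(edges, j, i);
            if accesses[i].conflicts_with(&accesses[j]) && !ordered {
                extra.push((i, j));
            }
        }
    }
    extra
}

fn main() {
    // fn1..fn5 from the example, 0-indexed.
    let accesses = [
        Access::new(&[], &["A", "B", "C", "D"]), // fn1
        Access::new(&["A"], &["C"]),             // fn2
        Access::new(&["B"], &["C"]),             // fn3
        Access::new(&["C"], &["D"]),             // fn4
        Access::new(&["A", "B", "C"], &[]),      // fn5
    ];
    // The logical edges passed to add_edges in the example.
    let edges = [(0, 1), (0, 2), (1, 3), (1, 4), (2, 3), (2, 4)];
    println!("{:?}", data_edges(&accesses, &edges)); // prints "[(1, 2)]"
}
```

Only (1, 2), that is fn2 -> fn3, is reported: every other conflicting pair is already ordered by a logical edge, and fn4 and fn5 only share read access to C, so they may run together.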

This can be seen by timing both executions, where each function is followed by a 10 ms sleep:

// [dependencies]
// fn_graph = { version = "0.5.4", features = ["fn_meta", "resman"] }
// futures = "0.3.21"
// resman = { version = "0.15.0", features = ["fn_meta", "fn_res"] }
// tokio = { version = "1.20.0", features = ["rt", "macros", "time"] }
use std::{
    fmt,
    ops::{AddAssign, Deref, DerefMut},
};

use fn_graph::FnGraphBuilder;
use futures::stream::StreamExt;
use resman::{IntoFnRes, Resources};

// Define newtypes for each parameter.
#[derive(Debug)]
struct A(u32);
#[derive(Debug)]
struct B(u32);
#[derive(Debug)]
struct C(u32);
#[derive(Debug)]
struct D(u32);

fn main() {
    // Initialize data.
    let mut resources = Resources::new();
    resources.insert(A(0));
    resources.insert(B(0));
    resources.insert(C(0));
    resources.insert(D(0));
    let resources = &resources; // Now the map is compile time immutable.

    // Define the logic and insert it into the graph structure.
    type Fn1 = fn(&mut A, &mut B, &mut C, &mut D);
    type Fn2 = fn(&A, &mut C);
    type Fn3 = fn(&B, &mut C);
    type Fn4 = fn(&C, &mut D);
    type Fn5 = fn(&A, &B, &C);

    let fn_graph = {
        let fn1: Fn1 = |a, b, c, d| { a.0 = 1; b.0 = 2; c.0 = 0; d.0 = 0; };
        let fn2: Fn2 = |a,    c   | *c += a.0;
        let fn3: Fn3 = |   b, c   | *c += b.0;
        let fn4: Fn4 = |      c, d| *d += c.0;
        let fn5: Fn5 = |a, b, c   | println!("{a} + {b} = {c}");

        let mut fn_graph_builder = FnGraphBuilder::new();

        // Store functions in graph.
        let [fn_id1, fn_id2, fn_id3, fn_id4, fn_id5] = fn_graph_builder.add_fns([
            fn1.into_fn_res(),
            fn2.into_fn_res(),
            fn3.into_fn_res(),
            fn4.into_fn_res(),
            fn5.into_fn_res(),
        ]);

        // Define dependencies to control ordering.
        fn_graph_builder
            .add_edges([
                (fn_id1, fn_id2),
                (fn_id1, fn_id3),
                (fn_id2, fn_id4),
                (fn_id2, fn_id5),
                (fn_id3, fn_id4),
                (fn_id3, fn_id5),
            ])
            .unwrap();

        fn_graph_builder.build()
    };

    // Invoke logic over data.
    // Sequential run: call each function in order, sleeping 10 ms to simulate work.
    let sequential_start = tokio::time::Instant::now();
    fn_graph.iter().for_each(|fun| {
        fun.call(resources);
        std::thread::sleep(std::time::Duration::from_millis(10));
    });
    let sequential_elapsed = sequential_start.elapsed();
    println!("sequential_elapsed: {sequential_elapsed:?}");

    // prints:
    // 1 + 2 = 3
    // sequential_elapsed: 50.683709ms

    // Concurrent run: each function is streamed as soon as its predecessors have returned.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    rt.block_on(async move {
        let concurrent_start = tokio::time::Instant::now();
        fn_graph
            .stream()
            .for_each_concurrent(None, |fun| async move {
                fun.call(resources);
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            })
            .await;
        let concurrent_elapsed = concurrent_start.elapsed();
        println!("concurrent_elapsed: {concurrent_elapsed:?}");
    });

    // prints:
    // 1 + 2 = 3
    // concurrent_elapsed: 44.740638ms
}

macro_rules! u32_newtype {
    ($name:ident) => {
        impl fmt::Display for $name {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                self.0.fmt(f)
            }
        }
        impl Deref for $name {
            type Target = u32;

            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }
        impl DerefMut for $name {
            fn deref_mut(&mut self) -> &mut Self::Target {
                &mut self.0
            }
        }
        impl AddAssign<u32> for $name {
            fn add_assign(&mut self, other: u32) {
                *self = Self(self.0 + other);
            }
        }
    };
}
u32_newtype!(A);
u32_newtype!(B);
u32_newtype!(C);
u32_newtype!(D);

Notably, the asynchronous execution has some overhead, but as the number of functions grows, so should the number of functions that can run concurrently, and the performance gains should increase accordingly.
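The measured timings can be compared against a back-of-the-envelope estimate. The sketch below groups functions into "waves", where every function in a wave has all of its predecessors in earlier waves. It assumes every function costs the same fixed 10 ms, as in the example, and ignores scheduling overhead. With equal costs the waves match what the stream does; with unequal costs the stream can start a function as soon as its own predecessors finish, so the waves are only an approximation. `waves` is a hypothetical helper, not a fn_graph API.

```rust
/// Groups nodes into waves: each node's predecessors all sit in earlier
/// waves, so the nodes within one wave may run at the same time.
fn waves(successors: &[Vec<usize>]) -> Vec<Vec<usize>> {
    // Predecessor count per node, as in the streaming algorithm.
    let mut counts = vec![0usize; successors.len()];
    for succs in successors {
        for &s in succs {
            counts[s] += 1;
        }
    }

    let mut current: Vec<usize> = (0..successors.len()).filter(|&i| counts[i] == 0).collect();
    let mut result = Vec::new();
    while !current.is_empty() {
        // Completing the whole wave releases the next one.
        let mut next = Vec::new();
        for &node in &current {
            for &s in &successors[node] {
                counts[s] -= 1;
                if counts[s] == 0 {
                    next.push(s);
                }
            }
        }
        result.push(current);
        current = next;
    }
    result
}

fn main() {
    // The example graph, 0-indexed (fn1 = 0, ..., fn5 = 4).
    let successors = vec![vec![1, 2], vec![2, 3, 4], vec![3, 4], vec![], vec![]];
    let step_ms = 10; // assumed fixed cost per function
    let w = waves(&successors);
    let sequential_ms = successors.len() * step_ms;
    let concurrent_ms = w.len() * step_ms;
    println!("waves: {w:?}"); // prints "waves: [[0], [1], [2], [3, 4]]"
    println!("ideal sequential: {sequential_ms} ms, ideal concurrent: {concurrent_ms} ms");
}
```

Because fn1, fn2 and fn3 must run one after another, only fn4 and fn5 overlap, giving an ideal 40 ms against 50 ms sequentially. The gap between 40 ms and the measured 44.7 ms is the overhead mentioned above. Wider graphs, with more functions per wave, leave more room for concurrency.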