SW Design Bites: DataFlow Structure

"Begin at the beginning," the King said gravely, "and go on till you come to the end; then stop."
- Lewis Carroll, Alice in Wonderland

1. Introduction

This DesignBite sequence was inspired by the BuildOn project TextFinder. As that project is designed and implemented, a number of design decisions are made, consciously or unconsciously. Each page in this sequence examines one answer to a fundamental question: how should the program be structured? To keep the discussion pragmatic and concrete, we implement a program that counts the lines in a set of text files. The processing is simple, so it is easy to see how each structural alternative is implemented. We consider both package structure and logical structure, i.e., the functions and structs used to organize design and implementation. On this DataFlow Structure page, code is implemented in the packages Executive, Input, Compute, and Output, plus a Fileutils helper library, and the structs they define. Those provide all of the organization for processing.

2. Application Structure - DataFlow

This structure is modular, with its parts connected as a data flow pipeline. It differs from the previous Factored structure in that:
  • Output can now be shown to the user while processing continues. This is often a very big ergonomic advantage.
  • The Executive no longer owns all of the parts. Now, Input owns Compute and Compute owns Output.
  • Testing becomes more complicated because each of the non-Executive parts must provide a test mock for the part to which it sends output.
Figure 3. DataFlow Pkg Structure

Data Flow Structure

Data flow structure is designed to provide continuing output to users while the application is running, i.e., not just at the end. For programs that process a lot of data and may run for a while, continuous display is much more satisfying for the user: there are no questions like "Is it still running?", "Am I getting the output I want?", or "Did the program crash?" Data flow structure also changes ownership. Instead of the Executive owning everything, a pipeline is set up in which each element owns the next element in the sequence.
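A minimal sketch of this ownership chain follows. The types Reader, Counter, and Printer are hypothetical stand-ins for Input, Compute, and Output; they process in-memory strings rather than files so the sketch runs on its own. As with the Executive, main creates only the first stage. Building that stage builds the rest of the chain, and each stage pushes its result to the stage it owns.

```rust
// Hypothetical stand-ins for Output, Compute, and Input. They process
// in-memory strings so the sketch runs without any files.
struct Printer;

impl Printer {
    fn show(&self, name: &str, lines: usize) {
        // Each result is displayed as soon as it is produced.
        println!("  {} has {} lines", name, lines);
    }
}

struct Counter {
    next: Printer, // Counter owns the stage it feeds
}

impl Counter {
    fn count(&self, name: &str, text: &str) -> usize {
        // str::lines is used for brevity; the project's Compute uses a
        // slightly different counting rule.
        let lines = text.lines().count();
        self.next.show(name, lines); // push the result downstream
        lines
    }
}

struct Reader {
    next: Counter, // Reader owns the stage it feeds
}

impl Reader {
    fn new() -> Self {
        // Building the first stage builds the whole chain.
        Reader { next: Counter { next: Printer } }
    }
    fn read(&self, name: &str, text: &str) -> usize {
        self.next.count(name, text)
    }
}

fn main() {
    // Like the Executive, main creates only the first stage.
    let pipeline = Reader::new();
    let total = pipeline.read("a", "one\ntwo") + pipeline.read("b", "three");
    println!("  total lines: {}", total);
}
```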

Pros:

  1. Continuous output
  2. Data makes fewer passes, i.e., results don't need to go back to the Executive
  3. Data flow can be implemented with message passing. That allows each stage in the pipeline to run on its own thread, passing its results as messages to a blocking queue owned by the next stage. Running the stages concurrently may significantly improve overall performance.

Cons:

  1. Harder to implement and test piece by piece
  2. Most data flow applications will need test mocks
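One common way to supply such a mock, sketched here under assumptions, is to make each stage generic over the stage it feeds. The Sink trait, ConsoleSink, MockSink, and the Counter stand-in for Compute are all hypothetical names, not part of the project code; Counter counts lines in a string using the same rule as the project's Compute. A test installs MockSink, which records results instead of printing them.

```rust
// Hypothetical trait for "the stage this one feeds"; not part of the project.
trait Sink {
    fn accept(&mut self, name: &str, lines: usize);
}

// Production sink: writes to the console, like the project's Output.
struct ConsoleSink;

impl Sink for ConsoleSink {
    fn accept(&mut self, name: &str, lines: usize) {
        println!("  file {:?} has {} lines of code", name, lines);
    }
}

// Test mock: records what it receives instead of printing it.
#[derive(Default)]
struct MockSink {
    received: Vec<(String, usize)>,
}

impl Sink for MockSink {
    fn accept(&mut self, name: &str, lines: usize) {
        self.received.push((name.to_string(), lines));
    }
}

// Simplified stand-in for Compute that owns whatever sink it is given.
struct Counter<S: Sink> {
    out: S,
}

impl<S: Sink> Counter<S> {
    fn do_compute(&mut self, name: &str, contents: &str) {
        // Same rule as the project's Compute: empty -> 0, else 1 + newlines
        let lines = if contents.is_empty() {
            0
        } else {
            1 + contents.matches('\n').count()
        };
        self.out.accept(name, lines);
    }
}

fn main() {
    // Production wiring: results go to the console.
    let mut real = Counter { out: ConsoleSink };
    real.do_compute("demo", "a\nb");

    // Test wiring: results are captured by the mock for inspection.
    let mut tested = Counter { out: MockSink::default() };
    tested.do_compute("demo", "a\nb\n");
    assert_eq!(tested.out.received, vec![("demo".to_string(), 3)]);
}
```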
Project Notes: DataFlow structures easily support concurrency:
  • Each component reads input messages from, and writes output to, blocking queues. At startup each one creates a thread that reads a message from its input queue, applies its specified processing, and passes the results as a message to its output queue. It repeats until it receives a shutdown message.
  • None of these projects use multiple threads, but you will eventually be able to see code that uses this technique in the TextFinder Project.
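The thread-per-stage technique described above can be sketched with std::sync::mpsc channels, whose recv() blocks until a message arrives, standing in for the blocking queues. This is a minimal illustration, not code from TextFinder: the Msg enum with its Shutdown variant, the two stage functions, and run_pipeline are assumptions made for the example. Each stage thread loops on recv(), processes the message, forwards a result, and stops after a Shutdown message; the counter stage forwards Shutdown so the printer stops too.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical message type passed between stages.
enum Msg {
    Text(String, String), // (name, contents)
    Count(String, usize), // (name, line count)
    Shutdown,
}

// Compute-like stage: counts lines and forwards results downstream.
fn start_counter(rx: Receiver<Msg>, tx: Sender<Msg>) -> JoinHandle<()> {
    thread::spawn(move || {
        // recv() blocks until a message arrives
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Text(name, text) => {
                    let n = if text.is_empty() { 0 } else { 1 + text.matches('\n').count() };
                    tx.send(Msg::Count(name, n)).unwrap();
                }
                Msg::Shutdown => {
                    tx.send(Msg::Shutdown).unwrap(); // pass shutdown along, then stop
                    break;
                }
                Msg::Count(..) => {} // not expected at this stage
            }
        }
    })
}

// Output-like stage: displays each result as soon as it arrives and
// returns the total when shut down.
fn start_printer(rx: Receiver<Msg>) -> JoinHandle<usize> {
    thread::spawn(move || {
        let mut total = 0;
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Count(name, n) => {
                    println!("  file {:?} has {} lines", name, n);
                    total += n;
                }
                Msg::Shutdown => break,
                Msg::Text(..) => {} // not expected at this stage
            }
        }
        total
    })
}

fn run_pipeline(inputs: &[(&str, &str)]) -> usize {
    let (to_counter, counter_rx) = channel();
    let (to_printer, printer_rx) = channel();
    let counter = start_counter(counter_rx, to_printer);
    let printer = start_printer(printer_rx);

    for (name, text) in inputs {
        to_counter.send(Msg::Text(name.to_string(), text.to_string())).unwrap();
    }
    to_counter.send(Msg::Shutdown).unwrap();

    counter.join().unwrap();
    printer.join().unwrap()
}

fn main() {
    let total = run_pipeline(&[("a", "one\ntwo"), ("b", "three")]);
    println!("  total lines: {}", total);
}
```

Because the printer displays each result as it arrives, output can appear while the counter is still working on later inputs, which is the continuous-output property described earlier.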
Executive Output
/////////////////////////////////////////////////////////////
// DataFlowStructure::Executive::main.rs                   //
//   - Executive creates Input, which builds the pipeline  //
// Jim Fawcett, https://JimFawcett.github.io, 04 Mar 2021  //
/////////////////////////////////////////////////////////////
/*
  Note:
    Executive only creates Input instance.  The rest of
    the pipeline self installs, i.e., Input creates Compute,
    and Compute creates Output.
*/
use input::*;

fn main() {
  let putln = || println!();

  print!("\n  -- DataFlowStructure::Executive --\n");

  let mut lines = 0;

  let mut inp = Input::new();
  let name = "./src/main.rs";
  lines += inp.do_input(name);
  putln();

  let name = "../Input/src/lib.rs";
  lines += inp.do_input(name);
  let name = "../Input/examples/test1.rs";
  lines += inp.do_input(name);
  putln();

  let name = "../Compute/src/lib.rs";
  lines += inp.do_input(name);
  let name = "../Compute/examples/test1.rs";
  lines += inp.do_input(name);
  putln();

  let name = "../Output/src/lib.rs";
  lines += inp.do_input(name);
  let name = "../Output/examples/test1.rs";
  lines += inp.do_input(name);
  putln();

  let name = "../Fileutils/src/lib.rs";
  lines += inp.do_input(name);
  putln();

  print!("\n  total lines: {}", lines);

  print!("\n\n  That's all Folks!\n\n");
}            
> cargo run -q

  -- DataFlowStructure::Executive --

  file "./src/main.rs" has 52 lines of code      

  file "../Input/src/lib.rs" has 78 lines of code
  file "../Input/examples/test1.rs" has 18 lines of code

  file "../Compute/src/lib.rs" has 105 lines of code
  file "../Compute/examples/test1.rs" has 27 lines of code

  file "../Output/src/lib.rs" has 24 lines of code
  file "../Output/examples/test1.rs" has 16 lines of code

  file "../Fileutils/src/lib.rs" has 93 lines of code

  total lines: 413

  That's all Folks!
Cargo.toml
[package]
name = "executive"
version = "0.1.0"
authors = ["James W. Fawcett "]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
input = { path = "../Input" }
            
Comments:
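  • Executive code is shown above, followed by its console output. It creates and uses an instance of the type Input.
  • This type is defined by the library Input::lib.rs.
  • Input provides the method fn do_input(&mut self, name: &str) -> usize. It attempts to open the named file, passes the file to its Compute instance for processing, and returns the line count, or 0 if the file can't be opened.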
Input Library Build and Test
/////////////////////////////////////////////////////////////
// DataFlowStructure::Input::lib.rs                        //
//   - Attempts to return line count from file             //
// Jim Fawcett, https://JimFawcett.github.io, 04 Mar 2021  //
/////////////////////////////////////////////////////////////
/*
  Note:
    - Input owns and instantiates Compute.
    - It attempts to open file and pass to Compute for
      processing.
    - Returns line count if successful
*/
use compute::*;
use file_utils::open_file_for_read;

#[derive(Debug)]
pub struct Input {
  name: String,
  compute: Compute,
}
impl Default for Input {
  fn default() -> Self {
    Self::new()
  }
}
impl Input {
  pub fn new() -> Input {
    Input {
      name: String::new(),
      compute: Compute::new(),
    }
  }
  pub fn do_input(&mut self, name: &str) -> usize {
    let mut lines: usize = 0;
    self.name = name.to_string();
    let rslt = open_file_for_read(name);
    if let Ok(file) = rslt {
      self.compute.do_compute(name, file);
      lines = self.compute.lines();
    } else {
      print!("\n  can't open file {:?}", name);
    }
    lines
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use std::io::Write;
  use tempfile::NamedTempFile;

  /// Helper to create a temp file with arbitrary contents
  /// and return its filesystem path.
  fn make_path(contents: &str) -> String {
    let mut tmp = NamedTempFile::new().expect("create temp file");
    write!(tmp, "{}", contents).expect("write to temp file");
    tmp.flush().expect("flush");
    let path = tmp.into_temp_path();
    let s = path.to_str().unwrap().to_string();
    // Persist the file: keep() disables auto-deletion, so it outlives the tests
    path.keep().unwrap();
    s
  }

  #[test]
  fn missing_file_returns_zero() {
    let mut inp = Input::new();
    let lines = inp.do_input("definitely_not_a_file.txt");
    assert_eq!(lines, 0, "should return 0 when the file can't be opened");
  }

  #[test]
  fn existing_file_invokes_compute() {
    // we don't assert an exact count here, just that it
    // recognized the file and returned >0 for nonempty contents.
    let path = make_path("anything");
    let mut inp = Input::new();
    let lines = inp.do_input(&path);
    assert!(
      lines > 0,
      "should return a positive count for an existing file"
    );
  }
}
> cargo build --lib
   Compiling output v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Output)
   Compiling file_utils v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Fileutils)
   Compiling compute v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Compute)
   Compiling input v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\input)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.34s
C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\input
> cargo test --lib
   Compiling windows_x86_64_msvc v0.52.6
   Compiling getrandom v0.3.3
   Compiling cfg-if v1.0.1
   Compiling once_cell v1.21.3
   Compiling fastrand v2.3.0
   Compiling windows-targets v0.52.6
   Compiling windows-sys v0.59.0
   Compiling tempfile v3.20.0
   Compiling input v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\input)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 2.47s
     Running unittests src\lib.rs (target\debug\deps\input-74b117d01c7ce5f1.exe)

running 2 tests
test tests::missing_file_returns_zero ... ok
test tests::existing_file_invokes_compute ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
 
Cargo.toml
[package]
name = "input"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
compute = { path = "../Compute" }
file_utils = { path = "../Fileutils" }

[dev-dependencies]
tempfile = "3"      # tempfile crate from crates.io
Comments:
  • Input library defines the type Input with two data members: name, a String, and compute, an instance of Compute.
  • It defines the methods fn new() -> Input and fn do_input(&mut self, name: &str) -> usize.
  • do_input attempts to open a named file for reading and, if successful, calls compute.do_compute(name, file) and returns the resulting line count.
  • The Input library depends on the libraries compute and file_utils.
  • Its unit tests also depend on the tempfile crate from crates.io, listed under [dev-dependencies], which creates temporary files that are deleted when dropped.
Comp Library Build and Test
/////////////////////////////////////////////////////////////
// DataFlowStructure::Compute::lib.rs                      //
//   - Attempts to read opened file to string, count lines //
// Jim Fawcett, https://JimFawcett.github.io, 04 Mar 2021  //
/////////////////////////////////////////////////////////////
/*
  Note:
    - creates instance of Output
    - attempts to read file to string and count its lines
    - sends results to Output
*/
use output::Output;
use std::fs::*;

use file_utils::read_file_to_string;

#[derive(Debug)]
pub struct Compute {
  lines: usize,
  out: Output,
}
impl Default for Compute {
  fn default() -> Self {
    Self::new()
  }
}
impl Compute {
  pub fn new() -> Compute {
    Compute {
      lines: 0,
      out: Output::new(),
    }
  }
  pub fn do_compute(&mut self, name: &str, mut file: File) {
    let rslt = read_file_to_string(&mut file);
    if let Ok(contents) = rslt {
      if contents.is_empty() {
        self.lines = 0;
      } else {
        self.lines = 1;
      }
      for ch in contents.chars() {
        if ch == '\n' {
          self.lines += 1;
        }
      }
      self.out.do_output(name, self.lines);
    } else {
      // reset so callers don't see a stale count from a previous file
      self.lines = 0;
      print!("\n  could not read {:?}", name);
    }
  }
  pub fn lines(&self) -> usize {
    self.lines
  }
}

/// unit tests generated using ChatGPT 4o

#[cfg(test)]
mod tests {
  use super::*;
  use std::io::Write;
  use tempfile::NamedTempFile;

  /// Helper to write `contents` into a NamedTempFile and
  /// return a fresh File handle opened at the start.
  fn make_file(contents: &str) -> std::fs::File {
    let mut tmp = NamedTempFile::new().expect("failed to create temp file");
    write!(tmp, "{}", contents).expect("failed to write to temp file");
    tmp.flush().expect("failed to flush temp file");
    // Re-open so that the read pointer is at the start
    tmp.reopen().expect("failed to reopen temp file")
  }

  #[test]
  fn empty_file_has_zero_lines() {
    let file = make_file("");
    let mut comp = Compute::new();
    comp.do_compute("empty", file);
    assert_eq!(comp.lines(), 0);
  }

  #[test]
  fn file_without_newlines_has_one_line() {
    let file = make_file("just one line");
    let mut comp = Compute::new();
    comp.do_compute("single", file);
    assert_eq!(comp.lines(), 1);
  }

  #[test]
  fn file_with_multiple_lines_counts_correctly() {
    // Three logical lines separated by two '\n's, no trailing newline
    let file = make_file("line1\nline2\nline3");
    let mut comp = Compute::new();
    comp.do_compute("three", file);
    assert_eq!(comp.lines(), 3);
  }

  #[test]
  fn file_with_trailing_newline_counts_empty_line() {
    // Two content lines + trailing '\n', so it counts as 3 lines
    let file = make_file("foo\nbar\n");
    let mut comp = Compute::new();
    comp.do_compute("trailing", file);
    assert_eq!(comp.lines(), 3);
  }
}
> cargo build --lib
   Compiling file_utils v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Fileutils)
   Compiling output v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Output)
   Compiling compute v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Compute)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.28s
C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Compute
> cargo test --lib
   Compiling windows_x86_64_msvc v0.52.6
   Compiling getrandom v0.3.3
   Compiling cfg-if v1.0.1
   Compiling once_cell v1.21.3
   Compiling fastrand v2.3.0
   Compiling windows-targets v0.52.6
   Compiling windows-sys v0.59.0
   Compiling tempfile v3.20.0
   Compiling compute v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Compute)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 2.31s
     Running unittests src\lib.rs (target\debug\deps\compute-2bf6b10a5844dd6d.exe)

running 4 tests
test tests::empty_file_has_zero_lines ... ok
test tests::file_with_multiple_lines_counts_correctly ... ok
test tests::file_with_trailing_newline_counts_empty_line ... ok
test tests::file_without_newlines_has_one_line ... ok

test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
Cargo.toml
[package]
name = "compute"
version = "0.1.0"
authors = ["James W. Fawcett "]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
output = { path = "../Output" }
file_utils = { path = "../Fileutils" }

[dev-dependencies]
tempfile = "3"
Comments:
  • The Compute library defines the type Compute with two data members: lines, and out, an instance of type Output.
  • It provides the methods fn new() -> Compute, fn do_compute(&mut self, name: &str, mut file: File), and fn lines(&self) -> usize.
  • do_compute attempts to read the contents of the file handle file into a String. If successful, it counts the lines, stores the count in self.lines, and passes the file name and count to its Output instance for display.
  • lines returns the value stored in self.lines.
  • This library defines unit tests covering empty files, single-line files, multi-line files, and files ending with a newline.
  • Cargo.toml [dependencies] identifies crates the library depends on.
  • Cargo.toml [dev-dependencies] identifies crates library tests depend on.
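The counting rule in do_compute (an empty file has 0 lines; otherwise 1 plus the number of '\n' characters) can be expressed as a single function. This is a possible refactoring sketch, not the project's code, and the name count_lines is hypothetical. It also shows why the trailing-newline test expects 3: this rule treats a final '\n' as starting another line, whereas str::lines() reports 2 for the same text.

```rust
// Hypothetical helper reproducing the rule used by Compute::do_compute.
fn count_lines(contents: &str) -> usize {
    if contents.is_empty() {
        0
    } else {
        // str::matches yields each occurrence of the pattern
        1 + contents.matches('\n').count()
    }
}

fn main() {
    for text in ["", "just one line", "line1\nline2\nline3", "foo\nbar\n"] {
        println!(
            "{:?} -> {} (str::lines gives {})",
            text,
            count_lines(text),
            text.lines().count()
        );
    }
}
```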
Output Library Build and Test
/////////////////////////////////////////////////////////////
// DataFlowStructure::Output::lib.rs                       //
//   - Sends results to console                            //
// Jim Fawcett, https://JimFawcett.github.io, 04 Mar 2021  //
/////////////////////////////////////////////////////////////

#[derive(Debug)]
pub struct Output {}
impl Default for Output {
  fn default() -> Self {
    Self::new()
  }
}
impl Output {
  pub fn new() -> Output {
    Output {}
  }
  pub fn do_output(&self, name: &str, lines: usize) {
    print!("\n  file {:?} has {} lines of code", name, lines);
  }
}
#[cfg(test)]
mod tests {
  use super::*;

  /// `new()` + `Debug` should yield the struct name.
  #[test]
  fn new_and_debug() {
    let out = Output::new();
    // Debug derive on `struct Output {}` prints just "Output"
    assert_eq!(format!("{:?}", out), "Output");
  }

  /// `do_output()` returns `()`, and never panics.
  #[test]
  fn do_output_returns_unit() {
    let out = Output::new();
    let result = out.do_output("my_file.rs", 42);
    // The only thing do_output returns is the unit value.
    assert_eq!(result, ());
  }
}
> cargo build --lib
   Compiling output v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Output)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.19s
C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Output
> cargo test --lib
   Compiling output v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Output)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.21s
     Running unittests src\lib.rs (target\debug\deps\output-1eee8dc0a1134569.exe)

running 2 tests
test tests::do_output_returns_unit ... ok
test tests::new_and_debug ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

Cargo.toml
[package]
name = "output"
version = "0.1.0"
authors = ["James W. Fawcett "]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Comments:
  • Output library defines the type Output with no data members.
  • It provides two methods, fn new() -> Output and fn do_output(&self, name: &str, lines: usize).
  • do_output displays a file name and its line count.
  • Output library defines two unit tests which run successfully.
  • Cargo.toml [dependencies] is empty, signifying that this library only depends on the Rust standard library.
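Because do_output prints directly to the console, its unit tests can only check that it returns (). One possible alternative, sketched here with a hypothetical WriterOutput type, writes to any std::io::Write target: production code passes std::io::stdout(), while a test passes a Vec&lt;u8&gt; and inspects the captured text. The message format matches Output::do_output; returning io::Result instead of () is a design assumption of the sketch.

```rust
use std::io::{self, Write};

// Hypothetical variant of Output that writes to any Write target.
struct WriterOutput<W: Write> {
    out: W,
}

impl<W: Write> WriterOutput<W> {
    fn new(out: W) -> Self {
        WriterOutput { out }
    }
    // Same message format as Output::do_output, but write errors are returned
    fn do_output(&mut self, name: &str, lines: usize) -> io::Result<()> {
        write!(self.out, "\n  file {:?} has {} lines of code", name, lines)
    }
    // Give back the underlying writer, e.g. the Vec<u8> used in tests
    fn into_inner(self) -> W {
        self.out
    }
}

fn main() -> io::Result<()> {
    // Production use: console output
    let mut console = WriterOutput::new(io::stdout());
    console.do_output("demo.rs", 10)?;
    println!();

    // Test-style use: capture output in memory and check it
    let mut captured = WriterOutput::new(Vec::new());
    captured.do_output("my_file.rs", 42)?;
    let text = String::from_utf8(captured.into_inner()).unwrap();
    assert_eq!(text, "\n  file \"my_file.rs\" has 42 lines of code");
    Ok(())
}
```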
Fileutils Library Build and Test
/////////////////////////////////////////////////////////////
// DataFlowStructure::Fileutils::lib.rs                    //
//   - File open, read, and write helper functions         //
// Jim Fawcett, https://JimFawcett.github.io, 04 Mar 2021  //
/////////////////////////////////////////////////////////////
/*
  This code may be useful for other programs so it is
  factored into a module here.
*/
#![allow(dead_code)]

use std::fs::*;
use std::io::{Error, ErrorKind, Read, Write};

pub fn open_file_for_read(file_name: &str) -> Result<File, std::io::Error> {
  let rfile = OpenOptions::new().read(true).open(file_name);
  rfile
}

pub fn read_file_to_string(f: &mut File) -> Result<String, std::io::Error> {
  let mut contents = String::new();
  let bytes_rslt = f.read_to_string(&mut contents);
  if bytes_rslt.is_ok() {
    Ok(contents)
  } else {
    Err(Error::new(ErrorKind::Other, "read error"))
  }
}

pub fn open_file_for_write(file_name: &str) -> Result<File, std::io::Error> {
  let wfile = OpenOptions::new()
    .write(true)
    .create(true)
    .truncate(true)
    .open(file_name);
  wfile
}

pub fn write_string_to_file_handle(s: &str, mut f: std::fs::File) -> std::io::Result<()> {
  f.write_all(s.as_bytes())?;
  f.flush()?;
  Ok(())
}

pub fn write_string_to_file(s: &str, file_name: &str) -> std::io::Result<()> {
  std::fs::write(file_name, s)?;
  Ok(())
}

#[cfg(test)]
mod tests {
  use super::*;

  #[test]
  fn file_name_write_read() {
    let file_name = "temp_name_write_read.txt"; // unique: tests run in parallel
    let test_string = "test string";

    // Write using the filename
    open_file_for_write(file_name).expect("open for write failed");
    write_string_to_file(test_string, file_name).expect("write string failed");

    // Read back
    let mut rfile = open_file_for_read(file_name).expect("open for read failed");
    let r_string = read_file_to_string(&mut rfile).expect("read to string failed");

    assert_eq!(r_string, test_string);
  }

  #[test]
  fn file_handle_write_read() {
    let file_name = "temp_handle_write_read.txt"; // unique: tests run in parallel
    let test_string = "test string";

    // Open for writing and write using handle
    let wfile = open_file_for_write(file_name).expect("open for write failed");
    write_string_to_file_handle(test_string, wfile).expect("write string failed");

    // Read back
    let mut rfile = open_file_for_read(file_name).expect("open for read failed");
    let r_string = read_file_to_string(&mut rfile).expect("read to string failed");

    assert_eq!(r_string, test_string);
  }
}
> cargo build --lib
   Compiling file_utils v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Fileutils)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.27s
C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Fileutils
> cargo test --lib
   Compiling file_utils v0.1.0 (C:\github\JimFawcett\NewSite\Code\DesignStructure\DataFlowStructure\Fileutils)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.24s
     Running unittests src\lib.rs (target\debug\deps\file_utils-c70071d2546d9160.exe)

running 2 tests
test tests::file_handle_write_read ... ok
test tests::file_name_write_read ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
Cargo.toml
[package]
name = "file_utils"
version = "0.1.0"
edition = "2024"

[dependencies]
Comments:
  • Fileutils library has no dependencies other than the Rust standard library, as its Cargo.toml shows.
  • It does not define any new types, but does define several functions:
    1. fn open_file_for_read(file_name: &str) -> Result<File, std::io::Error>
      Attempts to open the named file for reading and return a File handle.
    2. fn read_file_to_string(f: &mut File) -> Result<String, std::io::Error>
      Attempts to read the contents referred to by the file handle and return them wrapped in Ok(contents).
    3. fn open_file_for_write(file_name: &str) -> Result<File, std::io::Error>
      Attempts to open the named file for writing, creating or truncating it, and return a File handle.
    4. fn write_string_to_file_handle(s: &str, mut f: std::fs::File) -> std::io::Result<()>
      Attempts to write the string to the file referred to by the File handle, then flush the handle.
    5. fn write_string_to_file(s: &str, file_name: &str) -> std::io::Result<()>
      Attempts to write the string to the named file, creating or truncating it.
  • The Build and Test view above shows successful build and execution of two unit test functions.
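For comparison, the standard library provides one-call equivalents for reading and writing a named file: std::fs::read_to_string opens and reads a file, and std::fs::write creates or truncates a file and writes to it (write_string_to_file already uses the latter). The sketch below round-trips a string this way; round_trip and the file names are hypothetical, and it assumes the system temp directory is writable. Giving each caller its own file in the temp directory also keeps tests that run in parallel from sharing one file.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

// Round-trip a string through a file using only std::fs helpers.
fn round_trip(file_name: &str, text: &str) -> io::Result<String> {
    // Use the system temp directory so no files are left in the crate
    let path: PathBuf = std::env::temp_dir().join(file_name);
    fs::write(&path, text)?;               // create or truncate, then write
    let back = fs::read_to_string(&path)?; // open, read, and close
    fs::remove_file(&path)?;               // clean up
    Ok(back)
}

fn main() -> io::Result<()> {
    let back = round_trip("file_utils_round_trip.txt", "test string")?;
    println!("read back: {:?}", back);
    Ok(())
}
```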

3. Epilogue

The five design alternatives considered here:
  1. Monolithic Structure
  2. Factored Structure
  3. DataFlow Structure
  4. TypeErase Structure
  5. PlugIn Structure
are progressively more flexible, eventually resulting in reusable components, but also increasingly complex. Where you settle among these alternatives is determined by design context. Is this a one-of-a-kind project that you want to finish quickly, or is it heading for production code that will be maintained by more than one developer?