Part 3. Running a pipeline (25 points)

In the previous parts, you parsed input into tokens and then the tokens into a representation of a pipeline. In this part, you’re going to implement a run() method which will create and run the processes in the pipeline.

To create a new process, you’re going to use Rust’s standard std::process::Command. It’s worth reading the documentation now.

Example

#![allow(unused)]
fn main() {
use std::process::{Command, Stdio};

let arguments = vec!["-l", "/usr"];
let child = Command::new("ls")
    .args(arguments)
    .stdin(Stdio::null())
    .stdout(Stdio::inherit())
    .stderr(Stdio::inherit())
    .spawn();

match child {
    Ok(mut child) => {
        // Wait for the child to exit.
        child.wait().unwrap();
    }
    Err(err) => {
        eprintln!("Error: {err}");
    }
}
}

(Click Run to see the output.)

There are several things to notice in this example. First, the style of code with the Command struct is called the “builder pattern” because Command is only used for configuring the command to run. Each of the methods you call on a Command return a &mut Self, meaning you can chain together methods as shown in the example. The final method, .spawn() consumes the Command and returns a Child (wrapped in a Result, of course).

Second, you configure the arguments by calling .args() and passing it a vector of arguments. (Cleverly, it can handle arrays, vectors, and slices of &str or String.)

Third, you configure the behavior of stdin, stdout, and stderr by passing the corresponding methods an instance of Stdio. You’ll see next lab how to redirect those to/from files. For now, you’ll want to use Stdio::inherit() to mean use the stdin/stdout/stderr of the current process, Stdio::piped() to mean create a pipe (see the next example), and Stdio::null() to mean connect it to /dev/null.

Fourth, The .spawn() method returns a Child. To wait for the child to exit, use the .wait() method. The .wait() method returns a Result, but the error condition shouldn’t happen since .spawn() just spawned a new process so the example just unwraps the result.

The previous example shows how to spawn one process. To spawn two processes and have the second process’s stdin be the first process’s stdout, you’ll need to (1) spawn the first process; (2) extract the ChildStdout from the Child returned by .spawn(); (3) convert that to a Stdio; (4) spawn the second process using the newly converted Stdio as stdin; and (5) wait for both children.

Example

#![allow(unused)]
fn main() {
use std::process::{Command, Stdio};

let mut children = Vec::new();

// 1. Spawn the first child.
let arguments = vec!["-l", "/usr"];
let first_child = Command::new("ls")
    .args(arguments)
    .stdin(Stdio::inherit())
    .stdout(Stdio::piped())
    .stderr(Stdio::inherit())
    .spawn();

let first_child_stdout: Stdio = match first_child {
    Ok(mut child) => {
        let stdout = child.stdout
            .take() // 2. Extract the ChildStdout.
            .map_or(Stdio::null(), Stdio::from); // 3. Convert it to a Stdio.
        children.push(child);
        stdout
    }
    Err(err) => {
        eprintln!("Error: {err}");
        Stdio::null()
    }
};

// 4. Spawn the second child.
let second_child = Command::new("wc")
    .stdin(first_child_stdout)
    .stdout(Stdio::inherit())
    .stderr(Stdio::inherit())
    .spawn();

match second_child {
    Ok(mut child) => {
        children.push(child)
    }
    Err(err) => {
        eprintln!("Error: {err}");
    }
}

// 5. Wait for all of the children that were created successfully.
for mut child in children {
    child.wait().unwrap();
}
}

One tricky portion of this is

        let stdout = child.stdout
            .take() // 2. Extract the ChildStdout.
            .map_or(Stdio::null(), Stdio::from); // 3. Convert it to a Stdio.

child.stdout is an Option<ChildStdout>. The .take() method for an Option takes the value from self if it’s a Some, replacing it with None and returns a new Option. Here’s some example code showing this behavior. (Click Run.)

#![allow(unused)]
fn main() {
let mut x: Option<i32> = Some(10);
let taken_from_x = x.take();

let mut y: Option<i32> = None;
let taken_from_y = y.take();
println!("{x:?}  {taken_from_x:?}");
println!("{y:?}  {taken_from_y:?}");
}

The .map_or(default_value, function) method for an Option will return default_value if the Option is None, otherwise, it unwraps the Option and calls function on the unwrapped value. In this case, if child.stdout is None, stdout will be set to Stdio::null() and if child.stdout is Some(child_stdout), then stdout will be set to Stdio::from(child_stdout).

Your task

Implement the run() method.

#![allow(unused)]
fn main() {
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
struct Pipeline;
impl Pipeline {
    pub fn run(self) -> Result<()> { todo!() }
}
}

Your code should spawn processes in a loop, connecting the stdout of each process to stdin of the next process. I suggest starting with something like

let mut last_stdout = Stdio::inherit();

and using it as the argument to .stdin() for each process. After spawning a process, extract the ChildStdout and convert it to a Stdio as shown in the example and assign it to last_stdout.

Each child that is successfully spawned should be added to a vector of Child. Any errors spawning a child should be printed to stderr, but the rest of the pipeline should be spawned. After all children have attempted to be spawned, you should wait for all of them to complete. See the example for details.

Modify your main() function to run the Pipeline you’ve created.