Pipeline Nodes

shtk.PipelineNodeFactory subclasses are templates used to define the properties of shtk.PipelineNode instances. shtk.PipelineNode instances are nodes within a directed acyclic graph (DAG) that represent a process pipeline.

The leaf nodes of this DAG are always PipelineProcess instances representing an individual process that is part of the pipeline.

PipelineNode instances can be used to communicate, start, and stop individual processes within a process pipeline.

PipelineNode classes are typically instantiated by other elements of SHTK, rather than being manually instantiated by the end user.

shtk.PipelineNode

class shtk.PipelineNode(event_loop)

Bases: ABC

Abstract base class for subprocess management nodes

children

children of this node

Type

list of PipelineNode

stdin_stream

Stream to use for stdin

Type

None or Stream

stdout_stream

Stream to use for stdout

Type

None or Stream

stderr_stream

Stream to use for stderr

Type

None or Stream

async classmethod create(*args, **kwargs)

Instantiates and runs the node

Parameters
  • *args – passed to the constructor

  • **kwargs – passed to the constructor

Returns

The instantiated and run node.

Return type

PipelineNode

async run()

Runs the process

flatten_children()

Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.

Returns

All child PipelineProcess nodes

Return type

list of PipelineProcess

send_signal(signum)

Sends a signal to all child ProcessNode processes.

Parameters

signum (int) – the signal to send.

terminate()

Sends a signal.SIGTERM to all child ProcessNode processes.

kill()

Sends a signal.SIGKILL to all child ProcessNode processes.

async poll_async(ret)

Gets the return codes of all child ProcessNodes

Parameters

ret (list of [int, None]) – a list that will be modified to contain a collection of return codes from flattened child ProcessNodes. Child processes that have exited will be represented by their return code. Child processes that have not exited will be represented by None.

poll(timeout=1e-06)

Synchronous wrapper for poll_async(). Gets the return codes of all child ProcessNodes.

Returns

A list containing return codes from

flattened child ProcessNodes. Child processes that have exited will be represented by their integer return code. Child processes that have not exited will be represented by None.

Return type

list of (int or None)

async wait_async()

Waits for and retrieves the return codes of all child ProcessNodes.

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int

wait()

Synchronous wrapper for wait_async().

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int

shtk.PipelineChannel

class shtk.PipelineChannel(event_loop, left, right)

Bases: PipelineNode

Represents a pipeline of commands

Parameters
  • left (PipelineNode) – A PipelineNode whose stdout is (usually) fed to right

  • right (PipelineNode) – A PipelineNode whose stdin is (usually) read from left

left

The left PipelineNode

Type

PipelineNode

right

The right PipelineNode

Type

PipelineNode

async classmethod create(*args, **kwargs)

Instantiates and runs the node

Parameters
  • *args – passed to the constructor

  • **kwargs – passed to the constructor

Returns

The instantiated and run node.

Return type

PipelineNode

flatten_children()

Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.

Returns

All child PipelineProcess nodes

Return type

list of PipelineProcess

kill()

Sends a signal.SIGKILL to all child ProcessNode processes.

poll(timeout=1e-06)

Synchronous wrapper for poll_async(). Gets the return codes of all child ProcessNodes.

Returns

A list containing return codes from

flattened child ProcessNodes. Child processes that have exited will be represented by their integer return code. Child processes that have not exited will be represented by None.

Return type

list of (int or None)

async poll_async(ret)

Gets the return codes of all child ProcessNodes

Parameters

ret (list of [int, None]) – a list that will be modified to contain a collection of return codes from flattened child ProcessNodes. Child processes that have exited will be represented by their return code. Child processes that have not exited will be represented by None.

async run()

Runs the process

send_signal(signum)

Sends a signal to all child ProcessNode processes.

Parameters

signum (int) – the signal to send.

terminate()

Sends a signal.SIGTERM to all child ProcessNode processes.

wait()

Synchronous wrapper for wait_async().

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int

async wait_async()

Waits for and retrieves the return codes of all child ProcessNodes.

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int

shtk.PipelineProcess

class shtk.PipelineProcess(event_loop, cwd, args, env, stdin_stream, stdout_stream, stderr_stream, user=None, group=None, close_fds=True)

Bases: PipelineNode

An interface representing subprocesses.

Parameters
  • cwd (str or pathlib.Path) – The current working directory

  • args (list of str or pathlib.Path) – The arguments for the process (including the base command).

  • env (dict of str) – The environment variables for the process

  • stdin_stream (Stream) – The Stream whose .reader() is used as stdin

  • stdout_stream (Stream) – The Stream whose .writer() is used as stdout

  • stderr_stream (Stream) – The Stream whose .writer() is used as stderr

  • user (None, int, or str) – The user to pass to asyncio.create_subprocess_exec(). Requires Python >= 3.9.

  • group (None, int, or str) – The group to pass to asyncio.create_subprocess_exec(). Requires Python >= 3.9.

  • close_fds (bool) – If true, close_fds will be passed to the equivalent of subprocess.Popen().

Raises

AssertionError – When len(args) <= 0

async run()

Runs the process using asyncio.create_subprocess_exec()

async classmethod create(*args, **kwargs)

Instantiates and runs the node

Parameters
  • *args – passed to the constructor

  • **kwargs – passed to the constructor

Returns

The instantiated and run node.

Return type

PipelineNode

flatten_children()

Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.

Returns

All child PipelineProcess nodes

Return type

list of PipelineProcess

kill()

Sends a signal.SIGKILL to all child ProcessNode processes.

poll(timeout=1e-06)

Synchronous wrapper for poll_async(). Gets the return codes of all child ProcessNodes.

Returns

A list containing return codes from

flattened child ProcessNodes. Child processes that have exited will be represented by their integer return code. Child processes that have not exited will be represented by None.

Return type

list of (int or None)

async poll_async(ret)

Gets the return codes of all child ProcessNodes

Parameters

ret (list of [int, None]) – a list that will be modified to contain a collection of return codes from flattened child ProcessNodes. Child processes that have exited will be represented by their return code. Child processes that have not exited will be represented by None.

send_signal(signum)

Sends a signal to all child ProcessNode processes.

Parameters

signum (int) – the signal to send.

terminate()

Sends a signal.SIGTERM to all child ProcessNode processes.

wait()

Synchronous wrapper for wait_async().

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int

async wait_async()

Waits for and retrieves the return codes of all child ProcessNodes.

Returns

A list of return codes from a flattened collection of child processes.

Return type

list of int