Pipeline Nodes
shtk.PipelineNodeFactory subclasses are templates used to define the properties of shtk.PipelineNode instances. shtk.PipelineNode instances are nodes within a directed acyclic graph (DAG) that represent a process pipeline.
The leaf nodes of this DAG are always PipelineProcess instances, each representing an individual process that is part of the pipeline.
PipelineNode instances can be used to communicate with, start, and stop individual processes within a process pipeline.
PipelineNode classes are typically instantiated by other elements of SHTK, rather than being manually instantiated by the end user.
shtk.PipelineNode
- class shtk.PipelineNode(event_loop)
Bases:
ABC
Abstract base class for subprocess management nodes
- children
children of this node
- Type
list of PipelineNode
- async classmethod create(*args, **kwargs)
Instantiates and runs the node
- Parameters
*args – passed to the constructor
**kwargs – passed to the constructor
- Returns
The instantiated and run node.
- Return type
PipelineNode
- async run()
Runs the process
- flatten_children()
Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.
- Returns
All child PipelineProcess nodes
- Return type
list of PipelineProcess
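The depth-first flattening described above can be sketched with a small stand-in class. Node here is hypothetical and is not the shtk implementation; the sketch also assumes that a leaf flattens to a list containing only itself.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical stand-in for a pipeline node; not the shtk class."""
    name: str
    children: List["Node"] = field(default_factory=list)

    def flatten_children(self) -> List["Node"]:
        # Assumption: a node without children is a leaf (a process)
        # and flattens to a list containing only itself.
        if not self.children:
            return [self]
        # Otherwise visit each child in order, depth first, collecting leaves.
        leaves: List["Node"] = []
        for child in self.children:
            leaves.extend(child.flatten_children())
        return leaves


# Models a pipeline shaped like (a | b) | c.
inner = Node("channel", [Node("a"), Node("b")])
root = Node("channel", [inner, Node("c")])
leaf_names = [leaf.name for leaf in root.flatten_children()]
```

The order of the flattened list follows the left-to-right order of the pipeline, which is why index-based results such as return codes can be matched back to individual processes.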
- send_signal(signum)
Sends a signal to all child PipelineProcess processes.
- Parameters
signum (int) – the signal to send.
- terminate()
Sends a signal.SIGTERM to all child PipelineProcess processes.
- kill()
Sends a signal.SIGKILL to all child PipelineProcess processes.
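As a rough illustration of what signalling every flattened child amounts to, the sketch below does the same with plain subprocess.Popen objects from the standard library. The helper names send_signal_to_all and demo are hypothetical and are not part of shtk.

```python
import signal
import subprocess
import sys
from typing import List


def send_signal_to_all(procs: List[subprocess.Popen], signum: int) -> None:
    """Send signum to every process in procs that is still running."""
    for proc in procs:
        if proc.poll() is None:  # skip processes that have already exited
            proc.send_signal(signum)


def demo() -> List[int]:
    # Two children that would otherwise sleep for 30 seconds.
    cmd = [sys.executable, "-c", "import time; time.sleep(30)"]
    procs = [subprocess.Popen(cmd) for _ in range(2)]
    # Equivalent in spirit to terminate(): SIGTERM to each child.
    send_signal_to_all(procs, signal.SIGTERM)
    # Collect the return codes once the children have exited.
    return [proc.wait(timeout=10) for proc in procs]


if __name__ == "__main__":
    print(demo())
```

On POSIX systems a child ended by a signal reports a negative return code equal to the negated signal number, so a terminated child is distinguishable from one that exited normally.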
- async poll_async(ret)
Gets the return codes of all child PipelineProcess nodes.
- Parameters
ret (list of (int or None)) – a list that will be modified in place to contain the return codes of the flattened child PipelineProcess nodes. Child processes that have exited are represented by their return code; child processes that have not exited are represented by None.
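The out-parameter convention used by ret can be sketched with asyncio subprocesses directly. This is only an illustration under the assumption that the caller's list is overwritten in place; poll_into and demo are hypothetical names, not shtk functions.

```python
import asyncio
import sys
from typing import List, Optional


async def poll_into(procs, ret: List[Optional[int]]) -> None:
    """Overwrite ret in place with one entry per process."""
    # asyncio's Process.returncode stays None while the child is running.
    ret[:] = [proc.returncode for proc in procs]


async def demo() -> List[Optional[int]]:
    quick = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "raise SystemExit(3)")
    slow = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)")
    await quick.wait()  # make sure the first child has exited
    ret: List[Optional[int]] = []
    await poll_into([quick, slow], ret)
    slow.kill()  # clean up the child that is still sleeping
    await slow.wait()
    return ret


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Writing into a caller-supplied list means partial results remain available to the caller even if the coroutine is cancelled or times out before returning.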
- poll(timeout=1e-06)
Synchronous wrapper for poll_async(). Gets the return codes of all child PipelineProcess nodes.
- Returns
A list containing return codes from flattened child PipelineProcess nodes. Child processes that have exited are represented by their integer return code; child processes that have not exited are represented by None.
- Return type
list of (int or None)
- async wait_async()
Waits for and retrieves the return codes of all child PipelineProcess nodes.
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int
- wait()
Synchronous wrapper for wait_async().
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int
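The relationship between an asynchronous wait and its synchronous wrapper can be sketched as follows. The sketch uses asyncio.run() for simplicity, whereas the classes documented here receive an event_loop argument; wait_all_async and wait_all are hypothetical helpers, not shtk functions.

```python
import asyncio
import sys
from typing import List


async def wait_all_async(procs) -> List[int]:
    """Wait for every process and return the exit codes in order."""
    return list(await asyncio.gather(*(proc.wait() for proc in procs)))


async def _spawn_and_wait(exit_codes: List[int]) -> List[int]:
    # Start one short-lived child per requested exit code.
    procs = [
        await asyncio.create_subprocess_exec(
            sys.executable, "-c", f"raise SystemExit({code})")
        for code in exit_codes
    ]
    return await wait_all_async(procs)


def wait_all(exit_codes: List[int]) -> List[int]:
    """Synchronous wrapper: drive the coroutine to completion."""
    return asyncio.run(_spawn_and_wait(exit_codes))


if __name__ == "__main__":
    print(wait_all([0, 2]))
```

Unlike polling, waiting never yields None entries, because every child has exited by the time the list is returned.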
shtk.PipelineChannel
- class shtk.PipelineChannel(event_loop, left, right)
Bases:
PipelineNode
Represents a pipeline of commands
- Parameters
left (PipelineNode) – A PipelineNode whose stdout is (usually) fed to right
right (PipelineNode) – A PipelineNode whose stdin is (usually) read from left
- left
The left PipelineNode
- Type
PipelineNode
- right
The right PipelineNode
- Type
PipelineNode
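The usual left-to-right wiring, where the stdout of left feeds the stdin of right, can be sketched with an operating system pipe and two asyncio subprocesses. This is a POSIX-oriented illustration that does not use shtk's Stream objects; run_pair, LEFT, and RIGHT are hypothetical names.

```python
import asyncio
import os
import sys

# Hypothetical example commands: the left one prints, the right one uppercases.
LEFT = [sys.executable, "-c", "print('hello')"]
RIGHT = [sys.executable, "-c",
         "import sys; sys.stdout.write(sys.stdin.read().upper())"]


async def run_pair(left_args, right_args):
    # os.pipe() returns (read_fd, write_fd).
    read_fd, write_fd = os.pipe()
    left = await asyncio.create_subprocess_exec(*left_args, stdout=write_fd)
    # Close the parent's copy of the write end; otherwise the right-hand
    # process would never see end-of-file on its stdin.
    os.close(write_fd)
    right = await asyncio.create_subprocess_exec(
        *right_args, stdin=read_fd, stdout=asyncio.subprocess.PIPE)
    os.close(read_fd)
    output, _ = await right.communicate()
    # Return codes in pipeline order: left first, then right.
    codes = [await left.wait(), right.returncode]
    return output, codes


if __name__ == "__main__":
    print(asyncio.run(run_pair(LEFT, RIGHT)))
```

Both processes start before either finishes, so data streams through the pipe rather than being buffered in full between the two commands.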
- async classmethod create(*args, **kwargs)
Instantiates and runs the node
- Parameters
*args – passed to the constructor
**kwargs – passed to the constructor
- Returns
The instantiated and run node.
- Return type
PipelineNode
- flatten_children()
Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.
- Returns
All child PipelineProcess nodes
- Return type
list of PipelineProcess
- kill()
Sends a signal.SIGKILL to all child PipelineProcess processes.
- poll(timeout=1e-06)
Synchronous wrapper for poll_async(). Gets the return codes of all child PipelineProcess nodes.
- Returns
A list containing return codes from flattened child PipelineProcess nodes. Child processes that have exited are represented by their integer return code; child processes that have not exited are represented by None.
- Return type
list of (int or None)
- async poll_async(ret)
Gets the return codes of all child PipelineProcess nodes.
- Parameters
ret (list of (int or None)) – a list that will be modified in place to contain the return codes of the flattened child PipelineProcess nodes. Child processes that have exited are represented by their return code; child processes that have not exited are represented by None.
- async run()
Runs the process
- send_signal(signum)
Sends a signal to all child PipelineProcess processes.
- Parameters
signum (int) – the signal to send.
- terminate()
Sends a signal.SIGTERM to all child PipelineProcess processes.
- wait()
Synchronous wrapper for wait_async().
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int
- async wait_async()
Waits for and retrieves the return codes of all child PipelineProcess nodes.
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int
shtk.PipelineProcess
- class shtk.PipelineProcess(event_loop, cwd, args, env, stdin_stream, stdout_stream, stderr_stream, user=None, group=None, close_fds=True)
Bases:
PipelineNode
An interface representing subprocesses.
- Parameters
cwd (str or pathlib.Path) – The current working directory
args (list of str or pathlib.Path) – The arguments for the process (including the base command).
env (dict of str) – The environment variables for the process
stdin_stream (Stream) – The Stream whose .reader() is used as stdin
stdout_stream (Stream) – The Stream whose .writer() is used as stdout
stderr_stream (Stream) – The Stream whose .writer() is used as stderr
user (None, int, or str) – The user to pass to asyncio.create_subprocess_exec(). Requires Python >= 3.9.
group (None, int, or str) – The group to pass to asyncio.create_subprocess_exec(). Requires Python >= 3.9.
close_fds (bool) – If true, close_fds will be passed to the equivalent of subprocess.Popen().
- Raises
AssertionError – When len(args) <= 0
- async run()
Runs the process using asyncio.create_subprocess_exec()
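A minimal sketch of launching a single process with asyncio.create_subprocess_exec(), given a working directory, an argument list, and an environment, might look like the following. It captures stdout through a pipe instead of shtk's Stream objects, and run_process and CHILD are hypothetical names rather than part of shtk.

```python
import asyncio
import os
import sys
import tempfile

# Hypothetical child program: prints one environment variable.
CHILD = "import os; print(os.environ['GREETING'])"


async def run_process(cwd, args, env):
    # Mirrors the documented constraint that args must not be empty.
    assert len(args) > 0, "args must include the command itself"
    proc = await asyncio.create_subprocess_exec(
        *[str(arg) for arg in args],  # accept str or pathlib.Path items
        cwd=str(cwd),
        env=env,
        stdout=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out


if __name__ == "__main__":
    env = dict(os.environ, GREETING="hi")
    print(asyncio.run(
        run_process(tempfile.gettempdir(), [sys.executable, "-c", CHILD], env)))
```

The environment is built as a copy of os.environ with one extra key, so the child still inherits whatever variables the interpreter needs to start.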
- async classmethod create(*args, **kwargs)
Instantiates and runs the node
- Parameters
*args – passed to the constructor
**kwargs – passed to the constructor
- Returns
The instantiated and run node.
- Return type
PipelineNode
- flatten_children()
Flattens the PipelineNode DAG into a list of PipelineProcess objects using a depth-first search.
- Returns
All child PipelineProcess nodes
- Return type
list of PipelineProcess
- kill()
Sends a signal.SIGKILL to all child PipelineProcess processes.
- poll(timeout=1e-06)
Synchronous wrapper for poll_async(). Gets the return codes of all child PipelineProcess nodes.
- Returns
A list containing return codes from flattened child PipelineProcess nodes. Child processes that have exited are represented by their integer return code; child processes that have not exited are represented by None.
- Return type
list of (int or None)
- async poll_async(ret)
Gets the return codes of all child PipelineProcess nodes.
- Parameters
ret (list of (int or None)) – a list that will be modified in place to contain the return codes of the flattened child PipelineProcess nodes. Child processes that have exited are represented by their return code; child processes that have not exited are represented by None.
- send_signal(signum)
Sends a signal to all child PipelineProcess processes.
- Parameters
signum (int) – the signal to send.
- terminate()
Sends a signal.SIGTERM to all child PipelineProcess processes.
- wait()
Synchronous wrapper for wait_async().
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int
- async wait_async()
Waits for and retrieves the return codes of all child PipelineProcess nodes.
- Returns
A list of return codes from a flattened collection of child processes.
- Return type
list of int