class ProcessExecuter::MonitoredPipe

Acts as a pipe that writes the data written to it to one or more destinations

{ProcessExecuter::MonitoredPipe} was created to expand the output redirection options for [Process.spawn](docs.ruby-lang.org/en/3.4/Process.html#method-c-spawn) and methods derived from it within the ‘ProcessExecuter` module.

This class’s initializer accepts any redirection destination supported by [Process.spawn](docs.ruby-lang.org/en/3.4/Process.html#method-c-spawn) (this is the ‘value` part of the file redirection option described in [the File Redirection section of `Process.spawn`](docs.ruby-lang.org/en/3.4/Process.html#module-Process-label-File+Redirection+-28File+Descriptor-29).

In addition to the standard redirection destinations, {ProcessExecuter::MonitoredPipe} also supports these additional types of destinations:

When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and a thread is created to read data written to the pipe. As data is read from the pipe, it is written to the destination provided in the MonitoredPipe initializer.

If the destination raises an exception, the monitoring thread will exit, the pipe will be closed, and the exception will be saved in ‘#exception`.

> **⚠️ WARNING** > > ‘#close` must be called to ensure that (1) the pipe is closed, (2) all data is

read from the pipe and written to the destination, and (3) the monitoring thread is
killed.

@example Collect pipe data into a StringIO object

pipe_data = StringIO.new
begin
  pipe = ProcessExecuter::MonitoredPipe.new(pipe_data)
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data.string #=> "Hello World"

@example Collect pipe data into a string AND a file

pipe_data_string = StringIO.new
pipe_data_file = File.open("pipe_data.txt", "w")
begin
  pipe = ProcessExecuter::MonitoredPipe.new([:tee, pipe_data_string, pipe_data_file])
  pipe.write("Hello World")
ensure
  pipe.close
end
pipe_data_string.string #=> "Hello World"
# It is your responsibility to close the file you opened
pipe_data_file.close
File.read("pipe_data.txt") #=> "Hello World"

@example Using a MonitoredPipe with Process.spawn

stdout_buffer = StringIO.new
begin
  stdout_pipe = ProcessExecuter::MonitoredPipe.new(stdout_buffer)
  pid = Process.spawn('echo Hello World', out: stdout_pipe)
  _waited_pid, status = Process.wait2(pid)
ensure
  stdout_pipe.close
end
stdout_buffer.string #=> "Hello World\n"

@api public

Attributes

chunk_size[R]

@!attribute [r]

The size of the chunks to read from the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.chunk_size #=> 100_000

@return [Integer] the size of the chunks to read from the pipe

condition_variable[R]

@!attribute [r]

The condition variable used to synchronize access to the state

In particular, it is used while waiting for the state to change to :closed

@return [ConditionVariable]

@api private

destination[R]

@!attribute [r]

The redirection destination to write data that is read from the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.destination #=> #<ProcessExecuter::Destinations::Writer>

@return [ProcessExecuter::Destinations::DestinationBase]

exception[R]

@!attribute [r]

The exception raised by a destination

If an exception is raised by a destination, it is stored here. Otherwise, it is ‘nil`.

@example

pipe.exception #=> nil

@return [Exception, nil] the exception raised by a destination or ‘nil` if no exception was raised

mutex[R]

@!attribute [r]

The mutex used to synchronize access to the state variable

@return [Mutex]

@api private

pipe_reader[R]

@!attribute [r]

The read end of the pipe

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_reader #=> #<IO:fd 11>

@return [IO]

@api private

pipe_writer[R]

@!attribute [r]

The write end of the pipe

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.pipe_writer #=> #<IO:fd 12>

@return [IO] the write end of the pipe

@api private

state[R]

@!attribute [r]

The state of the pipe

Must be either ‘:open`, `:closing`, or `:closed`

  • ‘:open` - the pipe is open and data can be written to it

  • ‘:closing` - the pipe is being closed and data can no longer be written to it

  • ‘:closed` - the pipe is closed and data can no longer be written to it

@example

pipe = ProcessExecuter::MonitoredPipe.new($stdout)
pipe.state #=> :open
pipe.close
pipe.state #=> :closed

@return [Symbol] the state of the pipe

thread[R]

@!attribute [r]

The thread that monitors the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>

@return [Thread]

@api private

Public Class Methods

new(redirection_destination, chunk_size: 100_000) click to toggle source

Create a new monitored pipe

Creates an IO.pipe and starts a monitoring thread to read data written to the pipe.

@example

redirection_destination = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(redirection_destination)

@param redirection_destination [Object] as data is read from the pipe,

it is written to this destination

Accepts any redirection destination supported by
[`Process.spawn`](https://docs.ruby-lang.org/en/3.4/Process.html#method-c-spawn).
This is the `value` part of the file redirection option described in [the
File Redirection section of
`Process.spawn`](https://docs.ruby-lang.org/en/3.4/Process.html#module-Process-label-File+Redirection+-28File+Descriptor-29).

In addition to the standard redirection destinations, `MonitoredPipe` also
accepts (1) another monitored pipe, (2) any object that implements a `#write` method and
(3) an array in the form `[:tee, destination1, destination2, ...]` where each
`destination` can be any value that `MonitoredPipe` itself supports (including
another tee or MonitoredPipe).

@param chunk_size [Integer] the size of the chunks to read from the pipe

# File lib/process_executer/monitored_pipe.rb, line 122
def initialize(redirection_destination, chunk_size: 100_000)
  @destination = Destinations.factory(redirection_destination)

  assert_destination_is_compatible_with_monitored_pipe

  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe

  # Set the encoding of the pipe reader to ASCII_8BIT. This is not strictly
  # necessary since read_nonblock always returns a String where encoding is
  # Encoding::ASCII_8BIT, but it is a good practice to explicitly set the
  # encoding.
  pipe_reader.set_encoding(Encoding::ASCII_8BIT)

  @state = :open
  @thread = start_monitoring_thread

  self.class.add_open_instance(self)
end

Public Instance Methods

close() click to toggle source

Set the state to ‘:closing` and wait for the state to be set to `:closed`

The monitoring thread will see that the state has changed and will close the pipe.

@example

data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.state #=> :open
pipe.write('Hello World')
pipe.close
pipe.state #=> :closed
data_collector.string #=> "Hello World"

@return [void]

# File lib/process_executer/monitored_pipe.rb, line 159
def close
  mutex.synchronize do
    if state == :open
      @state = :closing
      condition_variable.wait(mutex) while @state != :closed
    end
  end

  thread.join
  destination.close
  self.class.remove_open_instance(self)
end
fileno() click to toggle source

@!attribute [r] fileno

The file descriptor for the write end of the pipe

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.fileno == pipe.to_io.fileno #=> true

@return [Integer] the file descriptor for the write end of the pipe

# File lib/process_executer/monitored_pipe.rb, line 205
def fileno
  pipe_writer.fileno
end
to_io() click to toggle source

Return the write end of the pipe so that data can be written to it

Data written to this end of the pipe will be read by the monitor thread and written to the destination.

This is so we can provide a MonitoredPipe to Process.spawn as a FD

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.to_io.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

@return [IO] the write end of the pipe

# File lib/process_executer/monitored_pipe.rb, line 189
def to_io
  pipe_writer
end
write(data) click to toggle source

Writes data to the pipe so that it can be read by the monitor thread

Primarily used for testing.

@example

require 'stringio'
data_collector = StringIO.new
pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
pipe.write('Hello World')
pipe.close
data_collector.string #=> "Hello World"

@param data [String] the data to write to the pipe

@return [Integer] the number of bytes written to the pipe

@raise [IOError] if the pipe is not open

# File lib/process_executer/monitored_pipe.rb, line 227
def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' unless state == :open

    pipe_writer.write(data)
  end
end

Private Instance Methods

assert_destination_is_compatible_with_monitored_pipe() click to toggle source

Raise an error if the destination is not compatible with MonitoredPipe @return [void] @raise [ArgumentError] if the destination is not compatible with MonitoredPipe @api private

# File lib/process_executer/monitored_pipe.rb, line 368
def assert_destination_is_compatible_with_monitored_pipe
  return if destination.compatible_with_monitored_pipe?

  raise ArgumentError, "Destination #{destination.destination} is not compatible with MonitoredPipe"
end
close_pipe() click to toggle source

Read any remaining data from the pipe and close it

@return [void] @api private

# File lib/process_executer/monitored_pipe.rb, line 438
def close_pipe
  # Close the write end of the pipe so no more data can be written to it
  pipe_writer.close

  # Read remaining data from pipe_reader (if any)
  # If an exception was already raised by the last call to #write, then don't try to read remaining data
  monitor_pipe while exception.nil? && !pipe_reader.eof?

  # Close the read end of the pipe
  pipe_reader.close
end
monitor() click to toggle source

Read data from the pipe until ‘#state` is changed to `:closing`

The state is changed to ‘:closed` by calling `#close`.

Before this method returns, state is set to ‘:closed`

@return [void] @api private

# File lib/process_executer/monitored_pipe.rb, line 393
def monitor
  monitor_pipe until state == :closing
ensure
  close_pipe
  mutex.synchronize do
    @state = :closed
    condition_variable.signal
  end
end
monitor_pipe() click to toggle source

Read data from the pipe until ‘#state` is changed to `:closing`

Data read from the pipe is written to the destination.

@return [void] @api private

# File lib/process_executer/monitored_pipe.rb, line 409
def monitor_pipe
  # read_nonblock always returns a String where encoding is Encoding::ASCII_8BIT
  new_data = pipe_reader.read_nonblock(chunk_size)
  write_data(new_data)
rescue IO::WaitReadable
  pipe_reader.wait_readable(0.001)
end
start_monitoring_thread() click to toggle source

Start the thread to monitor the pipe and write data to the destination @return [void] @api private

# File lib/process_executer/monitored_pipe.rb, line 377
def start_monitoring_thread
  Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = false
    monitor
  end
end
write_data(data) click to toggle source

Write the data read from the pipe to the destination

If an exception is raised by a writer, set the state to ‘:closing` so that the pipe can be closed.

@param data [String] the data read from the pipe @return [void] @api private

# File lib/process_executer/monitored_pipe.rb, line 425
def write_data(data)
  destination.write(data)
rescue StandardError => e
  mutex.synchronize do
    @exception = e
    @state = :closing
  end
end