Basic Tutorial using Unix Processes

NOTE: This tutorial is for version v0.0.0.2 of the capataz library, follow this link to read this tutorial with the newest version

In this tutorial, we will build a small CLI application that spawns processes through Haskell's Unix Process API. We will keep all our threads running smoothly despite having one of the threads killing the spawned Unix processes through Unix SIGTERM signals.

We will implement two (2) different versions of our CLI program, one using standard Haskell threading utilities, and another using the Capataz library.

For this tutorial, we assume the reader is familiar with:

If you are not familiar with the topics above, we recommend following along and finding pertinent info from the links provided.

Summary of what our program will do

Our CLI program will receive an input parameter procNumber that will be used to spawn some green threads, each of them generating a Unix process. Each Unix process executes a simple bash script that echoes a number and increments it in an infinite while loop. Our Haskell program will also run a Haskell thread that will kill one of the many bash script executions.

You can find the code for this tutorial in the examples directory of the project's Github repository.

Setting up the stage - A trivial library for Processes

Let's start by explaining the Lib module; it contains utility functions to spawn and kill Unix processes, first the header:

{-# LANGUAGE DataKinds         #-}
{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators     #-}
module Lib where

import qualified Data.ByteString.Char8 as C
import           Data.List             ((!!))
import qualified Data.Text             as T
import           Options.Generic       (ParseRecord)
import           Protolude
import           System.IO             (hGetLine, hIsEOF)
import qualified System.Process        as Process
import qualified System.Random         as Random
import qualified Turtle

-- (0)
newtype Cli =
  Cli { procNumber :: Int }
  deriving (Generic, Show)

instance ParseRecord Cli

-- (1)
data SimpleProcess =
  SimpleProcess { readStdOut       :: !(IO (Either ExitCode ByteString))
                , terminateProcess :: !(IO ())
                , waitProcess      :: !(IO ExitCode)

-- (1)
spawnSimpleProcess :: Text -> [Text] -> IO SimpleProcess
spawnSimpleProcess program args = do
  let processSpec = (Process.proc (T.unpack program) (fmap T.unpack args))
        { Process.std_out = Process.CreatePipe

  (_, Just hout, _, procHandle) <- Process.createProcess processSpec

  let readStdOut :: IO (Either ExitCode ByteString)
      readStdOut = do
        isEof <- hIsEOF hout
        if not isEof
          then (Right . C.pack) <$> hGetLine hout
          else Left <$> Process.waitForProcess procHandle

      terminateProcess :: IO ()
      terminateProcess = Process.terminateProcess procHandle

      waitProcess :: IO ExitCode
      waitProcess = Process.waitForProcess procHandle

  return SimpleProcess {readStdOut , terminateProcess , waitProcess }

(0) We have a Cli record that we use to gather values for our CLI program. Using the optparse-generic library, this becomes a trivial affair. We make this work by adding an instance for Generic and ParseRecord.

(1) We create a SimpleProcess record. This record contains the logic to read the stdout of the spawned process and provides sub-routines for terminating or waiting for termination of the Unix process. This utility record limits the scope of the Haskell Unix process API to our small use case.

Next, we implement the function that will spawn a Unix process that performs an echo of numbers from 1 to infinity in bash:

  :: (Int -> IO ())  -- ^ sub-routine that writes number to other resource
  -> IO ()
spawnNumbersProcess writeNumber = do
  -- We are going to execute a while loop that echoes numbers to stdout
  proc' <-
        , "COUNTER=1; while [ $COUNTER -gt 0 ]; do "
          <>  "echo $COUNTER; sleep 1; let COUNTER=COUNTER+1; "
          <> "done"

  let loop = do
        -- read a string from stdout and transform it into a number, this sub-routine
        -- returns an Either where the Right value is a an stdout line, and the
        -- Left value is an exit code, in case the exit code is not a success, finish
        -- with an exception
        case eInput of
          Left exitCode
            | exitCode == ExitSuccess -> return ()
            | otherwise -> throwIO exitCode
          Right Nothing -> do
            putText "didn't get a number?"
          Right (Just number) -> do
            writeNumber number

  -- Make sure we terminate the process if we stop the loop using
  -- an async exception
  loop `finally` terminateProcess proc'

Now, let's have another IO sub-routine that lists Unix processes PIDs and picks at random one of them to send a SIGTERM signal. We use the Turtle library to run bash commands, we use the function (procStrict) which returns the stdout and exitCode of a process.

  :: Text  -- ^ Search processes with given name
  -> IO ()
processKiller processName = do
  (_exitCode, pgrepOutput) <-
      Turtle.procStrict "pgrep" ["-f", processName] Turtle.empty
      -- pgrep lists all pids from processes that have a particular name

  -- Split output in lines, and get pid per line
  let procNumbers = T.lines pgrepOutput
  case procNumbers of
    [] -> return ()
    _ -> do
      -- get a random element from the list of proccess identifiers
      theOneToKill <- Random.randomRIO (0, pred $ length procNumbers)

      putText $ "Process running: " <> show procNumbers
      putText $ "Killing: " <> (procNumbers !! theOneToKill)

      void $ Turtle.procStrict "kill" [procNumbers !! theOneToKill] Turtle.empty

Example 1 - Running program without supervision

Once we have the API that spawns Unix processes, we implement a concurrent application that generates Haskell threads and calls this API; we build each thread using the standard async package:

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
module Main where

import Protolude -- (0)
import Options.Generic (getRecord)
import Control.Concurrent.Async (async)
import Lib (Cli(..), SimpleProcess(..), spawnNumbersProcess, killNumberProcess)

main :: IO ()
main = do
  n <- getRecord "Counter spawner" -- (1)

  let numberWriter i a = print (i, a)
      delayMicros = 5000100

  asyncList <- forM [1..procNumber n] $ \i ->
    async $ spawnNumbersProcess (numberWriter i) -- (2)

  killerAsync <-
    async $ forever $ threadDelay delayMicros >> killNumberProcess

  wait killerAsync `finally` mapM_ cancel asyncList

(0) We start by removing the default Prelude and use the batteries included protolude library, this provides most of the used functions from Haskell and some extra useful libraries.

(1) We use the optparse-generic library to get a quick CLI optparser that provides us with the number of processes to run.

(2) We spawn an async (thread) where each of them is going to execute the spawnProcessNumber sub-routine.

(3) We spawn another thread that kills Unix processes.

When we run the previous program, it will fail slowly, removing the output of each of the threads that stop working after receiving an asynchronous exception. The Operative System throws this exception, but it originates from the execution of the killerNumberProcess sub-routine which sends a SIGTERM signal to the spawned process on each of the running threads.

The next example will show a project that uses the same functions, but relies on our API, which restarts threads in case of failure from external factors (in this case a SIGTERM Unix signal).

Example 2 - Running program with supervision

Now, instead of using async, let's build the Haskell threads using a capataz instance that monitors both a group of threads that execute the spawnProcessNumber sub-routine and, a thread that terminates particular Unix process in a random fashion.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
module Main where

import Protolude
import Options.Generic (getRecord)
import Control.Concurrent.Capataz -- (0)
  ( WorkerOptions(..)
  , CapatazOptions(..)
  , WorkerRestartStrategy(..)
  , SupervisorRestartStrategy(..)
  , forkCapataz
  , forkWorker
  , defWorkerOptions
  , defCapatazOptions
  , capatazToAsync
  , teardown
import Lib (Cli(..), spawnNumbersProcess, killNumberProcess)
import Text.Show.Pretty (pPrint)

main :: IO ()
main = do
  n <- getRecord "Counter spawner"
  capataz <-
    --                (1)
    --                 |
    forkCapataz defCapatazOptions { capatazName = "Example Capataz"
                                  , capatazRestartStrategy = OneForOne -- (2)
                                  , notifyEvent = pPrint               -- (3)

  let numberWriter i a = print (i, a)
      delayMicros = 5000100

  forM_ [1..procNumber n] $ \i ->
    --              (4)
    --               |
    void $ forkWorker defWorkerOptions { workerName = "Worker (" <> show i <> ")"
                                       , workerRestartStrategy = Permanent  -- (5)
                     (spawnNumbersProcess (numberWriter i)) -- (6)

  let delayMicros = 5000100
  void $ forkWorker defWorkerOptions { workerName = "Worker Killer" }
                   (forever $ threadDelay delayMicros >> processKiller "while")

  wait (capatazToAsync capataz)  -- (7)
    (teardown capataz >>= print) -- (8)

We start the main sub-routine building a capataz instance, using the forkCapataz function.

(0) We import many symbols from our Capataz library; we need to provide some settings that will determine the restart mechanisms for the threads we want to keep running (despite any errors that could happen).

(1) its first argument is a set of options for the capataz instance, in this example, we are overwriting some default values.

(2) One of the options we overwrite is the capatazRestartStrategy. Some of the values may be:

  • OneForOne -- if a monitored sub-routine fails, the capataz will only restart the failing one

  • AllForOne -- if a monitored sub-routine fails, the capataz will restart all sub-routines that are currently monitoring

(3) We set the notifyEvent function, which emits CapatazEvent records. They indicate the current status of the capataz instance; in this simple example, we are using the pPrint (pretty print) function to debug the capataz instance execution.

We continue the example by spawning a few workers (a.k.a. the supervised IO () sub-routines). For this, we use the forkWorker function.

(4) As well as the forkCapataz function, forkWorker receives an options record. In this example, we are setting an explicit name for the workers.

(5) We are also specifying the workerRestartStrategy which may be:

  • Permanent -- The sub-routine will always get restarted, even it finishes without any errors. This strategy is ideal to monitor long-running servers.

  • Transient -- The sub-routine gets restarted, if and only if it fails with an error, if it completes without any errors, the capataz instance will drop the sub-routine from its supervision. This strategy is ideal to monitor one-time execution sub-routines.

  • Temporary -- The sub-routine will not be restarted, even in the cause of failure, used for non-important sub-routines.

(6) We pass our spawnProcessNumber sub-routine to execute it in a supervised thread. This approach is no different from using a simple forkIO. Note the capataz instance created in step (1) is the last parameter to the forkWorker function.

(7) We can convert the capataz sub-routine into an async to join the supervisor thread to the main thread.

(8) We make sure that we cleaned up the capataz instance and supervised sub-routine threads using the teardown API.

Try it out!

1) Clone the capataz repository

1) Run make run-example1

2) Run make run-example2

results matching ""

    No results matching ""