Create a Stateful Worker

Description

In this example we implement a naive polish calculator, and it stores it's calculator state every time an actor fails or terminates.

The following example:

  • defines a set of operations that can be performed to worker via the CalculatorOperation record
  • reads a backup from a filepath everytime it runs
  • writes backup to a filepath everytime it is terminated (using workerOnTerminationL)
  • writes backup to a filepath everytime it fails (using workerOnFailureL)
  • creates a worker spec with behavior defined above
  • creates a capataz system and a worker (using worker specification)
  • executes a few operations to showcase calculator usage
  • waits a few milliseconds to give the calculator worker some leeway
  • tears down the capataz system and show results of teardown operation

Code

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
module Control.Concurrent.Capataz.HowTo.StaticWorker where

import Control.Concurrent         (threadDelay)
import Control.Concurrent.Capataz
    ( ProcessSpec
    , WorkerRestartStrategy (..)
    , forkCapataz
    , set
    , supervisorProcessSpecListL
    , teardown
    , workerOnFailureL
    , workerOnTerminationL
    , workerRestartStrategyL
    , workerSpec
    )
import Control.Concurrent.STM     (atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Monad              (forever)
import Data.IORef                 (IORef, atomicModifyIORef, newIORef, readIORef)
import Data.Maybe                 (fromMaybe)
import GHC.Generics               (Generic)
import Safe                       (readMay)
import System.Directory           (doesFileExist)

data CalculatorOperation
  = Push Int
  | Add
  | Multiply
  | Top (Maybe Int -> IO ())
  | Read ([Int] -> IO ())
  deriving (Generic)

buildCalculatorWorker :: FilePath -> IO CalculatorOperation -> IO ProcessSpec
buildCalculatorWorker prevStBackup getOperation = do
  st    <- restoreBackup
  stRef <- newIORef st
  return $ workerSpec
    "calculator-worker"
    (workerLoop stRef)
    ( set workerRestartStrategyL Permanent
    . set workerOnFailureL       (const $ storeBackup stRef)
    . set workerOnTerminationL   (storeBackup stRef)
    )
 where
  restoreBackup :: IO [Int]
  restoreBackup = do
    filePresent <- doesFileExist prevStBackup
    if filePresent
      then (fromMaybe [] . readMay) <$> readFile prevStBackup
      else return []

  storeBackup :: IORef [Int] -> IO ()
  storeBackup stRef = do
    st <- readIORef stRef
    writeFile prevStBackup (show st)

  workerLoop :: IORef [Int] -> IO ()
  workerLoop stRef = forever $ do
    operation <- getOperation
    case operation of
      Push n -> atomicModifyIORef stRef (\ns -> (n : ns, ()))

      Add    -> atomicModifyIORef
        stRef
        ( \ns -> case ns of
          (a:b:rest) -> (a + b : rest, ())
          _          -> (ns, ())
        )

      Multiply -> atomicModifyIORef
        stRef
        ( \ns -> case ns of
          (a:b:rest) -> (a * b : rest, ())
          _          -> (ns, ())
        )

      Top notifyFn -> do
        result <- readIORef stRef
        case result of
          []    -> notifyFn Nothing
          (a:_) -> notifyFn (Just a)

      Read notifyFn -> do
        st <- readIORef stRef
        notifyFn st


main :: IO ()
main = do
  calculatorOpsQueue <- newTQueueIO
  let (readOperation, sendOperation) =
        ( atomically $ readTQueue calculatorOpsQueue
        , atomically . writeTQueue calculatorOpsQueue
        )

  myWorker <- buildCalculatorWorker "/tmp/calculator-state.bin" readOperation

  capataz  <- forkCapataz "stateful-worker-example"
                          (set supervisorProcessSpecListL [myWorker])

  sendOperation (Read print)
  sendOperation (Push 2)
  sendOperation (Push 2)
  sendOperation Add
  sendOperation (Top print)
  sendOperation (Push 8)
  sendOperation Multiply
  sendOperation (Top print)

  threadDelay 500

  result <- teardown capataz
  print result

results matching ""

    No results matching ""