Git repository synchronizer
We will build a CLI application that automatically synchronizes files in a git repository with its remote server whenever they are modified. To do this, we will keep track of file modifications using the iNotify UNIX API.
For this tutorial, we assume the reader is familiar with:
- Stack projects
- GHC Extensions
- Shelly library
- Haskell concurrency (threads and STM)
If you are not familiar with the topics above, we recommend reading the tutorial while looking up information from the given links.
What you'll learn:
- How to create supervision trees using Capataz
- How to use the ComponentM monad from the teardown library
- How to use the shelly library to run commands
- How to use RIO for logging
Summary of what our program will do
Our CLI program receives one or more strings as CLI arguments; each argument is a file path pointing to a git repository that we want to synchronize with a remote repository automatically. Just saving a file should create a commit; in addition, each repository syncs with its remote host on a regular basis.
You can find the code for this tutorial in the examples directory of the project's Github repository.
Setting up the stage - Modeling our domain problem as a supervisor tree
We first define a diagram of what our supervision tree will look like:
capataz-system/
├── logger-worker            -- ^ performs logging to a console
└── repository-supervisor(*)
    ├── git-worker           -- ^ performs all git operations on a repository
    ├── repo-file-watcher    -- ^ monitors changes on files
    └── sync-interval-worker -- ^ emits an event at a regular interval
Our Capataz root supervisor is composed of a worker green thread responsible for logging, and at least one (1) repository-supervisor that monitors two (2) worker threads: one responsible for the sync interval (to schedule git pull/push commands) and one that executes git commands.
Why this organization? We want each repository to be self-contained: if one of them is faulty, we don't want to restart the whole application, only the workers related to that repository. We can go as far as adding an AllForOne strategy at the repository-supervisor level.
How do our workers communicate?
Given that Capataz does not enforce a message-passing communication scheme, we can use whatever we want for communication between workers: third-party services like SQS, external processes like RabbitMQ or Redis, or in-memory STM queues and MVars.
Since we don't care about messages getting lost in case of catastrophic failure, in this example we use STM channels (TChan) for communication between threads.
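As a minimal sketch of this communication style, the snippet below wires one producer thread to one consumer over a TChan, using only the base and stm packages. The names produce, consume, and demo are hypothetical helpers made up for this illustration; they are not part of Capataz or of the tutorial's code.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, newTChanIO, readTChan, writeTChan)
import Control.Monad (forM_, replicateM)

-- | Hypothetical producer: forks a thread that writes every message to the
-- channel, one STM transaction per message.
produce :: TChan String -> [String] -> IO ()
produce chan msgs = do
  _ <- forkIO $ forM_ msgs $ \msg -> atomically (writeTChan chan msg)
  return ()

-- | Hypothetical consumer: blocks until @n@ messages have been read and
-- returns them in arrival order.
consume :: TChan String -> Int -> IO [String]
consume chan n = replicateM n (atomically (readTChan chan))

-- | Connects a producer and a consumer through a single channel.
demo :: IO [String]
demo = do
  chan <- newTChanIO
  produce chan ["file-a changed", "file-b changed"]
  consume chan 2
```

Loading this in GHCi and evaluating demo returns the two messages in the order they were written, since a TChan is a FIFO queue. Messages only live in memory, which matches the assumption above that losing them on a catastrophic failure is acceptable.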
Implementing our supervision tree
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Lib where
import qualified Prelude
import RIO -- (1)
import qualified RIO.Logger as Logger
import Control.Monad.Component (ComponentM)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Capataz (Capataz, ProcessSpec)
import Control.Concurrent.STM (orElse)
import System.Environment (getArgs)
import qualified RIO.Text as Text
import qualified Control.Monad.Component as Component
import qualified Control.Concurrent.STM.TChan as TChan
import qualified Control.Concurrent.Capataz as Capataz
import qualified Shelly -- (1)
import qualified System.INotify as INotify -- (1)
(1)
For this example we use RIO as our custom Prelude, shelly to execute CLI commands from Haskell, and hinotify to keep track of file changes.
--------------------------------------------------------------------------------
-- | An instance of HasLogFunc for a LogFunc. I create this instance because
-- there is no obvious way to use a LogFunc in a reader. I'm confident there
-- must be a better way to do this. For the sake of the tutorial, this is a
-- non-essential implementation detail.
instance Logger.HasLogFunc Logger.LogFunc where
  logFuncL = id

-- | Utility function that runs a component builder and passes its result
-- to a callback in a safe way.
withComponent :: Text -> ComponentM a -> (a -> IO ()) -> IO ()
withComponent !appDesc !buildComponent !f = mask $ \restore -> do
  component <- Component.runComponentM appDesc buildComponent
  (restore . f $ Component.fromComponent component)
    `finally`
    (Component.teardown component >>= Prelude.print)
--------------------------------------------------------------------------------
-- | A combination of both file modification events and git repository
-- synchronization.
data WatcherMsg -- (2)
  -- | Message triggered when monitored file changes
  = FileChanged !FilePath
  -- | Message triggered when a sync request is made
  | SyncRequested
(2)
We use a sum type that lets us notify threads when an interval period has elapsed (SyncRequested) or when a monitored file has changed (FileChanged, triggered by hinotify).
--------------------------------------------------------------------------------
-- | Returns an INotify descriptor, necessary to build watches for a directory
buildINotify :: ComponentM INotify.INotify
buildINotify = Component.buildComponentWithCleanup $ do
  inotify <- INotify.initINotify
  return (inotify, ("inotify descriptor", INotify.killINotify inotify))

-- | Registers a watch on the given directory that invokes the callback with
-- the path of every modified file; returns an IO sub-routine that removes
-- the watch.
buildFileWatcher
  :: INotify.INotify
  -> (FilePath -> IO ())
  -> FilePath -- ^ Directory where changes are tracked
  -> IO (IO ())
buildFileWatcher inotify notifyFileChange !dir = do
  -- (3)
  fileWatch <- INotify.addWatch inotify [INotify.CloseWrite, INotify.Modify] dir $ \ev -> do
    case ev of
      INotify.Modified {INotify.isDirectory, INotify.maybeFilePath}
        -- we ignore all changes that happen to a Directory
        | isDirectory -> return ()
        | otherwise ->
          maybe (return ()) notifyFileChange maybeFilePath
      _ -> return ()
  return (INotify.removeWatch fileWatch)
(3)
Using the INotify.addWatch function, we register the directory whose modification events we are interested in; we also provide a callback that forwards the file path of each INotify event to notifyFileChange, which writes it to the TChan created in buildRepoFileWatcher.
-- | Returns both an STM sub-routine that blocks until a given period
-- has passed, and a "ProcessSpec" for supervision of this interval thread.
buildIntervalWorker
  :: Text -- ^ Name of the worker process
  -> Int -- ^ Number of seconds between notifications
  -> ComponentM (STM (), ProcessSpec)
buildIntervalWorker !workerName !delaySeconds = Component.buildComponent $ do
  -- (4)
  intervalChan <- TChan.newTChanIO
  let
    triggerEvent :: IO ()
    triggerEvent = forever $ do
      -- threadDelay expects microseconds
      threadDelay (delaySeconds * 1000000)
      atomically $ TChan.writeTChan intervalChan ()
    -- (5)
    intervalSpec :: Capataz.ProcessSpec
    intervalSpec =
      Capataz.workerSpec workerName triggerEvent
        (set Capataz.workerRestartStrategyL Capataz.Permanent)
  return (TChan.readTChan intervalChan, intervalSpec)
(4)
We use a TChan to tell other interested threads that a git synchronization should happen.
(5)
We build a ProcessSpec using the workerSpec function; this worker emits a signal after sleeping for the configured number of seconds. Note that we use the workerRestartStrategyL lens to override the default options of a new worker.
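The interval mechanism from steps (4) and (5) can be tried in isolation. The following is a sketch that uses only base and stm and leaves supervision out entirely; secondsToMicros, startTicker, and waitForTicks are hypothetical names introduced for this example. It also spells out the unit conversion, since threadDelay takes microseconds.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TChan (newTChanIO, readTChan, writeTChan)
import Control.Monad (forever, replicateM_)

-- | Converts whole seconds to the microseconds that 'threadDelay' expects.
secondsToMicros :: Int -> Int
secondsToMicros seconds = seconds * 1000000

-- | Hypothetical helper: forks a thread that writes a tick to a channel after
-- every delay. Returns an STM action that blocks until the next tick and an
-- IO action that stops the ticker thread.
startTicker :: Int -> IO (STM (), IO ())
startTicker delayMicros = do
  tickChan <- newTChanIO
  tid <- forkIO $ forever $ do
    threadDelay delayMicros
    atomically (writeTChan tickChan ())
  return (readTChan tickChan, killThread tid)

-- | Waits for @n@ ticks, stops the ticker, and reports how many were seen.
waitForTicks :: Int -> Int -> IO Int
waitForTicks delayMicros n = do
  (waitTick, stopTicker) <- startTicker delayMicros
  replicateM_ n (atomically waitTick)
  stopTicker
  return n
```

For instance, waitForTicks (secondsToMicros 1) 3 blocks for roughly three seconds before returning. In the tutorial the ticker thread is not forked by hand; it is handed to a supervisor as a ProcessSpec so that it is restarted if it dies.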
-- | Builds a "ProcessSpec" that monitors a green thread that receives messages
-- from the given "WatcherMsg" notifier; it receives the path where the git
-- repository is.
buildGitWorker
  :: FilePath -- ^ Location of git repository where changes are kept
  -> IO WatcherMsg -- ^ An IO sub-routine that gets triggered every time a
                   -- WatcherMsg happens
  -> ProcessSpec
buildGitWorker !repoPath !getWatcherMsg =
  let
    executeCmd :: IO ()
    executeCmd = forever $ do
      msg <- getWatcherMsg
      case msg of
        FileChanged {} ->
          Shelly.shelly
            $ Shelly.chdir (Shelly.fromText $ Text.pack repoPath)
            $ do Shelly.run_ "git" ["add", "."]
                 Shelly.run_ "git" ["commit", "-m", "file changes"]
        SyncRequested -> do
          Shelly.shelly
            $ Shelly.chdir (Shelly.fromText $ Text.pack repoPath)
            $ do Shelly.run_ "git" ["pull", "-r", "origin", "master"]
                 Shelly.run_ "git" ["push", "origin", "master"]
  in
    -- (6)
    Capataz.workerSpec "git-worker" executeCmd
      (set Capataz.workerRestartStrategyL Capataz.Permanent)
(6)
We build a ProcessSpec using the workerSpec function; this worker receives notifications and runs shell commands using the Shelly API.
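To make the mapping from messages to git commands easier to see (and to test without touching a real repository), the decision can be separated from the execution. This is only a sketch of that idea, not how the tutorial's worker is written: the WatcherMsg type is repeated locally so the snippet compiles on its own, and gitArgsFor and renderCommands are hypothetical helpers.

```haskell
-- | Local copy of the tutorial's message type, repeated here so the sketch
-- is self-contained.
data WatcherMsg
  = FileChanged FilePath
  | SyncRequested
  deriving (Eq, Show)

-- | Hypothetical helper: the argument lists passed to the @git@ executable
-- for each kind of message, in execution order.
gitArgsFor :: WatcherMsg -> [[String]]
gitArgsFor (FileChanged _) =
  [ ["add", "."]
  , ["commit", "-m", "file changes"]
  ]
gitArgsFor SyncRequested =
  [ ["pull", "-r", "origin", "master"]
  , ["push", "origin", "master"]
  ]

-- | Renders the planned commands as plain lines, e.g. for a dry run or a log.
-- Arguments are joined with spaces and not shell-quoted.
renderCommands :: WatcherMsg -> [String]
renderCommands = map (unwords . ("git" :)) . gitArgsFor
```

With this split, renderCommands SyncRequested yields the two lines "git pull -r origin master" and "git push origin master", and the worker loop would only need to run each argument list inside the repository directory.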
-- | Returns both a utility function for logging and a "ProcessSpec" to
-- supervise a thread that receives log messages and displays them to stdout.
buildEventLogger :: ComponentM (DisplayBuilder -> IO (), ProcessSpec)
buildEventLogger = Component.buildComponent $ do
  logChan <- TChan.newTChanIO
  let
    logOptions :: Logger.LogOptions
    logOptions =
      Logger.LogOptions
        {
          Logger.logMinLevel = Logger.LevelDebug
        , Logger.logVerboseFormat = True
        , Logger.logTerminal = True
        , Logger.logUseTime = True
        , Logger.logUseColor = True
        , Logger.logUseUnicode = True
        }
    logLoop :: IO ()
    logLoop = Logger.withStickyLogger logOptions $ \logger -> do
      flip runReaderT logger $ forever $ do
        bs <- liftIO $ atomically $ TChan.readTChan logChan
        Logger.logDebug bs
  return (
      atomically . TChan.writeTChan logChan
      -- (7)
    , Capataz.workerSpec "logger" logLoop
        (set Capataz.workerRestartStrategyL Capataz.Permanent)
    )
(7)
As with the previous examples, we create a ProcessSpec with the workerSpec function; this worker listens to a channel for messages to print to stdout.
-- | Creates a RepoWatcher supervisor, which is composed of:
--
-- * A file watcher
-- * An interval worker
-- * A git worker
--
-- NOTE: when we restart our repo file watcher, we need to make sure that our
-- watch gets restarted as well.
buildRepoFileWatcher :: INotify.INotify -> FilePath -> ComponentM ProcessSpec
buildRepoFileWatcher !inotify !repoDir = do
  -- We create the functions that workers use to communicate with each
  -- other
  changesChan <- liftIO $ TChan.newTChanIO
  let notifyFileChange = atomically . TChan.writeTChan changesChan
      onFileChange = TChan.readTChan changesChan
  fileWatchCleanupRef <- liftIO $ do
    fileWatchCleanup <- buildFileWatcher inotify notifyFileChange repoDir
    newIORef fileWatchCleanup
  (onSync, syncIntervalSpec) <- buildIntervalWorker "git-sync-interval" (60 * 2)
  let
    -- We compose both sync interval requests and file change notifications
    onMsg :: IO WatcherMsg
    onMsg =
      atomically
        $ (FileChanged <$> onFileChange)
        `orElse` (onSync $> SyncRequested)
    cleanupWatch :: IO ()
    cleanupWatch =
      -- Invokes the `IO ()` operation contained inside our `IORef`
      join (readIORef fileWatchCleanupRef)
    -- We restart the inotify watch when the supervisor fails; we mask to make
    -- sure that our ref is not corrupted by async exceptions
    onRepoWatcherFailure :: IO ()
    onRepoWatcherFailure = mask $ \unmask -> do
      unmask cleanupWatch
      fileWatchCleanup <- buildFileWatcher inotify notifyFileChange repoDir
      writeIORef fileWatchCleanupRef fileWatchCleanup
    gitWorkerSpec :: ProcessSpec
    gitWorkerSpec =
      buildGitWorker repoDir onMsg
  -- (8)
  Component.buildComponentWithCleanup
    $ return
    $ (
        -- (9)
        Capataz.supervisorSpec ("repo-file-watcher:" <> Text.pack repoDir)
          ( set Capataz.supervisorRestartStrategyL Capataz.AllForOne -- (10)
          . set Capataz.supervisorOnFailureL (const $ onRepoWatcherFailure) -- (11)
          . set Capataz.supervisorProcessSpecListL [gitWorkerSpec, syncIntervalSpec]
          )
      , ("repo-file-watcher:" <> Text.pack repoDir, cleanupWatch)
      )
(8)
Component.buildComponentWithCleanup allows us to allocate resources, returning both a value from this allocation (say, a supervisor spec) and a named IO () sub-routine that the ComponentM monad composes with other cleanup tasks.
(9)
With the supervisorSpec function, we create a supervisor ProcessSpec. Note that we use the supervisorProcessSpecListL lens to create a static supervision tree from the ProcessSpec values created in steps (5) and (6). This supervisor starts a git-worker and a sync-interval-worker process on startup, and it monitors and restarts workers when they fail.
(10)
We use an AllForOne strategy, meaning that if either gitWorkerSpec or syncIntervalSpec fails, both workers are restarted.
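As a toy model of what the strategy in step (10) means, the function below computes which workers get restarted when one of them fails. This is an illustration only and makes no claim about how Capataz implements restarts; the Strategy type and workersToRestart are hypothetical and defined locally.

```haskell
-- | Toy stand-in for a supervisor restart strategy (not Capataz's own type).
data Strategy
  = AllForOne -- ^ a failure of one worker restarts every sibling
  | OneForOne -- ^ a failure of one worker restarts only that worker
  deriving (Eq, Show)

-- | Given a strategy, the names of the supervised workers, and the name of
-- the worker that failed, returns the names of the workers to restart.
workersToRestart :: Strategy -> [String] -> String -> [String]
workersToRestart AllForOne workers _ = workers
workersToRestart OneForOne workers failed = filter (== failed) workers
```

Under this model, a failure of "git-worker" with AllForOne restarts both "git-worker" and "sync-interval-worker", which keeps the two workers of a repository in a consistent state after a fault.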
(11)
We use the supervisorOnFailureL lens to override the repo supervisor's failure callback; here, we make sure we restart the INotify watch of the repository.
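The onMsg definition in the listing above merges two STM sources with orElse. The sketch below isolates that composition using only the stm package; the WatcherMsg type is repeated locally, and nextMsg and demo are hypothetical names for this example. One detail worth noting is that orElse is left-biased: the right-hand action only runs when the left one retries, so pending file changes are served before pending sync requests.

```haskell
import Control.Concurrent.STM (STM, atomically, orElse)
import Control.Concurrent.STM.TChan (TChan, newTChanIO, readTChan, writeTChan)

-- | Local copy of the tutorial's message type, so the sketch is self-contained.
data WatcherMsg
  = FileChanged FilePath
  | SyncRequested
  deriving (Eq, Show)

-- | Blocks until either channel has a value. The file-change channel is tried
-- first; the sync channel is only consulted when no file change is pending.
nextMsg :: TChan FilePath -> TChan () -> STM WatcherMsg
nextMsg changesChan syncChan =
  (FileChanged <$> readTChan changesChan)
    `orElse` (SyncRequested <$ readTChan syncChan)

-- | Feeds both channels and collects the messages in the order they are seen.
demo :: IO [WatcherMsg]
demo = do
  changesChan <- newTChanIO
  syncChan <- newTChanIO
  -- Only a sync request is pending.
  atomically (writeTChan syncChan ())
  first <- atomically (nextMsg changesChan syncChan)
  -- Both kinds of event are pending; the left branch wins.
  atomically $ do
    writeTChan syncChan ()
    writeTChan changesChan "README.md"
  second <- atomically (nextMsg changesChan syncChan)
  third <- atomically (nextMsg changesChan syncChan)
  return [first, second, third]
```

Evaluating demo in GHCi returns [SyncRequested,FileChanged "README.md",SyncRequested]: the second read picks the file change even though the sync request was written first.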
-- | Creates a Capataz supervision tree which contains a RepoWatcher
-- supervisor per repository path
createRepoWatcherSystem :: [FilePath] -> ComponentM Capataz
createRepoWatcherSystem repoPathList = do
  (logFn, loggerProcessSpec) <- buildEventLogger
  inotify <- buildINotify
  repoProcessSpecList <- mapM (buildRepoFileWatcher inotify) repoPathList
  let
    procList =
      loggerProcessSpec:repoProcessSpecList
  Component.buildComponentWithTeardown $ mask $ \_ -> do
    -- (12)
    capataz <- Capataz.forkCapataz "repo-watcher-capataz"
      ( set Capataz.onSystemEventL (logFn . displayShow)
      . set Capataz.supervisorProcessSpecListL procList
      )
    -- (13)
    return (capataz, Capataz.getCapatazTeardown capataz)
(12)
We connect all our components using the forkCapataz function. The logger notification function is used as our onSystemEventL callback so that we have proper logging around what our Capataz system is doing.
(13)
We return our Capataz record and a teardown sub-routine.
main :: IO ()
main = do
  input <- getArgs
  case input of
    [] ->
      error "Expecting repository paths as inputs; got nothing."
    repoPaths ->
      -- (14)
      withComponent ("repo-watcher-system")
                    (createRepoWatcherSystem repoPaths)
                    Capataz.joinCapatazThread -- (15)
(14)
We execute our ComponentM sub-routine and provide a callback that receives the result of that sub-routine, while guaranteeing that all resources are cleaned up (even in the case of failure).
(15)
We use the joinCapatazThread function, which joins our current thread with the root supervisor thread, blocking until the Capataz root supervisor finishes its execution.
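The cleanup guarantee from step (14) rests on the mask and finally combination used in withComponent. Below is a sketch of that pattern on its own, using only base; withResource and demo are hypothetical names, and withResource is essentially a hand-written version of bracket from Control.Exception rather than anything specific to teardown.

```haskell
import Control.Exception (SomeException, finally, mask, throwIO, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Hypothetical helper: acquires a resource with async exceptions masked,
-- runs the callback with exceptions restored, and always runs the release
-- action, whether the callback returns or throws.
withResource :: IO a -> (a -> IO ()) -> (a -> IO b) -> IO b
withResource acquire release use = mask $ \restore -> do
  resource <- acquire
  restore (use resource) `finally` release resource

-- | Records the order of events when the callback fails with an exception.
demo :: IO [String]
demo = do
  events <- newIORef []
  let record event = modifyIORef' events (++ [event])
  result <- try $ withResource
    (record "acquire")
    (\_ -> record "release")
    (\_ -> record "use" >> throwIO (userError "worker failed"))
  case (result :: Either SomeException ()) of
    Left _ -> record "error observed"
    Right _ -> record "no error"
  readIORef events
```

Evaluating demo in GHCi returns ["acquire","use","release","error observed"], showing that the release step runs before the exception reaches the caller.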
What have we accomplished
By now we have a fairly reliable program that keeps running smoothly even in error scenarios like:
- We monitor a directory that is not a git repository
- The repository is not configured to have a remote host
- The repository gets deleted
Try it out!
1) Clone the capataz repository
2) Run make build
3) Run ./out/bin/repo-watcher <some-git-repo-directory>
4) Try modifying a file in the monitored repo and see how it performs a commit automatically.