Our greatest glory is not in never failing, but in rising every time we fail.
Confucius.
What are Supervision Trees?
A Supervision Tree is a design pattern where developers build applications as a composable group of nodes that form a tree.
flowchart TD Root --> workera["Worker A"] Root --> subsystemb["System B"] subsystemb --> workerb1["Worker B.1"] subsystemb --> workerb2["Worker B.2"] Root --> subsystemc["System C"] subsystemc --> workerc1["Worker C.1"] subsystemc --> workerc2["Worker C.2"]
By designing applications this way, we accomplish a few desirable properties:
-
Every sub-tree node is responsible of supervising it's children failures, restarting them when needed.
-
Every sub-tree node encapsulates a specific scope of the application.
-
Every leaf node encapsulates some business logic that may use resources shared across sibling nodes.
-
When we nest one sub-tree into another, we compose multiple sub-applications together in a way that facilitates splitting them apart when necessary (when doing microservices, for example.)
Why Restarts fix (most of) my problems?
You may find that one of the top 3 culprits of failing systems is invalid states at program runtime. It is pretty much inevitable, a system may be able to have infinite combinations of values in their runtime state, and some of these combinations are not valid or expected, making our systems crash with broken assumptions.
A way to deal with this issue is to make invalid states impossible, but that is a costly endeavour, more so in dynamic typed languages or languages that fill up empty values with "sane defaults" for the sake of simplicity.
When we develop software, we start from a state that is known to work, and our system would later evolve into an invalid one, crashing again. By restarting our components, we can guarantee that when your application encounters an error state, it will reset itself to a healthy state as soon as possible.
Later, we can assess through error reports what went wrong and fix our code so that it doesn't crash unnecessarily. The main idea is to let our applications crash fast to be back in business again.
The folks at Ericson had great success with this approach by designing Erlang to have this feature front-and-center via Supervision Trees.
Long Term Goal
Capataz wants to offer a supervision tree specification that can be implemented across multiple programming languages and provide cross-language test-suites.
go-Capataz
go-Capataz
is an implementation of
the Capataz API in Golang. To learn more about how to use this library, check
go-Capataz
's
documentation
or follow the examples in this guide.
NOTE: All examples bellow are for go-capataz v0.2.x
Drop-in Replacement Example
A few notes:
This example is not managing state properly for the sake of simplicity/learning.
Using a Supervision Tree on small applications could be overkill; ensure that the inherited complexity of Supervision Trees is not higher than the complexity of your program.
Using goroutines
We could use Capataz as a drop-in replacement for regular go
statements.
Imagine we have a simple application that spawns two goroutines that form a
producer
⟷ consumer
relationship. We are going to add an arbitrary input
error to showcase the restart capabilities of Capataz; every time the user
writes "broccoli" the system is going to crash.
package main
import (
"bufio"
"fmt"
"os"
"sync"
)
func main() {
buff := make(chan interface{})
var wg sync.WaitGroup
// Routine that gets a value from somewhere
wg.Add(1)
go func() {
defer wg.Done()
reader := bufio.NewReader(os.Stdin)
for {
text, err := reader.ReadString('\n')
if text == "broccoli\n" {
panic("I do not like broccoli")
}
if err != nil {
panic(err)
}
buff <- text
}
}()
// Routine that consumes values for something else
wg.Add(1)
go func() {
defer wg.Done()
for msg := range buff {
fmt.Printf("received msg: %s\n", msg)
}
}()
wg.Wait()
}
When running this software, the program crashes as soon as you say broccoli:
$ ./example_1
hello
received msg: hello
world
received msg: world
broccoli
panic: I do not like broccoli
goroutine 6 [running]:
main.main.func1(0xc000026220, 0xc000024360)
/home/capataz/example/main.go:22 +0x26d
created by main.main
/home/capataz/example/main.go:16 +0x9c
Using Capataz
We can use a supervision tree value to get the same behavior, but on a supervised way:
package main
import (
"bufio"
"context"
"fmt"
"os"
"github.com/capatazlib/go-capataz/cap"
)
func main() {
buff := make(chan interface{})
// Replace a go statement with cap.NewWorker call. Note cap.NewWorker can
// receive multiple configuration options, so make sure to check it's
// godoc documentation
producer := cap.NewWorker("producer", func(ctx context.Context) error {
// NOTE goroutines in capataz must always deal with a context
// to account for termination signals, and can return an error to note
// that they failed signaling a message to it's supervisor
reader := bufio.NewReader(os.Stdin)
for {
select {
case <-ctx.Done():
return nil
default:
text, err := reader.ReadString('\n')
if text == "broccoli\n" {
panic("I do not like broccoli")
}
if err != nil {
panic(err)
}
select {
case <-ctx.Done():
return nil
case buff <- text:
}
}
}
})
// Replace go statement with cap.NewWorker call
consumer := cap.NewWorker("consumer", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case msg := <-buff:
fmt.Printf("received msg: %s\n", msg)
}
}
})
// Replace `sync.WaitGroup` with `cap.NewSupervisorSpec`, like
// `cap.NewWorker`, this function may receive multiple configuration
// options, check its godoc documentation for more details.
spec := cap.NewSupervisorSpec("root", cap.WithNodes(producer, consumer))
// The code above has not spawned any goroutines yet, but it wired up
// all the nodes the application needs to function in a static way
// The Start method triggers the spawning of all the nodes in the
// supervision tree in pre-order, first producer, then consumer, and
// finally root (our supervisor).
appCtx := context.Background()
supervisor, startErr := spec.Start(appCtx)
if startErr != nil {
panic(startErr)
}
// Join the current goroutine with the supervisor goroutine, in the
// situation there is a termination error, it will be notified here.
terminationErr := supervisor.Wait()
if terminationErr != nil {
fmt.Printf("terminated with errors: %v", terminationErr)
}
}
In this application, we have an improved management of the lifecycle of our
goroutines via the enforcement of a context.Context
value, we also survive the
dreadful broccoli:
$ ./example_1
hello
received msg: hello
world
received msg: world
broccoli // <--- no crash
hello
received msg: hello
Errors shouldn't go to the void
If you are like us, you probably hate errors getting completely ignored. We are
able to capture errors that happen in our supervision tree using the
EventNotifier API. Let us change the code above slightly by adding an extra
argument to the NewSupervisorSpec
call.
func main() {
// ...
logEvent := func(ev cap.Event) {
fmt.Fprintf(os.Stderr, "%v\n", ev)
}
spec := cap.NewSupervisorSpec(
"root",
cap.WithNodes(producer, consumer),
cap.WithNotifier(logEvent),
)
// ...
}
If we run our application again, we get a better idea of what is going on under the covers:
$ ./example_1
Event{created: 2021-10-15 17:14:29.116512659 -0700 PDT m=+0.000185322, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
Event{created: 2021-10-15 17:14:29.116846811 -0700 PDT m=+0.000519467, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-15 17:14:29.116860487 -0700 PDT m=+0.000533142, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root}
hello
received msg: hello
world
received msg: world
broccoli // <-- crash
Event{created: 2021-10-15 17:14:42.265600375 -0700 PDT m=+13.149273244, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer, err: panic error: I do not like broccoli}
Event{created: 2021-10-15 17:14:42.265932578 -0700 PDT m=+13.149605300, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
hello again // <-- back to business
received msg: hello again
What did we learn
-
We may use a
NewSupervisorSpec
+NewWorker
in favor ofgo
routines if we want to restart them on failure. -
Use the
cap.WithNotifier
to get full visibility on what is going on inside the supervision tree. -
Use the
context.Context
API to manage the lifecycle of worker nodes.
Next, we are going to initialize and manage our state inside the root Supervisor runtime.
State Management Example
Note: This example is a very contrived one. The goal is to show the capabilities of the library with easy to understand code, not one that would make sense.
In our previous example, we implemented a producer
⟷
consumer
program that crashed when we mentioned broccoli.
The previous implementation is good enough given the channel used for communication between our goroutines (shared resource) is never closed. But what would happen if because of some complicated business logic, our channel gets closed (invalid state)?
Let us close the channel when we receive the word "cucumber"
package main
import (
"bufio"
"context"
"fmt"
"os"
"github.com/capatazlib/go-capataz/cap"
)
func main() {
buff := make(chan interface{})
producer := cap.NewWorker("producer", func(ctx context.Context) error {
reader := bufio.NewReader(os.Stdin)
for {
select {
case <-ctx.Done():
return nil
default:
text, err := reader.ReadString('\n')
if text == "broccoli\n" {
panic("I do not like broccoli")
}
// Execute "feature" that closes the communication channel
if text == "cucumber\n" {
close(buff)
}
if err != nil {
panic(err)
}
select {
case <-ctx.Done():
return nil
case buff <- text:
}
}
}
})
consumer := cap.NewWorker("consumer", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case msg := <-buff:
fmt.Printf("received msg: %s\n", msg)
}
}
})
logEvent := func(ev cap.Event) {
fmt.Fprintf(os.Stderr, "%v\n", ev)
}
spec := cap.NewSupervisorSpec(
"root",
cap.WithNodes(producer, consumer),
cap.WithNotifier(logEvent),
)
appCtx := context.Background()
supervisor, startErr := spec.Start(appCtx)
if startErr != nil {
panic(startErr)
}
terminationErr := supervisor.Wait()
if terminationErr != nil {
fmt.Printf("terminated with errors: %v", terminationErr)
}
}
What do you think is going to happen? Try to guess before looking at the output bellow.
When we run this program, we get a very interesting output:
$ ./example_2
Event{created: 2021-10-18 14:01:58.377600448 -0700 PDT m=+0.000352428, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
Event{created: 2021-10-18 14:01:58.378461922 -0700 PDT m=+0.001213907, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-18 14:01:58.378506302 -0700 PDT m=+0.001258282, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root}
hello
received msg: hello
world
received msg: world
broccoli
Event{created: 2021-10-18 14:02:07.077565077 -0700 PDT m=+8.700317164, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer, err: panic error: I do not like broccoli}
Event{created: 2021-10-18 14:02:07.078041552 -0700 PDT m=+8.700793536, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
cucumber
received msg: %!s(<nil>)
received msg: %!s(<nil>)
... Message repeated infinitely
<Ctrl-C>
Oh no, we implemented an infinite loop that reads a closed channel. Maybe if we check that the channel is closed and terminate the worker, things are going to work:
package main
import (
"errors"
// ...
)
func main() {
// ...
consumer := cap.NewWorker("consumer", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-buff:
if !ok {
return errors.New("consumer chan is closed")
}
fmt.Printf("received msg: %s\n", msg)
}
}
})
// ...
}
If we run this program again, is the application going to recover? Let's see:
$ ./example_2
Event{created: 2021-10-18 14:05:34.29107699 -0700 PDT m=+0.000542504, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
Event{created: 2021-10-18 14:05:34.292069745 -0700 PDT m=+0.001535241, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-18 14:05:34.292111319 -0700 PDT m=+0.001576815, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root}
hello
received msg: hello
world
received msg: world
cucumber
Event{created: 2021-10-18 14:05:38.80534924 -0700 PDT m=+4.514814712, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/consumer, err: Consumer is not there}
Event{created: 2021-10-18 14:05:38.805445809 -0700 PDT m=+4.514911246, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-18 14:05:38.805459842 -0700 PDT m=+4.514925271, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer, err: send on closed channel}
Event{created: 2021-10-18 14:05:38.805490588 -0700 PDT m=+4.514956018, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/consumer, err: Consumer is not there}
Event{created: 2021-10-18 14:05:38.805519651 -0700 PDT m=+4.514985081, tag: ProcessFailed, nodeTag: Supervisor, processRuntime: root, err: supervisor crashed due to restart tolerance surpassed}
terminated with errors: supervisor crashed due to restart tolerance surpassed
Nope, still bad, although better because now we are failing fast. What's happening here? The application gets in a state that it cannot recover from because the channel was created outside the supervision tree. Whenever the supervisor restarts, it is keeping the old state around, which defeats the purpose of Supervision Trees.
For this use cases where we have a shared state among many workers of a
Supervisor, we use the cap.BuildNodesFn
function to build all the shared
resources and the supervised workers that are going to use them.
package main
import (
"bufio"
"context"
"errors"
"fmt"
"os"
"github.com/capatazlib/go-capataz/cap"
)
func main() {
logEvent := func(ev cap.Event) {
fmt.Fprintf(os.Stderr, "%v\n", ev)
}
spec := cap.NewSupervisorSpec(
"root",
// Provide a custom cap.BuildNodesFn that:
//
// * Setups all the resources and workers in a (re)start callback.
//
// * When allocating resources, return a `cap.CleanupFn` that
// closes them when the supervisor is terminated.
//
// * If there is an error allocating a resource, return the error
// and _fail fast_.
//
func() (workers []cap.Node, cleanupFn func() error, resourceAcquErr error) {
// Create buff, producer and consumer inside the cap.BuildNodesFn
buff := make(chan interface{})
producer := cap.NewWorker("producer", func(ctx context.Context) error {
// previous implementation
})
consumer := cap.NewWorker("consumer", func(ctx context.Context) error {
// previous implementation
})
// There is no resource allocation, so no need to
// perform a cleanup
cleanupFn = func() (resourceCleanupErr error) {
return nil
}
return []cap.Node{producer, consumer}, cleanupFn, nil
},
cap.WithNotifier(logEvent),
)
appCtx := context.Background()
supervisor, startErr := spec.Start(appCtx)
if startErr != nil {
panic(startErr)
}
terminationErr := supervisor.Wait()
if terminationErr != nil {
fmt.Printf("terminated with errors: %v", terminationErr)
}
}
What do you think, does it work now?
$ ./example_2
Event{created: 2021-10-18 14:29:42.772181434 -0700 PDT m=+0.000405612, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer}
Event{created: 2021-10-18 14:29:42.772920751 -0700 PDT m=+0.001144916, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-18 14:29:42.772961871 -0700 PDT m=+0.001186054, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root}
hello
received msg: hello
cucumber
Event{created: 2021-10-18 14:29:45.167407521 -0700 PDT m=+2.395631796, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/consumer, err: consumer chan is closed}
Event{created: 2021-10-18 14:29:45.167749604 -0700 PDT m=+2.395973777, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/consumer}
Event{created: 2021-10-18 14:29:45.167792201 -0700 PDT m=+2.396016366, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer, err: send on closed channel}
Event{created: 2021-10-18 14:29:45.167871787 -0700 PDT m=+2.396095961, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/consumer, err: consumer chan is closed}
Event{created: 2021-10-18 14:29:45.167935431 -0700 PDT m=+2.396159596, tag: ProcessFailed, nodeTag: Supervisor, processRuntime: root, err: supervisor crashed due to restart tolerance surpassed}
terminated with errors: supervisor crashed due to restart tolerance surpassed
Not quite.
How come our cap.BuildNodesFn
doesn't get called?
When a Supervisor detects to many restarts from its children, it terminates all the running children, cleans its allocated resources and terminates with an error. If no other supervisor is there to catch it, the application terminates with that error.
So, who supervises the supervisor? Another supervisor at the top of course.
package main
import (
"bufio"
"context"
"errors"
"fmt"
"os"
"github.com/capatazlib/go-capataz/cap"
)
func main() {
logEvent := func(ev cap.Event) {
fmt.Fprintf(os.Stderr, "%v\n", ev)
}
// Make this a subSystem variable that gets used as a child node of a bigger
// supervision tree.
subSystem := cap.NewSupervisorSpec(
"producer-consumer",
func() (workers []cap.Node, cleanupFn func() error, resourceAcquErr error) {
buff := make(chan interface{})
producer := cap.NewWorker("producer", func(ctx context.Context) error {
// ...
})
consumer := cap.NewWorker("consumer", func(ctx context.Context) error {
// ...
})
cleanupFn = func() (resourceCleanupErr error) {
return nil
}
return []cap.Node{producer, consumer}, cleanupFn, nil
},
)
spec := cap.NewSupervisorSpec(
"root",
cap.WithNodes(
// Use `cap.Subtree` to insert this subSystem in our
// application
cap.Subtree(subSystem),
),
// Keep the notifier at the top of the supervision tree
cap.WithNotifier(logEvent),
)
appCtx := context.Background()
supervisor, startErr := spec.Start(appCtx)
if startErr != nil {
panic(startErr)
}
terminationErr := supervisor.Wait()
if terminationErr != nil {
fmt.Printf("terminated with errors: %v", terminationErr)
}
}
Now, let's see if we finally could make this very contrived application reliable:
$ ./example_2
Event{created: 2021-10-18 14:40:02.952526855 -0700 PDT m=+0.000340334, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer-consumer/producer}
Event{created: 2021-10-18 14:40:02.952935936 -0700 PDT m=+0.000749413, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer-consumer/consumer}
Event{created: 2021-10-18 14:40:02.952954676 -0700 PDT m=+0.000768141, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root/producer-consumer}
Event{created: 2021-10-18 14:40:02.952967745 -0700 PDT m=+0.000781216, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root}
hello
received msg: hello
world
received msg: world
cucumber // Crash
Event{created: 2021-10-18 14:40:10.169566747 -0700 PDT m=+7.217380249, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer-consumer/consumer, err: consumer chan is closed}
Event{created: 2021-10-18 14:40:10.169711812 -0700 PDT m=+7.217525283, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer-consumer/consumer}
Event{created: 2021-10-18 14:40:10.169732943 -0700 PDT m=+7.217546404, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer-consumer/producer, err: send on closed channel}
Event{created: 2021-10-18 14:40:10.169781719 -0700 PDT m=+7.217595185, tag: ProcessFailed, nodeTag: Worker, processRuntime: root/producer-consumer/consumer, err: consumer chan is closed}
Event{created: 2021-10-18 14:40:10.169816451 -0700 PDT m=+7.217629933, tag: ProcessFailed, nodeTag: Supervisor, processRuntime: root/producer-consumer, err: supervisor crashed due to restart tolerance surpassed}
Event{created: 2021-10-18 14:40:10.169952496 -0700 PDT m=+7.217765976, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer-consumer/producer}
Event{created: 2021-10-18 14:40:10.170026745 -0700 PDT m=+7.217840202, tag: ProcessStarted, nodeTag: Worker, processRuntime: root/producer-consumer/consumer}
Event{created: 2021-10-18 14:40:10.170038109 -0700 PDT m=+7.217851568, tag: ProcessStarted, nodeTag: Supervisor, processRuntime: root/producer-consumer}
hello // Back in business
received msg: hello
Fresh like a cucumber.
What did we learn
-
Having a supervision tree without resetting state is not a very useful supervision tree.
-
Use the
cap.BuildNodesFn
function to create tree nodes that use a shared resource. -
How the
cap.BuildNodesFn
allows us to return errors on start or termination. -
If we want to ensure a supervisor initialization gets called again, we need to supervise our supervisor from another supervisor :yawdawg:.
FAQ
Is Capataz production ready?
We are currently using go-Capataz in various production systems at DigitalOcean with great success.
There are implementations in various languages but these are not production ready and currently on (very slow paced) development.
Is Capataz an actor system like Akka?
No. Capataz doesn't enforce or prescribe the Actor model as a way to accomplish Supervision Trees. It encourages the usage of monitored threads/routines/processes as a way to encapsulate business logic; however, it doesn't impose any communication mechanisms between these threads.
I use systemd/k8s, is Capataz still useful for me?
It depends. You may want to avoid restarting your program when a single logical
component in your application is failing. For example. Let's assume that your
application contains three logical components A
, B
, C
.
flowchart TD Root --> nodea["A"] Root --> nodeb["B"] Root --> nodec["C"] style nodeb stroke:#f66,stroke-width:2px
In this context, component A
takes a considerate amount of time to build some
state (cache). You probably want to avoid crashing your application (resetting
that expensive state) when an error in component B
occurs.
With Capataz you can tune the granularity of your restart strategy, adding
bulkheads
around components A
, B
, and C
via supervision trees and restarting only
the component that is failing (B
in this example).
Should I use supervision trees everywhere?
It may be tempting, but no, we only recommend using Capataz in applications where multiple components run concurrently and could fail in unexpected ways.
For example, if your application runs a single server (gRPC, HTTP), there is a good change the server library already offers a reliable error handling strategy, and adding Capataz may be overkill.
Is it good to restart forever?
In most situations, no. You want to ensure Supervision trees have a restart tolerance that works for your application, so that it propagates failures up until the whole program gives up.
You want to do this for a couple of reasons:
-
It triggers error notifications on your system when recovery mechanisms (like restarting an invalid state) are not working (fail fast).
-
If logical components at the bottom of your tree are failing because some resource (allocated at the root of the supervisor) is in an invalid state, you want to make sure that restarts get propagated to the very top of the application.
By default, Capataz allows one (1) worker failure every five (5) seconds, if your business domain requires the tolerance of more errors, you need to tweak your supervision tree settings.
Remember, a supervision tree and a properly configured supervision tree are two different things.
Is Capataz a library or a framework?
It depends. If you use Capataz to spawn workers without relying on supervisors to manage the state of your application, we argue Capataz may be a drop-in library that replaces to your language's thread spawning mechanism.
It is unlikely that you will have a group of workers that do not communicate between each other in a useful application, so you would rely on Capataz to manage your resource lifecycles. This use-case moves the needle towards Capataz being a framework.
All of this said, we developed Capataz with the framework use-case in mind.
Are there Capataz extensions?
Unfortunately at the current time there are no open-source libraries that would integrate Capataz with other observability tools like logging, Prometheus or any visualization tool.