Skip to content
Snippets Groups Projects
Commit c1887e0d authored by Lars Seipel's avatar Lars Seipel
Browse files

initial import

parents
No related branches found
No related tags found
No related merge requests found
# behealthy
Checks the health of a backend and executes a user-defined action when it rises (becomes healthy) or falls (becomes unhealthy).
We use this to control BGP announcement of the anycast addresses for log ingestion.
## Configuration
Checks are specified in a configuration file whose location is given by the
`-config` flag. The flag defaults to the value of the `BEH_CONFIG` environment
variable (`/etc/behealthy.yaml` if unset or empty).
```yaml
---
checks:
  - addr: fbi.h-da.de:443
    scheme: https
    method: HEAD
    path: /
    # inter is the duration between consecutive checks
    inter: 3s
    # rise specifies the number of successful consecutive
    # checks before running up
    rise: 3
    # fall specifies the number of failed consecutive checks
    # before running down
    fall: 3
    # if we don't have a response before timeout elapses,
    # fail the check
    timeout: 1s
    # up is run when transitioning to up state ($rise
    # successful checks)
    up: ['/bin/echo', 'UP']
    # down is run when transitioning to down state ($fall
    # failed checks)
    down: ['/bin/echo', 'DOWN']
    # We try to run exit on program shutdown. There's no
    # guarantee it will run to completion (or run at all).
    exit: ['/bin/echo', 'EXIT']
```
go.mod 0 → 100644
module code.fbi.h-da.de/hdacloud/behealthy
go 1.21.1
require (
go.uber.org/zap v1.26.0
gopkg.in/yaml.v3 v3.0.1
)
require go.uber.org/multierr v1.11.0 // indirect
health.go 0 → 100644
package main
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
"go.uber.org/zap"
)
// healthChecker bundles the dependencies shared by all health checks.
type healthChecker struct {
	// http is the client used to send every health-check request.
	http *http.Client
	// log is the base logger; each check derives its own logger from it.
	log *zap.Logger
}
// run executes the health check described by conf once every conf.Inter until
// ctx is done or one of the configured actions fails.
//
// It keeps two counters of consecutive results: nok (successes) and nfail
// (failures). Each result resets the opposite counter. When a successful check
// brings nok to exactly conf.Rise, the conf.OnUp command is run; when a failed
// check brings nfail to exactly conf.Fall, the conf.OnDown command is run.
// Because an action can only be triggered by a check result of its own kind, a
// rise or fall value of zero or less never triggers anything.
//
// An action whose argv is empty is skipped. run returns ctx.Err() once ctx is
// done, an error describing the problem if conf.Inter is not positive, or a
// wrapped error if an action fails.
func (c *healthChecker) run(ctx context.Context, conf checkConfig) error {
	// time.NewTicker panics on a non-positive interval; report the bad
	// configuration as an error instead of crashing the process.
	if conf.Inter <= 0 {
		return fmt.Errorf("invalid inter %s: must be positive", conf.Inter)
	}

	var nok, nfail int
	tick := time.NewTicker(conf.Inter)
	defer tick.Stop()

	log := c.log.With(
		zap.Int("check", conf.id),
	)

	for {
		select {
		case <-tick.C:
			checkErr := c.checkHealth(ctx, conf)
			// A check interrupted by shutdown says nothing about the
			// backend; don't count it and don't run actions for it.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			if checkErr != nil {
				nok = 0
				nfail++
			} else {
				nfail = 0
				nok++
			}
			log.Info("run check",
				zap.Error(checkErr),
				zap.Int("nok", nok),
				zap.Int("rise", conf.Rise),
				zap.Int("nfail", nfail),
				zap.Int("fall", conf.Fall),
			)

			// Pick at most one action, and only the one matching the
			// result of the check that was just performed.
			var (
				what   string
				action []string
			)
			switch {
			case checkErr == nil && nok == conf.Rise:
				what, action = "up", conf.OnUp
			case checkErr != nil && nfail == conf.Fall:
				what, action = "down", conf.OnDown
			}
			if len(action) == 0 {
				// No transition, or no command configured for it.
				continue
			}
			if err := runComm(ctx, action...); err != nil {
				return fmt.Errorf("%s action failed: %w", what, err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// checkHealth performs a single health check as described by conf.
//
// It sends one HTTP request using conf.Method to the URL assembled from
// conf.Scheme, conf.Addr and conf.Path, carrying a copy of conf.Headers and
// bounded by conf.Timeout. It returns nil when the response status is accepted
// by statusOK and a non-nil error otherwise (request construction failure,
// transport failure, or unexpected status).
func (c *healthChecker) checkHealth(ctx context.Context, conf checkConfig) error {
	target := (&url.URL{
		Scheme: conf.Scheme,
		Host:   conf.Addr,
		Path:   conf.Path,
	}).String()

	logger := c.log.With(
		zap.Int("check", conf.id),
		zap.String("url", target),
		zap.String("method", conf.Method),
		zap.Duration("timeout", conf.Timeout),
	)

	// Bound the whole request/response exchange by the per-check timeout.
	reqCtx, cancel := context.WithTimeout(ctx, conf.Timeout)
	defer cancel()

	logger.Info("checking health")

	req, err := http.NewRequestWithContext(reqCtx, conf.Method, target, nil)
	if err != nil {
		logger.Error("NewRequest", zap.Error(err))
		return err
	}
	// Clone so the request never shares header storage with the config.
	req.Header = conf.Headers.Clone()

	resp, err := c.http.Do(req)
	if err != nil {
		logger.Error("failure", zap.Error(err))
		return err
	}
	defer drainAndClose(resp.Body)

	if statusOK(resp) {
		logger.Info("success", zap.Int("status", resp.StatusCode))
		return nil
	}

	logger.Warn("failure", zap.Int("status", resp.StatusCode))
	return fmt.Errorf("%s %s: got status %d",
		conf.Method, target, resp.StatusCode)
}
// statusOK reports whether the response carries a 2xx status code.
func statusOK(r *http.Response) bool {
	code := r.StatusCode
	if code < 200 {
		return false
	}
	return code <= 299
}
// drainAndClose discards up to 1 MiB of whatever is left in r and then closes
// it, returning the error from Close. Read errors while draining are ignored
// on purpose: draining is best-effort.
func drainAndClose(r io.ReadCloser) error {
	const maxReadBody = 1 << 20
	// Errors (including a short read) are deliberately ignored.
	_, _ = io.CopyN(io.Discard, r, maxReadBody)
	return r.Close()
}
main.go 0 → 100644
package main
import (
"context"
"crypto/tls"
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
// configFile is the -config flag: the path of the YAML configuration file.
// Its default is the BEH_CONFIG environment variable, falling back to
// /etc/behealthy.yaml.
var configFile = flag.String(
	"config",
	getEnvDefault("BEH_CONFIG", "/etc/behealthy.yaml"),
	"configuration `file`",
)
// config is the top-level structure of the YAML configuration file.
type config struct {
	// ClientTimeout is the overall timeout of the shared HTTP client.
	ClientTimeout time.Duration `yaml:"client_timeout"`
	// Checks lists the health checks to run.
	Checks []checkConfig `yaml:"checks"`
}
// checkConfig describes a single health check and the commands tied to it.
type checkConfig struct {
	// id is the index of the check in the configuration; it is assigned
	// after loading and is not read from YAML.
	id int

	// Addr, Scheme and Path are combined into the URL that is requested
	// (Addr is used as the URL host).
	Addr   string `yaml:"addr"`
	Scheme string `yaml:"scheme"`
	// Method is the HTTP method of the health-check request.
	Method string `yaml:"method"`
	Path   string `yaml:"path"`
	// Headers are copied onto every health-check request.
	Headers http.Header `yaml:"headers"`

	// Timeout bounds a single health-check request.
	Timeout time.Duration `yaml:"timeout"`
	// Inter is the duration between consecutive checks.
	Inter time.Duration `yaml:"inter"`
	// Rise is the number of consecutive successful checks after which
	// OnUp is run.
	Rise int `yaml:"rise"`
	// Fall is the number of consecutive failed checks after which OnDown
	// is run.
	Fall int `yaml:"fall"`

	// OnUp, OnDown and OnExit are command lines (argv[0] is the program)
	// run on the up transition, on the down transition and at program
	// shutdown, respectively.
	OnUp   []string `yaml:"up"`
	OnDown []string `yaml:"down"`
	OnExit []string `yaml:"exit"`
}
// defaultConfig holds the settings used for keys that the configuration file
// does not set.
var defaultConfig = config{ClientTimeout: 5 * time.Second}
// main loads the configuration, starts one goroutine per configured check and
// blocks until either a check returns an error or SIGINT/SIGTERM is received.
// It then cancels all checks, waits for them to return and runs every
// configured exit action, allowing 5 seconds in total for those actions.
func main() {
flag.Parse()
logger, err := zap.NewProduction()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
// Route output of the standard library logger through zap as well.
zap.RedirectStdLog(logger)
log := logger.Sugar()
configBuf, err := os.ReadFile(*configFile)
if err != nil {
log.Fatalf("read config: %v", err)
}
// Initialize from defaultConfig and let yaml.Unmarshal overwrite keys
// set in the YAML.
conf := defaultConfig
if err := yaml.Unmarshal(configBuf, &conf); err != nil {
log.Fatalf("unmarshal config: %v", err)
}
// Give every check its position in the config as an id for logging.
for i := range conf.Checks {
conf.Checks[i].id = i
}
tlsConf := &tls.Config{
// BUG(ls): we might want to allow for specifying TLS settings
// in configuration. Not required for our initial use-case, so
// leave it out for now.
// NOTE: certificate verification is disabled for all checks.
InsecureSkipVerify: true,
}
// A single HTTP client is shared by all checks.
httpClient := &http.Client{
Timeout: conf.ClientTimeout,
Transport: &http.Transport{
TLSClientConfig: tlsConf,
},
}
c := &healthChecker{
http: httpClient,
log: logger,
}
var shutdownWG sync.WaitGroup
// Buffered so that a signal arriving before the select below is not lost.
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
// checkFailure reports which check stopped and why.
type checkFailure struct {
i int
err error
}
// One slot per check, so a failing check never blocks on send even
// though only the first failure is received below.
errc := make(chan checkFailure, len(conf.Checks))
ctx, cancel := context.WithCancel(context.Background())
for i, chk := range conf.Checks {
// Per-iteration copies for the closure below.
i, chk := i, chk
shutdownWG.Add(1)
go func() {
defer shutdownWG.Done()
if err := c.run(ctx, chk); err != nil {
errc <- checkFailure{i: i, err: err}
}
}()
}
// Block until the first check failure or a termination signal.
select {
case e := <-errc:
log.Errorf("check %d died: %v", e.i, e.err)
cancel() // ask other checks to exit, too
case sig := <-shutdown:
log.Info(sig)
cancel()
}
// wait until all checks have returned
shutdownWG.Wait()
// allow 5s for running actions during shutdown
// (the deadline is shared by all exit actions, not per action)
shCtx, shCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shCancel()
// now execute exit actions
for i, chk := range conf.Checks {
// Checks without an exit action are skipped.
if len(chk.OnExit) == 0 {
continue
}
// A failing exit action is logged; the remaining ones still run.
if err := runComm(shCtx, chk.OnExit...); err != nil {
log.Errorf("check %d exit action: %v", i, err)
}
}
}
// getEnvDefault returns the value of the environment variable key, or def
// when the variable is unset or empty.
func getEnvDefault(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
// runComm runs the command argv[0] with arguments argv[1:] and waits for it to
// finish. The command is tied to ctx through exec.CommandContext.
//
// An empty argv means that no action is configured; it is treated as a no-op
// and nil is returned (previously this panicked on argv[0]). Otherwise the
// error from running the command is returned.
func runComm(ctx context.Context, argv ...string) error {
	if len(argv) == 0 {
		return nil
	}
	cmd := exec.CommandContext(ctx, argv[0], argv[1:]...)
	return cmd.Run()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment