From aaaad4d217058e273f3a5cfad04d4f8f9128628c Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 29 May 2019 18:31:06 -0700 Subject: [PATCH] Add health output plugin (#5882) --- internal/http.go | 45 ++++ internal/internal.go | 3 +- plugins/outputs/all/all.go | 1 + plugins/outputs/health/README.md | 61 ++++++ plugins/outputs/health/compares.go | 77 +++++++ plugins/outputs/health/compares_test.go | 268 ++++++++++++++++++++++++ plugins/outputs/health/contains.go | 19 ++ plugins/outputs/health/contains_test.go | 68 ++++++ plugins/outputs/health/health.go | 252 ++++++++++++++++++++++ plugins/outputs/health/health_test.go | 124 +++++++++++ 10 files changed, 917 insertions(+), 1 deletion(-) create mode 100644 internal/http.go create mode 100644 plugins/outputs/health/README.md create mode 100644 plugins/outputs/health/compares.go create mode 100644 plugins/outputs/health/compares_test.go create mode 100644 plugins/outputs/health/contains.go create mode 100644 plugins/outputs/health/contains_test.go create mode 100644 plugins/outputs/health/health.go create mode 100644 plugins/outputs/health/health_test.go diff --git a/internal/http.go b/internal/http.go new file mode 100644 index 000000000..230fdf2b7 --- /dev/null +++ b/internal/http.go @@ -0,0 +1,45 @@ +package internal + +import ( + "crypto/subtle" + "net/http" +) + +// ErrorFunc is a callback for writing an error response. +type ErrorFunc func(rw http.ResponseWriter, code int) + +// AuthHandler returns a http handler that requires HTTP basic auth +// credentials to match the given username and password. +func AuthHandler(username, password string, onError ErrorFunc) func(h http.Handler) http.Handler { + return func(h http.Handler) http.Handler { + return &basicAuthHandler{ + username: username, + password: password, + onError: onError, + next: h, + } + } + +} + +type basicAuthHandler struct { + username string + password string + onError ErrorFunc + next http.Handler +} + +func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + if h.username != "" || h.password != "" { + reqUsername, reqPassword, ok := req.BasicAuth() + if !ok || + subtle.ConstantTimeCompare([]byte(reqUsername), []byte(h.username)) != 1 || + subtle.ConstantTimeCompare([]byte(reqPassword), []byte(h.password)) != 1 { + + h.onError(rw, http.StatusUnauthorized) + return + } + } + + h.next.ServeHTTP(rw, req) +} diff --git a/internal/internal.go b/internal/internal.go index ebb69db8a..c191eac94 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -64,7 +64,8 @@ func Version() string { // ProductToken returns a tag for Telegraf that can be used in user agents. func ProductToken() string { - return fmt.Sprintf("Telegraf/%s Go/%s", Version(), runtime.Version()) + return fmt.Sprintf("Telegraf/%s Go/%s", + Version(), strings.TrimPrefix(runtime.Version(), "go")) } // UnmarshalTOML parses the duration from the TOML config file diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index c29d05efb..f9dd73c44 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" + _ "github.com/influxdata/telegraf/plugins/outputs/health" _ "github.com/influxdata/telegraf/plugins/outputs/http" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" diff --git a/plugins/outputs/health/README.md b/plugins/outputs/health/README.md new file mode 100644 index 000000000..5ef30fd57 --- /dev/null +++ b/plugins/outputs/health/README.md @@ -0,0 +1,61 @@ +# Health Output Plugin + +The health plugin provides a HTTP health check resource that can be configured +to return a failure status code based on the value of a metric. + +When the plugin is healthy it will return a 200 response; when unhealthy it +will return a 503 response. The default state is healthy, one or more checks +must fail in order for the resource to enter the failed state. + +### Configuration +```toml +[[outputs.health]] + ## Address and port to listen on. + ## ex: service_address = "tcp://localhost:8080" + ## service_address = "unix:///var/run/telegraf-health.sock" + # service_address = "tcp://:8080" + + ## The maximum duration for reading the entire request. + # read_timeout = "5s" + ## The maximum duration for writing the entire response. + # write_timeout = "5s" + + ## Username and password to accept for HTTP basic authentication. + # basic_username = "user1" + # basic_password = "secret" + + ## Allowed CA certificates for client certificates. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## TLS server certificate and private key. + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## One or more check sub-tables should be defined, it is also recommended to + ## use metric filtering to limit the metrics that flow into this output. + ## + ## When using the default buffer sizes, this example will fail when the + ## metric buffer is half full. + ## + ## namepass = ["internal_write"] + ## tagpass = { output = ["influxdb"] } + ## + ## [[outputs.health.compares]] + ## field = "buffer_size" + ## lt = 5000.0 + ## + ## [[outputs.health.contains]] + ## field = "buffer_size" +``` + +#### compares + +The `compares` check is used to assert basic mathematical relationships. Use +it by choosing a field key and one or more comparisons. All comparisons must +be true on all metrics for the check to pass. If the field is not found on a +metric no comparison will be made. + +#### contains + +The `contains` check can be used to require a field key to exist on at least +one metric. diff --git a/plugins/outputs/health/compares.go b/plugins/outputs/health/compares.go new file mode 100644 index 000000000..9228bd2df --- /dev/null +++ b/plugins/outputs/health/compares.go @@ -0,0 +1,77 @@ +package health + +import ( + "github.com/influxdata/telegraf" +) + +type Compares struct { + Field string `toml:"field"` + GT *float64 `toml:"gt"` + GE *float64 `toml:"ge"` + LT *float64 `toml:"lt"` + LE *float64 `toml:"le"` + EQ *float64 `toml:"eq"` + NE *float64 `toml:"ne"` +} + +func (c *Compares) runChecks(fv float64) bool { + if c.GT != nil && !(fv > *c.GT) { + return false + } + if c.GE != nil && !(fv >= *c.GE) { + return false + } + if c.LT != nil && !(fv < *c.LT) { + return false + } + if c.LE != nil && !(fv <= *c.LE) { + return false + } + if c.EQ != nil && !(fv == *c.EQ) { + return false + } + if c.NE != nil && !(fv != *c.NE) { + return false + } + return true +} + +func (c *Compares) Check(metrics []telegraf.Metric) bool { + success := true + for _, m := range metrics { + fv, ok := m.GetField(c.Field) + if !ok { + continue + } + + f, ok := asFloat(fv) + if !ok { + return false + } + + result := c.runChecks(f) + if !result { + success = false + } + } + return success +} + +func asFloat(fv interface{}) (float64, bool) { + switch v := fv.(type) { + case int64: + return float64(v), true + case float64: + return v, true + case uint64: + return float64(v), true + case bool: + if v { + return 1.0, true + } else { + return 0.0, true + } + default: + return 0.0, false + } +} diff --git a/plugins/outputs/health/compares_test.go b/plugins/outputs/health/compares_test.go new file mode 100644 index 000000000..26f0dc1e1 --- /dev/null +++ b/plugins/outputs/health/compares_test.go @@ -0,0 +1,268 @@ +package health_test + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs/health" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func addr(v float64) *float64 { + return &v +} + +func TestFieldNotFoundIsSuccess(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Now()), + } + + compares := &health.Compares{ + Field: "time_idle", + GT: addr(42.0), + } + result := compares.Check(metrics) + require.True(t, result) +} + +func TestStringFieldIsFailure(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": "foo", + }, + time.Now()), + } + + compares := &health.Compares{ + Field: "time_idle", + GT: addr(42.0), + } + result := compares.Check(metrics) + require.False(t, result) +} + +func TestFloatConvert(t *testing.T) { + tests := []struct { + name string + metrics []telegraf.Metric + expected bool + }{ + { + name: "int64 field", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": int64(42.0), + }, + time.Now()), + }, + expected: true, + }, + { + name: "uint64 field", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": uint64(42.0), + }, + time.Now()), + }, + expected: true, + }, + { + name: "float64 field", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": float64(42.0), + }, + time.Now()), + }, + expected: true, + }, + { + name: "bool field true", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": true, + }, + time.Now()), + }, + expected: true, + }, + { + name: "bool field false", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": false, + }, + time.Now()), + }, + expected: false, + }, + { + name: "string field", + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": "42.0", + }, + time.Now()), + }, + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + compares := &health.Compares{ + Field: "time_idle", + GT: addr(0.0), + } + actual := compares.Check(tt.metrics) + require.Equal(t, tt.expected, actual) + }) + } +} + +func TestOperators(t *testing.T) { + tests := []struct { + name string + compares *health.Compares + expected bool + }{ + { + name: "gt", + compares: &health.Compares{ + Field: "time_idle", + GT: addr(41.0), + }, + expected: true, + }, + { + name: "not gt", + compares: &health.Compares{ + Field: "time_idle", + GT: addr(42.0), + }, + expected: false, + }, + { + name: "ge", + compares: &health.Compares{ + Field: "time_idle", + GE: addr(42.0), + }, + expected: true, + }, + { + name: "not ge", + compares: &health.Compares{ + Field: "time_idle", + GE: addr(43.0), + }, + expected: false, + }, + { + name: "lt", + compares: &health.Compares{ + Field: "time_idle", + LT: addr(43.0), + }, + expected: true, + }, + { + name: "not lt", + compares: &health.Compares{ + Field: "time_idle", + LT: addr(42.0), + }, + expected: false, + }, + { + name: "le", + compares: &health.Compares{ + Field: "time_idle", + LE: addr(42.0), + }, + expected: true, + }, + { + name: "not le", + compares: &health.Compares{ + Field: "time_idle", + LE: addr(41.0), + }, + expected: false, + }, + { + name: "eq", + compares: &health.Compares{ + Field: "time_idle", + EQ: addr(42.0), + }, + expected: true, + }, + { + name: "not eq", + compares: &health.Compares{ + Field: "time_idle", + EQ: addr(41.0), + }, + expected: false, + }, + { + name: "ne", + compares: &health.Compares{ + Field: "time_idle", + NE: addr(41.0), + }, + expected: true, + }, + { + name: "not ne", + compares: &health.Compares{ + Field: "time_idle", + NE: addr(42.0), + }, + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Now()), + } + actual := tt.compares.Check(metrics) + require.Equal(t, tt.expected, actual) + }) + } +} diff --git a/plugins/outputs/health/contains.go b/plugins/outputs/health/contains.go new file mode 100644 index 000000000..ff03667e0 --- /dev/null +++ b/plugins/outputs/health/contains.go @@ -0,0 +1,19 @@ +package health + +import "github.com/influxdata/telegraf" + +type Contains struct { + Field string `toml:"field"` +} + +func (c *Contains) Check(metrics []telegraf.Metric) bool { + success := false + for _, m := range metrics { + ok := m.HasField(c.Field) + if ok { + success = true + } + } + + return success +} diff --git a/plugins/outputs/health/contains_test.go b/plugins/outputs/health/contains_test.go new file mode 100644 index 000000000..2337dd867 --- /dev/null +++ b/plugins/outputs/health/contains_test.go @@ -0,0 +1,68 @@ +package health_test + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs/health" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestFieldFound(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Now()), + } + + contains := &health.Contains{ + Field: "time_idle", + } + result := contains.Check(metrics) + require.True(t, result) +} + +func TestFieldNotFound(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Now()), + } + + contains := &health.Contains{ + Field: "time_idle", + } + result := contains.Check(metrics) + require.False(t, result) +} + +func TestOneMetricWithFieldIsSuccess(t *testing.T) { + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Now()), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Now()), + } + + contains := &health.Contains{ + Field: "time_idle", + } + result := contains.Check(metrics) + require.True(t, result) +} diff --git a/plugins/outputs/health/health.go b/plugins/outputs/health/health.go new file mode 100644 index 000000000..c7c2cc547 --- /dev/null +++ b/plugins/outputs/health/health.go @@ -0,0 +1,252 @@ +package health + +import ( + "context" + "crypto/tls" + "log" + "net" + "net/http" + "net/url" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + tlsint "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" +) + +const ( + defaultServiceAddress = "tcp://:8080" + defaultReadTimeout = 5 * time.Second + defaultWriteTimeout = 5 * time.Second +) + +var sampleConfig = ` + ## Address and port to listen on. + ## ex: service_address = "tcp://localhost:8080" + ## service_address = "unix:///var/run/telegraf-health.sock" + # service_address = "tcp://:8080" + + ## The maximum duration for reading the entire request. + # read_timeout = "5s" + ## The maximum duration for writing the entire response. + # write_timeout = "5s" + + ## Username and password to accept for HTTP basic authentication. + # basic_username = "user1" + # basic_password = "secret" + + ## Allowed CA certificates for client certificates. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## TLS server certificate and private key. + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## One or more check sub-tables should be defined, it is also recommended to + ## use metric filtering to limit the metrics that flow into this output. + ## + ## When using the default buffer sizes, this example will fail when the + ## metric buffer is half full. + ## + ## namepass = ["internal_write"] + ## tagpass = { output = ["influxdb"] } + ## + ## [[outputs.health.compares]] + ## field = "buffer_size" + ## lt = 5000.0 + ## + ## [[outputs.health.contains]] + ## field = "buffer_size" +` + +type Checker interface { + // Check returns true if the metrics meet its criteria. + Check(metrics []telegraf.Metric) bool +} + +type Health struct { + ServiceAddress string `toml:"service_address"` + ReadTimeout internal.Duration `toml:"read_timeout"` + WriteTimeout internal.Duration `toml:"write_timeout"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + tlsint.ServerConfig + + Compares []*Compares `toml:"compares"` + Contains []*Contains `toml:"contains"` + checkers []Checker + + wg sync.WaitGroup + server *http.Server + origin string + + mu sync.Mutex + healthy bool +} + +func (h *Health) SampleConfig() string { + return sampleConfig +} + +func (h *Health) Description() string { + return "Configurable HTTP health check resource based on metrics" +} + +// Connect starts the HTTP server. +func (h *Health) Connect() error { + h.checkers = make([]Checker, 0) + for i := range h.Compares { + h.checkers = append(h.checkers, h.Compares[i]) + } + for i := range h.Contains { + h.checkers = append(h.checkers, h.Contains[i]) + } + + tlsConf, err := h.ServerConfig.TLSConfig() + if err != nil { + return err + } + + authHandler := internal.AuthHandler(h.BasicUsername, h.BasicPassword, onAuthError) + + h.server = &http.Server{ + Addr: h.ServiceAddress, + Handler: authHandler(h), + ReadTimeout: h.ReadTimeout.Duration, + WriteTimeout: h.WriteTimeout.Duration, + TLSConfig: tlsConf, + } + + listener, err := h.listen(tlsConf) + if err != nil { + return err + } + + h.origin = h.getOrigin(listener, tlsConf) + + log.Printf("I! [outputs.health] Listening on %s", h.origin) + + h.wg.Add(1) + go func() { + defer h.wg.Done() + err := h.server.Serve(listener) + if err != http.ErrServerClosed { + log.Printf("E! [outputs.health] Serve error on %s: %v", h.origin, err) + } + h.origin = "" + }() + + return nil +} + +func onAuthError(rw http.ResponseWriter, code int) { + http.Error(rw, http.StatusText(code), code) +} + +func (h *Health) listen(tlsConf *tls.Config) (net.Listener, error) { + u, err := url.Parse(h.ServiceAddress) + if err != nil { + return nil, err + } + + network := "tcp" + address := u.Host + if u.Host == "" { + network = "unix" + address = u.Path + } + + if tlsConf != nil { + return tls.Listen(network, address, tlsConf) + } else { + return net.Listen(network, address) + } + +} + +func (h *Health) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + var code = http.StatusOK + if !h.isHealthy() { + code = http.StatusServiceUnavailable + } + + rw.Header().Set("Server", internal.ProductToken()) + http.Error(rw, http.StatusText(code), code) +} + +// Write runs all checks over the metric batch and adjust health state. +func (h *Health) Write(metrics []telegraf.Metric) error { + healthy := true + for _, checker := range h.checkers { + success := checker.Check(metrics) + if !success { + healthy = false + } + } + + h.setHealthy(healthy) + return nil +} + +// Close shuts down the HTTP server. +func (h *Health) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + h.server.Shutdown(ctx) + h.wg.Wait() + return nil +} + +// Origin returns the URL of the HTTP server. +func (h *Health) Origin() string { + return h.origin +} + +func (h *Health) getOrigin(listener net.Listener, tlsConf *tls.Config) string { + switch listener.Addr().Network() { + case "tcp": + scheme := "http" + if tlsConf != nil { + scheme = "https" + } + origin := &url.URL{ + Scheme: scheme, + Host: listener.Addr().String(), + } + return origin.String() + case "unix": + return listener.Addr().String() + default: + return "" + } +} + +func (h *Health) setHealthy(healthy bool) { + h.mu.Lock() + defer h.mu.Unlock() + h.healthy = healthy +} + +func (h *Health) isHealthy() bool { + h.mu.Lock() + defer h.mu.Unlock() + return h.healthy +} + +func NewHealth() *Health { + return &Health{ + ServiceAddress: defaultServiceAddress, + ReadTimeout: internal.Duration{Duration: defaultReadTimeout}, + WriteTimeout: internal.Duration{Duration: defaultWriteTimeout}, + healthy: true, + } +} + +func init() { + outputs.Add("health", func() telegraf.Output { + return NewHealth() + }) +} diff --git a/plugins/outputs/health/health_test.go b/plugins/outputs/health/health_test.go new file mode 100644 index 000000000..234b0251c --- /dev/null +++ b/plugins/outputs/health/health_test.go @@ -0,0 +1,124 @@ +package health_test + +import ( + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs/health" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestHealth(t *testing.T) { + type Options struct { + Compares []*health.Compares `toml:"compares"` + Contains []*health.Contains `toml:"contains"` + } + + now := time.Now() + tests := []struct { + name string + options Options + metrics []telegraf.Metric + expectedCode int + }{ + { + name: "healthy on startup", + expectedCode: 200, + }, + { + name: "check passes", + options: Options{ + Compares: []*health.Compares{ + { + Field: "time_idle", + GT: func() *float64 { v := 0.0; return &v }(), + }, + }, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + now), + }, + expectedCode: 200, + }, + { + name: "check fails", + options: Options{ + Compares: []*health.Compares{ + { + Field: "time_idle", + LT: func() *float64 { v := 0.0; return &v }(), + }, + }, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + now), + }, + expectedCode: 503, + }, + { + name: "mixed check fails", + options: Options{ + Compares: []*health.Compares{ + { + Field: "time_idle", + LT: func() *float64 { v := 0.0; return &v }(), + }, + }, + Contains: []*health.Contains{ + { + Field: "foo", + }, + }, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + now), + }, + expectedCode: 503, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + output := health.NewHealth() + output.ServiceAddress = "tcp://127.0.0.1:0" + output.Compares = tt.options.Compares + output.Contains = tt.options.Contains + + err := output.Connect() + + err = output.Write(tt.metrics) + require.NoError(t, err) + + resp, err := http.Get(output.Origin()) + require.NoError(t, err) + require.Equal(t, tt.expectedCode, resp.StatusCode) + + _, err = ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + err = output.Close() + require.NoError(t, err) + }) + } +}