From e6724bfb7c619619a14c14871b59955b294114d4 Mon Sep 17 00:00:00 2001 From: JefMuller <37331926+JefMuller@users.noreply.github.com> Date: Wed, 9 Jan 2019 00:28:00 +0100 Subject: [PATCH] Add ServiceNow serializer (#4809) --- plugins/serializers/nowmetric/README.md | 83 ++++++++ plugins/serializers/nowmetric/nowmetric.go | 137 +++++++++++++ .../serializers/nowmetric/nowmetric_test.go | 184 ++++++++++++++++++ plugins/serializers/registry.go | 7 + 4 files changed, 411 insertions(+) create mode 100644 plugins/serializers/nowmetric/README.md create mode 100644 plugins/serializers/nowmetric/nowmetric.go create mode 100644 plugins/serializers/nowmetric/nowmetric_test.go diff --git a/plugins/serializers/nowmetric/README.md b/plugins/serializers/nowmetric/README.md new file mode 100644 index 000000000..9bfbc3346 --- /dev/null +++ b/plugins/serializers/nowmetric/README.md @@ -0,0 +1,83 @@ +# ServiceNow Metrics serializer + +The ServiceNow Metrics serializer outputs metrics in the [ServiceNow Operational Intelligence format][ServiceNow-format]. + +It can be used to write to a file using the file output, or for sending metrics to a MID Server with Enable REST endpoint activated using the standard telegraf HTTP output. +If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with an HTTP POST per metric. + +[ServiceNow-format]: https://docs.servicenow.com/bundle/london-it-operations-management/page/product/event-management/reference/mid-POST-metrics.html + + +An example event looks like: +```javascript +[{ + "metric_type": "Disk C: % Free Space", + "resource": "C:\\", + "node": "lnux100", + "value": 50, + "timestamp": 1473183012000, + "ci2metric_id": { + "node": "lnux100" + }, + "source": “Telegraf” +}] +``` +## Using with the HTTP output + +To send this data to a ServiceNow MID Server with Web Server extension activated, you can use the HTTP output, there are some custom headers that you need to add to manage the MID Web Server authorization, here's a sample config for an HTTP output: + +```toml +[[outputs.http]] + ## URL is the address to send metrics to + url = "http://:9082/api/mid/sa/metrics" + + ## Timeout for HTTP message + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT" + method = "POST" + + ## HTTP Basic Auth credentials + username = 'evt.integration' + password = 'P@$$w0rd!' + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "nowmetric" + + ## Additional HTTP headers + [outputs.http.headers] + # # Should be set manually to "application/json" for json data_format + Content-Type = "application/json" + Accept = "application/json" +``` + +Starting with the London release, you also need to explicitly create event rule to allow binding of metric events to host CIs. + +https://docs.servicenow.com/bundle/london-it-operations-management/page/product/event-management/task/event-rule-bind-metrics-to-host.html + +## Using with the File output + +You can use the file output to output the payload in a file. +In this case, just add the following section to your telegraf config file + +```toml +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["C:/Telegraf/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "nowmetric" +``` diff --git a/plugins/serializers/nowmetric/nowmetric.go b/plugins/serializers/nowmetric/nowmetric.go new file mode 100644 index 000000000..c9d0b9463 --- /dev/null +++ b/plugins/serializers/nowmetric/nowmetric.go @@ -0,0 +1,137 @@ +package nowmetric + +import ( + "bytes" + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/telegraf" +) + +type serializer struct { + TimestampUnits time.Duration +} + +/* +Example for the JSON generated and pushed to the MID +{ + "metric_type":"cpu_usage_system", + "resource":"", + "node":"ASGARD", + "value": 0.89, + "timestamp":1487365430, + "ci2metric_id":{"node":"ASGARD"}, + "source":"Telegraf" +} +*/ + +type OIMetric struct { + Metric string `json:"metric_type"` + Resource string `json:"resource"` + Node string `json:"node"` + Value interface{} `json:"value"` + Timestamp int64 `json:"timestamp"` + CiMapping map[string]string `json:"ci2metric_id"` + Source string `json:"source"` +} + +type OIMetrics []OIMetric + +func NewSerializer() (*serializer, error) { + s := &serializer{} + return s, nil +} + +func (s *serializer) Serialize(metric telegraf.Metric) (out []byte, err error) { + serialized, err := s.createObject(metric) + if err != nil { + return []byte{}, nil + } + return serialized, err +} + +func (s *serializer) SerializeBatch(metrics []telegraf.Metric) (out []byte, err error) { + objects := make([]byte, 0) + for _, metric := range metrics { + m, err := s.createObject(metric) + if err != nil { + return nil, fmt.Errorf("D! [serializer.nowmetric] Dropping invalid metric: %s", metric.Name()) + } else if m != nil { + objects = append(objects, m...) + } + } + replaced := bytes.Replace(objects, []byte("]["), []byte(","), -1) + return replaced, nil +} + +func (s *serializer) createObject(metric telegraf.Metric) ([]byte, error) { + /* ServiceNow Operational Intelligence supports an array of JSON objects. + ** Following elements accepted in the request body: + ** metric_type: The name of the metric + ** resource: Information about the resource for which metric data is being collected. In the example below, C:\ is the resource for which metric data is collected + ** node: IP, FQDN, name of the CI, or host + ** value: Value of the metric + ** timestamp: Epoch timestamp of the metric in milliseconds + ** ci2metric_id: List of key-value pairs to identify the CI. + ** source: Data source monitoring the metric type + */ + var allmetrics OIMetrics + var oimetric OIMetric + + oimetric.Source = "Telegraf" + + // Process Tags to extract node & resource name info + for _, tag := range metric.TagList() { + if tag.Key == "" || tag.Value == "" { + continue + } + + if tag.Key == "objectname" { + oimetric.Resource = tag.Value + } + + if tag.Key == "host" { + oimetric.Node = tag.Value + } + } + + // Format timestamp to UNIX epoch + oimetric.Timestamp = (metric.Time().UnixNano() / int64(time.Millisecond)) + + // Loop of fields value pair and build datapoint for each of them + for _, field := range metric.FieldList() { + if !verifyValue(field.Value) { + // Ignore String + continue + } + + if field.Key == "" { + // Ignore Empty Key + continue + } + + oimetric.Metric = field.Key + oimetric.Value = field.Value + + if oimetric.Node != "" { + cimapping := map[string]string{} + cimapping["node"] = oimetric.Node + oimetric.CiMapping = cimapping + } + + allmetrics = append(allmetrics, oimetric) + } + + metricsJson, err := json.Marshal(allmetrics) + + return metricsJson, err +} + +func verifyValue(v interface{}) bool { + switch v.(type) { + case string: + return false + } + return true +} diff --git a/plugins/serializers/nowmetric/nowmetric_test.go b/plugins/serializers/nowmetric/nowmetric_test.go new file mode 100644 index 000000000..d326cef8c --- /dev/null +++ b/plugins/serializers/nowmetric/nowmetric_test.go @@ -0,0 +1,184 @@ +package nowmetric + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +func MustMetric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestSerializeMetricFloat(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := []byte(fmt.Sprintf(`[{"metric_type":"usage_idle","resource":"","node":"","value":91.5,"timestamp":%d,"ci2metric_id":null,"source":"Telegraf"}]`, (now.UnixNano() / int64(time.Millisecond)))) + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerialize_TimestampUnits(t *testing.T) { + tests := []struct { + name string + timestampUnits time.Duration + expected string + }{ + { + name: "1ms", + timestampUnits: 1 * time.Millisecond, + expected: `[{"metric_type":"value","resource":"","node":"","value":42,"timestamp":1525478795123,"ci2metric_id":null,"source":"Telegraf"}]`, + }, + { + name: "10ms", + timestampUnits: 10 * time.Millisecond, + expected: `[{"metric_type":"value","resource":"","node":"","value":42,"timestamp":1525478795123,"ci2metric_id":null,"source":"Telegraf"}]`, + }, + { + name: "15ms is reduced to 10ms", + timestampUnits: 15 * time.Millisecond, + expected: `[{"metric_type":"value","resource":"","node":"","value":42,"timestamp":1525478795123,"ci2metric_id":null,"source":"Telegraf"}]`, + }, + { + name: "65ms is reduced to 10ms", + timestampUnits: 65 * time.Millisecond, + expected: `[{"metric_type":"value","resource":"","node":"","value":42,"timestamp":1525478795123,"ci2metric_id":null,"source":"Telegraf"}]`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(1525478795, 123456789), + ), + ) + s, _ := NewSerializer() + actual, err := s.Serialize(m) + require.NoError(t, err) + require.Equal(t, tt.expected, string(actual)) + }) + } +} + +func TestSerializeMetricInt(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := []byte(fmt.Sprintf(`[{"metric_type":"usage_idle","resource":"","node":"","value":90,"timestamp":%d,"ci2metric_id":null,"source":"Telegraf"}]`, (now.UnixNano() / int64(time.Millisecond)))) + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricString(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": "foobar", + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + assert.Equal(t, "null", string(buf)) +} + +func TestSerializeMultiFields(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + "usage_total": 8559615, + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := []byte(fmt.Sprintf(`[{"metric_type":"usage_idle","resource":"","node":"","value":90,"timestamp":%d,"ci2metric_id":null,"source":"Telegraf"},{"metric_type":"usage_total","resource":"","node":"","value":8559615,"timestamp":%d,"ci2metric_id":null,"source":"Telegraf"}]`, (now.UnixNano() / int64(time.Millisecond)), (now.UnixNano() / int64(time.Millisecond)))) + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricWithEscapes(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu tag": "cpu0", + } + fields := map[string]interface{}{ + "U,age=Idle": int64(90), + } + m, err := metric.New("My CPU", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + buf, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []byte(fmt.Sprintf(`[{"metric_type":"U,age=Idle","resource":"","node":"","value":90,"timestamp":%d,"ci2metric_id":null,"source":"Telegraf"}]`, (now.UnixNano() / int64(time.Millisecond)))) + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeBatch(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, m} + s, _ := NewSerializer() + buf, err := s.SerializeBatch(metrics) + require.NoError(t, err) + require.Equal(t, []byte(`[{"metric_type":"value","resource":"","node":"","value":42,"timestamp":0,"ci2metric_id":null,"source":"Telegraf"},{"metric_type":"value","resource":"","node":"","value":42,"timestamp":0,"ci2metric_id":null,"source":"Telegraf"}]`), buf) +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index b8a0aef07..9ca2f42e7 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/plugins/serializers/nowmetric" "github.com/influxdata/telegraf/plugins/serializers/splunkmetric" ) @@ -79,6 +80,8 @@ func NewSerializer(config *Config) (Serializer, error) { serializer, err = NewJsonSerializer(config.TimestampUnits) case "splunkmetric": serializer, err = NewSplunkmetricSerializer(config.HecRouting) + case "nowmetric": + serializer, err = NewNowSerializer() default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -93,6 +96,10 @@ func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error return splunkmetric.NewSerializer(splunkmetric_hec_routing) } +func NewNowSerializer() (Serializer, error) { + return nowmetric.NewSerializer() +} + func NewInfluxSerializerConfig(config *Config) (Serializer, error) { var sort influx.FieldSortOrder if config.InfluxSortFields {