Add Splunk Metrics serializer (#4339)

This commit is contained in:
Lance O'Connor 2018-09-11 13:01:08 -07:00 committed by Daniel Nelson
parent e85a9e0956
commit c80aab0445
6 changed files with 471 additions and 0 deletions

View File

@ -7,6 +7,7 @@ plugins.
1. [InfluxDB Line Protocol](#influx) 1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json) 1. [JSON](#json)
1. [Graphite](#graphite) 1. [Graphite](#graphite)
1. [SplunkMetric](../plugins/serializers/splunkmetric/README.md)
You will be able to identify the plugins with support by the presence of a You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin: `data_format` config option, for example, in the `file` output plugin:

View File

@ -1693,6 +1693,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
if node, ok := tbl.Fields["splunkmetric_hec_routing"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
c.HecRouting, err = b.Boolean()
if err != nil {
return nil, err
}
}
}
}
delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support") delete(tbl.Fields, "influx_uint_support")
@ -1701,6 +1713,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "prefix") delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template") delete(tbl.Fields, "template")
delete(tbl.Fields, "json_timestamp_units") delete(tbl.Fields, "json_timestamp_units")
delete(tbl.Fields, "splunkmetric_hec_routing")
return serializers.NewSerializer(c) return serializers.NewSerializer(c)
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
) )
// SerializerOutput is an interface for output plugins that are able to // SerializerOutput is an interface for output plugins that are able to
@ -60,6 +61,9 @@ type Config struct {
// Timestamp units to use for JSON formatted output // Timestamp units to use for JSON formatted output
TimestampUnits time.Duration TimestampUnits time.Duration
// Include HEC routing fields for splunkmetric output
HecRouting bool
} }
// NewSerializer a Serializer interface based on the given config. // NewSerializer a Serializer interface based on the given config.
@ -73,6 +77,8 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport) serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
case "json": case "json":
serializer, err = NewJsonSerializer(config.TimestampUnits) serializer, err = NewJsonSerializer(config.TimestampUnits)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
@ -83,6 +89,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits) return json.NewSerializer(timestampUnits)
} }
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
}
func NewInfluxSerializerConfig(config *Config) (Serializer, error) { func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
var sort influx.FieldSortOrder var sort influx.FieldSortOrder
if config.InfluxSortFields { if config.InfluxSortFields {

View File

@ -0,0 +1,139 @@
# Splunk Metrics serializer
This serializer formats and outputs the metric data in a format that can be consumed by a Splunk metrics index.
It can be used to write to a file using the file output, or for sending metrics to a HEC using the standard telegraf HTTP output.
If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with an HTTP POST per metric.
Th data is output in a format that conforms to the specified Splunk HEC JSON format as found here:
[Send metrics in JSON format](http://dev.splunk.com/view/event-collector/SP-CAAAFDN).
An example event looks like:
```javascript
{
"time": 1529708430,
"event": "metric",
"host": "patas-mbp",
"fields": {
"_value": 0.6,
"cpu": "cpu0",
"dc": "mobile",
"metric_name": "cpu.usage_user",
"user": "ronnocol"
}
}
```
In the above snippet, the following keys are dimensions:
* cpu
* dc
* user
## Using with the HTTP output
To send this data to a Splunk HEC, you can use the HTTP output, there are some custom headers that you need to add
to manage the HEC authorization, here's a sample config for an HTTP output:
```toml
[[outputs.http]]
## URL is the address to send metrics to
url = "https://localhost:8088/services/collector"
## Timeout for HTTP message
# timeout = "5s"
## HTTP method, one of: "POST" or "PUT"
# method = "POST"
## HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "splunkmetric"
## Provides time, index, source overrides for the HEC
splunkmetric_hec_routing = true
## Additional HTTP headers
[outputs.http.headers]
# Should be set manually to "application/json" for json data_format
Content-Type = "application/json"
Authorization = "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
X-Splunk-Request-Channel = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```
## Overrides
You can override the default values for the HEC token you are using by adding additional tags to the config file.
The following aspects of the token can be overriden with tags:
* index
* source
You can either use `[global_tags]` or using a more advanced configuration as documented [here](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md).
Such as this example which overrides the index just on the cpu metric:
```toml
[[inputs.cpu]]
percpu = false
totalcpu = true
[inputs.cpu.tags]
index = "cpu_metrics"
```
## Using with the File output
You can use the file output when running telegraf on a machine with a Splunk forwarder.
A sample event when `hec_routing` is false (or unset) looks like:
```javascript
{
"_value": 0.6,
"cpu": "cpu0",
"dc": "mobile",
"metric_name": "cpu.usage_user",
"user": "ronnocol",
"time": 1529708430
}
```
Data formatted in this manner can be ingested with a simple `props.conf` file that
looks like this:
```ini
[telegraf]
category = Metrics
description = Telegraf Metrics
pulldown_type = 1
DATETIME_CONFIG =
NO_BINARY_CHECK = true
SHOULD_LINEMERGE = true
disabled = false
INDEXED_EXTRACTIONS = json
KV_MODE = none
TIMESTAMP_FIELDS = time
TIME_FORMAT = %s.%3N
```
An example configuration of a file based output is:
```toml
# Send telegraf metrics to file(s)
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["/tmp/metrics.out"]
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "splunkmetric"
hec_routing = false
```

View File

@ -0,0 +1,126 @@
package splunkmetric
import (
"encoding/json"
"fmt"
"log"
"github.com/influxdata/telegraf"
)
type serializer struct {
HecRouting bool
}
func NewSerializer(splunkmetric_hec_routing bool) (*serializer, error) {
s := &serializer{
HecRouting: splunkmetric_hec_routing,
}
return s, nil
}
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
m, err := s.createObject(metric)
if err != nil {
return nil, fmt.Errorf("D! [serializer.splunkmetric] Dropping invalid metric: %s", metric.Name())
}
return m, nil
}
func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var serialized []byte
for _, metric := range metrics {
m, err := s.createObject(metric)
if err != nil {
return nil, fmt.Errorf("D! [serializer.splunkmetric] Dropping invalid metric: %s", metric.Name())
} else if m != nil {
serialized = append(serialized, m...)
}
}
return serialized, nil
}
func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) {
/* Splunk supports one metric json object, and does _not_ support an array of JSON objects.
** Splunk has the following required names for the metric store:
** metric_name: The name of the metric
** _value: The value for the metric
** time: The timestamp for the metric
** All other index fields become deminsions.
*/
type HECTimeSeries struct {
Time float64 `json:"time"`
Event string `json:"event"`
Host string `json:"host,omitempty"`
Index string `json:"index,omitempty"`
Source string `json:"source,omitempty"`
Fields map[string]interface{} `json:"fields"`
}
dataGroup := HECTimeSeries{}
var metricJson []byte
for _, field := range metric.FieldList() {
if !verifyValue(field.Value) {
log.Printf("D! Can not parse value: %v for key: %v", field.Value, field.Key)
continue
}
obj := map[string]interface{}{}
obj["metric_name"] = metric.Name() + "." + field.Key
obj["_value"] = field.Value
dataGroup.Event = "metric"
// Convert ns to float seconds since epoch.
dataGroup.Time = float64(metric.Time().UnixNano()) / float64(1000000000)
dataGroup.Fields = obj
// Break tags out into key(n)=value(t) pairs
for n, t := range metric.Tags() {
if n == "host" {
dataGroup.Host = t
} else if n == "index" {
dataGroup.Index = t
} else if n == "source" {
dataGroup.Source = t
} else {
dataGroup.Fields[n] = t
}
}
dataGroup.Fields["metric_name"] = metric.Name() + "." + field.Key
dataGroup.Fields["_value"] = field.Value
switch s.HecRouting {
case true:
// Output the data as a fields array and host,index,time,source overrides for the HEC.
metricJson, err = json.Marshal(dataGroup)
default:
// Just output the data and the time, useful for file based outuputs
dataGroup.Fields["time"] = dataGroup.Time
metricJson, err = json.Marshal(dataGroup.Fields)
}
metricGroup = append(metricGroup, metricJson...)
if err != nil {
return nil, err
}
}
return metricGroup, nil
}
func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
}
return true
}

View File

@ -0,0 +1,182 @@
package splunkmetric
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}
func TestSerializeMetricFloat(t *testing.T) {
// Test sub-second time
now := time.Unix(1529875740, 819000000)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(false)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":1529875740.819}`
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeMetricFloatHec(t *testing.T) {
// Test sub-second time
now := time.Unix(1529875740, 819000000)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(true)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"time":1529875740.819,"event":"metric","fields":{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}`
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeMetricInt(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(false)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}`
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeMetricIntHec(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(true)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"time":0,"event":"metric","fields":{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}`
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeMetricString(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"processorType": "ARMv7 Processor rev 4 (v7l)",
"usage_idle": int64(5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s, _ := NewSerializer(false)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"_value":5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}`
assert.Equal(t, string(expS), string(buf))
assert.NoError(t, err)
}
func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
n := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 92.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, n}
s, _ := NewSerializer(false)
buf, err := s.SerializeBatch(metrics)
assert.NoError(t, err)
expS := `{"_value":42,"metric_name":"cpu.value","time":0}` + `{"_value":92,"metric_name":"cpu.value","time":0}`
assert.Equal(t, string(expS), string(buf))
}
func TestSerializeBatchHec(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
n := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 92.0,
},
time.Unix(0, 0),
),
)
metrics := []telegraf.Metric{m, n}
s, _ := NewSerializer(true)
buf, err := s.SerializeBatch(metrics)
assert.NoError(t, err)
expS := `{"time":0,"event":"metric","fields":{"_value":42,"metric_name":"cpu.value"}}` + `{"time":0,"event":"metric","fields":{"_value":92,"metric_name":"cpu.value"}}`
assert.Equal(t, string(expS), string(buf))
}