diff --git a/internal/config/config.go b/internal/config/config.go index f72f1ef26..c2335fac2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1891,6 +1891,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["templates"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.Templates = append(c.Templates, str.Value) + } + } + } + } + } + if node, ok := tbl.Fields["influx_max_line_bytes"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if integer, ok := kv.Value.(*ast.Integer); ok { @@ -2046,6 +2058,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "data_format") delete(tbl.Fields, "prefix") delete(tbl.Fields, "template") + delete(tbl.Fields, "templates") delete(tbl.Fields, "json_timestamp_units") delete(tbl.Fields, "splunkmetric_hec_routing") delete(tbl.Fields, "splunkmetric_multimetric") diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md index 878eb8048..b7ffd361b 100644 --- a/plugins/outputs/graphite/README.md +++ b/plugins/outputs/graphite/README.md @@ -21,6 +21,16 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md) ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md template = "host.tags.measurement.field" + ## Graphite templates patterns + ## 1. Template for cpu + ## 2. Template for disk* + ## 3. Default template + # templates = [ + # "cpu tags.measurement.host.field", + # "disk* measurement.field", + # "host.measurement.tags.field" + #] + ## Enable Graphite tags support # graphite_tag_support = false diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 09cdbe080..e7d192662 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -18,11 +18,12 @@ import ( type Graphite struct { GraphiteTagSupport bool // URL is only for backwards compatibility - Servers []string - Prefix string - Template string - Timeout int - conns []net.Conn + Servers []string + Prefix string + Template string + Templates []string + Timeout int + conns []net.Conn tlsint.ClientConfig } @@ -40,6 +41,16 @@ var sampleConfig = ` ## Enable Graphite tags support # graphite_tag_support = false + ## Graphite templates patterns + ## 1. Template for cpu + ## 2. Template for disk* + ## 3. Default template + # templates = [ + # "cpu tags.measurement.host.field", + # "disk* measurement.field", + # "host.measurement.tags.field" + #] + ## timeout in seconds for the write connection to graphite timeout = 2 @@ -134,7 +145,7 @@ func checkEOF(conn net.Conn) { func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data var batch []byte - s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport) + s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates) if err != nil { return err } diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 3857236e5..ad76d45b5 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -98,6 +98,70 @@ func TestGraphiteOK(t *testing.T) { g.Close() } +func TestGraphiteOKWithMultipleTemplates(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1WithMultipleTemplates(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + Template: "measurement.host.tags.field", + Templates: []string{ + "my_* host.measurement.tags.field", + "measurement.tags.host.field", + }, + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2WithMultipleTemplates(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + func TestGraphiteOkWithTags(t *testing.T) { var wg sync.WaitGroup // Start TCP server @@ -188,6 +252,36 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) { }() } +func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1) + conn.Close() + tcpServer.Close() + }() +} + +func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn2, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn2) + tp := textproto.NewReader(reader) + data2, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3) + conn2.Close() + tcpServer.Close() + }() +} + func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) { tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") go func() { diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index f142705a5..a861ebc28 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -27,6 +27,7 @@ type Instrumental struct { Prefix string DataFormat string Template string + Templates []string Timeout internal.Duration Debug bool @@ -85,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { } } - s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false) + s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates) if err != nil { return err } diff --git a/plugins/serializers/graphite/README.md b/plugins/serializers/graphite/README.md index 6cff2cbe5..74bde2b5d 100644 --- a/plugins/serializers/graphite/README.md +++ b/plugins/serializers/graphite/README.md @@ -22,6 +22,16 @@ method is used, otherwise the [Template Pattern](templates) is used. prefix = "telegraf" ## Graphite template pattern template = "host.tags.measurement.field" + + ## Graphite templates patterns + ## 1. Template for cpu + ## 2. Template for disk* + ## 3. Default template + # templates = [ + # "cpu tags.measurement.host.field", + # "disk* measurement.field", + # "host.measurement.tags.field" + #] ## Support Graphite tags, recommended to enable when using Graphite 1.1 or later. # graphite_tag_support = false diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index d02b0e26b..590f80b45 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" ) const DEFAULT_TEMPLATE = "host.tags.measurement.field" @@ -29,10 +30,16 @@ var ( fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "") ) +type GraphiteTemplate struct { + Filter filter.Filter + Value string +} + type GraphiteSerializer struct { Prefix string Template string TagSupport bool + Templates []*GraphiteTemplate } func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { @@ -59,7 +66,15 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { out = append(out, point...) } default: - bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix) + template := s.Template + for _, graphiteTemplate := range s.Templates { + if graphiteTemplate.Filter.Match(metric.Name()) { + template = graphiteTemplate.Value + break + } + } + + bucket := SerializeBucketName(metric.Name(), metric.Tags(), template, s.Prefix) if bucket == "" { return out, nil } @@ -185,6 +200,45 @@ func SerializeBucketName( return prefix + "." + strings.Join(out, ".") } +func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, error) { + var graphiteTemplates []*GraphiteTemplate + defaultTemplate := "" + + for i, t := range templates { + parts := strings.Fields(t) + + if len(parts) == 0 { + return nil, "", fmt.Errorf("missing template at position: %d", i) + } + if len(parts) == 1 { + if parts[0] == "" { + return nil, "", fmt.Errorf("missing template at position: %d", i) + } else { + // Override default template + defaultTemplate = t + continue + } + } + + if len(parts) > 2 { + return nil, "", fmt.Errorf("invalid template format: '%s'", t) + } + + tFilter, err := filter.Compile([]string{parts[0]}) + + if err != nil { + return nil, "", err + } + + graphiteTemplates = append(graphiteTemplates, &GraphiteTemplate{ + Filter: tFilter, + Value: parts[1], + }) + } + + return graphiteTemplates, defaultTemplate, nil +} + // SerializeBucketNameWithTags will take the given measurement name and tags and // produce a graphite bucket. It will use the Graphite11Serializer. // http://graphite.readthedocs.io/en/latest/tags.html diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index e72ed7a30..e50b7292b 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -144,6 +144,97 @@ func TestSerializeMetricHost(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeMetricHostWithMultipleTemplates(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m1, err := metric.New("cpu", tags, fields, now) + m2, err := metric.New("new_cpu", tags, fields, now) + assert.NoError(t, err) + + templates, defaultTemplate, err := InitGraphiteTemplates([]string{ + "cp* tags.measurement.host.field", + "new_cpu tags.host.measurement.field", + }) + assert.NoError(t, err) + assert.Equal(t, defaultTemplate, "") + + s := GraphiteSerializer{ + Templates: templates, + } + + buf, _ := s.Serialize(m1) + buf2, _ := s.Serialize(m2) + + buf = append(buf, buf2...) + + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricHostWithMultipleTemplatesWithDefault(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m1, err := metric.New("cpu", tags, fields, now) + m2, err := metric.New("new_cpu", tags, fields, now) + assert.NoError(t, err) + + templates, defaultTemplate, err := InitGraphiteTemplates([]string{ + "cp* tags.measurement.host.field", + "tags.host.measurement.field", + }) + assert.NoError(t, err) + assert.Equal(t, defaultTemplate, "tags.host.measurement.field") + + s := GraphiteSerializer{ + Templates: templates, + Template: defaultTemplate, + } + + buf, _ := s.Serialize(m1) + buf2, _ := s.Serialize(m2) + + buf = append(buf, buf2...) + + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + func TestSerializeMetricHostWithTagSupport(t *testing.T) { now := time.Now() tags := map[string]string{ diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index dc9859e34..17de980fd 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -68,6 +68,9 @@ type Config struct { // only supports Graphite Template string `toml:"template"` + // Templates same Template, but multiple + Templates []string `toml:"templates"` + // Timestamp units to use for JSON formatted output TimestampUnits time.Duration `toml:"timestamp_units"` @@ -104,7 +107,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "influx": serializer, err = NewInfluxSerializerConfig(config) case "graphite": - serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport) + serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.Templates) case "json": serializer, err = NewJsonSerializer(config.TimestampUnits) case "splunkmetric": @@ -188,10 +191,21 @@ func NewInfluxSerializer() (Serializer, error) { return influx.NewSerializer(), nil } -func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) { +func NewGraphiteSerializer(prefix, template string, tag_support bool, templates []string) (Serializer, error) { + graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates) + + if err != nil { + return nil, err + } + + if defaultTemplate != "" { + template = defaultTemplate + } + return &graphite.GraphiteSerializer{ Prefix: prefix, Template: template, TagSupport: tag_support, + Templates: graphiteTemplates, }, nil }