Support multiple templates for graphite serializers (#7136)
This commit is contained in:
		
							parent
							
								
									218fbc41b9
								
							
						
					
					
						commit
						0cad343de7
					
				|  | @ -1891,6 +1891,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if node, ok := tbl.Fields["templates"]; ok { | ||||
| 		if kv, ok := node.(*ast.KeyValue); ok { | ||||
| 			if ary, ok := kv.Value.(*ast.Array); ok { | ||||
| 				for _, elem := range ary.Value { | ||||
| 					if str, ok := elem.(*ast.String); ok { | ||||
| 						c.Templates = append(c.Templates, str.Value) | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if node, ok := tbl.Fields["influx_max_line_bytes"]; ok { | ||||
| 		if kv, ok := node.(*ast.KeyValue); ok { | ||||
| 			if integer, ok := kv.Value.(*ast.Integer); ok { | ||||
|  | @ -2046,6 +2058,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error | |||
| 	delete(tbl.Fields, "data_format") | ||||
| 	delete(tbl.Fields, "prefix") | ||||
| 	delete(tbl.Fields, "template") | ||||
| 	delete(tbl.Fields, "templates") | ||||
| 	delete(tbl.Fields, "json_timestamp_units") | ||||
| 	delete(tbl.Fields, "splunkmetric_hec_routing") | ||||
| 	delete(tbl.Fields, "splunkmetric_multimetric") | ||||
|  |  | |||
|  | @ -21,6 +21,16 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md) | |||
|   ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||||
|   template = "host.tags.measurement.field" | ||||
| 
 | ||||
|   ## Graphite templates patterns | ||||
|   ## 1. Template for cpu | ||||
|   ## 2. Template for disk* | ||||
|   ## 3. Default template | ||||
|   # templates = [ | ||||
|   #  "cpu tags.measurement.host.field", | ||||
|   #  "disk* measurement.field", | ||||
|   #  "host.measurement.tags.field" | ||||
|   #] | ||||
| 
 | ||||
|   ## Enable Graphite tags support | ||||
|   # graphite_tag_support = false | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,11 +18,12 @@ import ( | |||
| type Graphite struct { | ||||
| 	GraphiteTagSupport bool | ||||
| 	// URL is only for backwards compatibility
 | ||||
| 	Servers  []string | ||||
| 	Prefix   string | ||||
| 	Template string | ||||
| 	Timeout  int | ||||
| 	conns    []net.Conn | ||||
| 	Servers   []string | ||||
| 	Prefix    string | ||||
| 	Template  string | ||||
| 	Templates []string | ||||
| 	Timeout   int | ||||
| 	conns     []net.Conn | ||||
| 	tlsint.ClientConfig | ||||
| } | ||||
| 
 | ||||
|  | @ -40,6 +41,16 @@ var sampleConfig = ` | |||
|   ## Enable Graphite tags support | ||||
|   # graphite_tag_support = false | ||||
| 
 | ||||
|   ## Graphite templates patterns | ||||
|   ## 1. Template for cpu | ||||
|   ## 2. Template for disk* | ||||
|   ## 3. Default template | ||||
|   # templates = [ | ||||
|   #  "cpu tags.measurement.host.field", | ||||
|   #  "disk* measurement.field", | ||||
|   #  "host.measurement.tags.field" | ||||
|   #] | ||||
| 
 | ||||
|   ## timeout in seconds for the write connection to graphite | ||||
|   timeout = 2 | ||||
| 
 | ||||
|  | @ -134,7 +145,7 @@ func checkEOF(conn net.Conn) { | |||
| func (g *Graphite) Write(metrics []telegraf.Metric) error { | ||||
| 	// Prepare data
 | ||||
| 	var batch []byte | ||||
| 	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport) | ||||
| 	s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  |  | |||
|  | @ -98,6 +98,70 @@ func TestGraphiteOK(t *testing.T) { | |||
| 	g.Close() | ||||
| } | ||||
| 
 | ||||
| func TestGraphiteOKWithMultipleTemplates(t *testing.T) { | ||||
| 	var wg sync.WaitGroup | ||||
| 	// Start TCP server
 | ||||
| 	wg.Add(1) | ||||
| 	t.Log("Starting server") | ||||
| 	TCPServer1WithMultipleTemplates(t, &wg) | ||||
| 
 | ||||
| 	// Init plugin
 | ||||
| 	g := Graphite{ | ||||
| 		Prefix:   "my.prefix", | ||||
| 		Template: "measurement.host.tags.field", | ||||
| 		Templates: []string{ | ||||
| 			"my_* host.measurement.tags.field", | ||||
| 			"measurement.tags.host.field", | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	// Init metrics
 | ||||
| 	m1, _ := metric.New( | ||||
| 		"mymeasurement", | ||||
| 		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, | ||||
| 		map[string]interface{}{"myfield": float64(3.14)}, | ||||
| 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||
| 	) | ||||
| 	m2, _ := metric.New( | ||||
| 		"mymeasurement", | ||||
| 		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, | ||||
| 		map[string]interface{}{"value": float64(3.14)}, | ||||
| 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||
| 	) | ||||
| 	m3, _ := metric.New( | ||||
| 		"my_measurement", | ||||
| 		map[string]string{"host": "192.168.0.1", "mytag": "valuetag"}, | ||||
| 		map[string]interface{}{"value": float64(3.14)}, | ||||
| 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||
| 	) | ||||
| 
 | ||||
| 	// Prepare point list
 | ||||
| 	metrics := []telegraf.Metric{m1} | ||||
| 	metrics2 := []telegraf.Metric{m2, m3} | ||||
| 	err1 := g.Connect() | ||||
| 	require.NoError(t, err1) | ||||
| 	// Send Data
 | ||||
| 	t.Log("Send first data") | ||||
| 	err2 := g.Write(metrics) | ||||
| 	require.NoError(t, err2) | ||||
| 
 | ||||
| 	// Waiting TCPserver, should reconnect and resend
 | ||||
| 	wg.Wait() | ||||
| 	t.Log("Finished Waiting for first data") | ||||
| 	var wg2 sync.WaitGroup | ||||
| 	// Start TCP server
 | ||||
| 	wg2.Add(1) | ||||
| 	TCPServer2WithMultipleTemplates(t, &wg2) | ||||
| 	//Write but expect an error, but reconnect
 | ||||
| 	err3 := g.Write(metrics2) | ||||
| 	t.Log("Finished writing second data, it should have reconnected automatically") | ||||
| 
 | ||||
| 	require.NoError(t, err3) | ||||
| 	t.Log("Finished writing third data") | ||||
| 	wg2.Wait() | ||||
| 	g.Close() | ||||
| } | ||||
| 
 | ||||
| func TestGraphiteOkWithTags(t *testing.T) { | ||||
| 	var wg sync.WaitGroup | ||||
| 	// Start TCP server
 | ||||
|  | @ -188,6 +252,36 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) { | |||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { | ||||
| 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		conn, _ := (tcpServer).Accept() | ||||
| 		reader := bufio.NewReader(conn) | ||||
| 		tp := textproto.NewReader(reader) | ||||
| 		data1, _ := tp.ReadLine() | ||||
| 		assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1) | ||||
| 		conn.Close() | ||||
| 		tcpServer.Close() | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { | ||||
| 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		conn2, _ := (tcpServer).Accept() | ||||
| 		reader := bufio.NewReader(conn2) | ||||
| 		tp := textproto.NewReader(reader) | ||||
| 		data2, _ := tp.ReadLine() | ||||
| 		assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2) | ||||
| 		data3, _ := tp.ReadLine() | ||||
| 		assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3) | ||||
| 		conn2.Close() | ||||
| 		tcpServer.Close() | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) { | ||||
| 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") | ||||
| 	go func() { | ||||
|  |  | |||
|  | @ -27,6 +27,7 @@ type Instrumental struct { | |||
| 	Prefix     string | ||||
| 	DataFormat string | ||||
| 	Template   string | ||||
| 	Templates  []string | ||||
| 	Timeout    internal.Duration | ||||
| 	Debug      bool | ||||
| 
 | ||||
|  | @ -85,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false) | ||||
| 	s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  |  | |||
|  | @ -22,6 +22,16 @@ method is used, otherwise the [Template Pattern](templates) is used. | |||
|   prefix = "telegraf" | ||||
|   ## Graphite template pattern | ||||
|   template = "host.tags.measurement.field" | ||||
|    | ||||
|   ## Graphite templates patterns | ||||
|   ## 1. Template for cpu | ||||
|   ## 2. Template for disk* | ||||
|   ## 3. Default template | ||||
|   # templates = [ | ||||
|   #  "cpu tags.measurement.host.field", | ||||
|   #  "disk* measurement.field", | ||||
|   #  "host.measurement.tags.field" | ||||
|   #] | ||||
| 
 | ||||
|   ## Support Graphite tags, recommended to enable when using Graphite 1.1 or later. | ||||
|   # graphite_tag_support = false | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ import ( | |||
| 	"strings" | ||||
| 
 | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/filter" | ||||
| ) | ||||
| 
 | ||||
| const DEFAULT_TEMPLATE = "host.tags.measurement.field" | ||||
|  | @ -29,10 +30,16 @@ var ( | |||
| 	fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "") | ||||
| ) | ||||
| 
 | ||||
| type GraphiteTemplate struct { | ||||
| 	Filter filter.Filter | ||||
| 	Value  string | ||||
| } | ||||
| 
 | ||||
| type GraphiteSerializer struct { | ||||
| 	Prefix     string | ||||
| 	Template   string | ||||
| 	TagSupport bool | ||||
| 	Templates  []*GraphiteTemplate | ||||
| } | ||||
| 
 | ||||
| func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { | ||||
|  | @ -59,7 +66,15 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { | |||
| 			out = append(out, point...) | ||||
| 		} | ||||
| 	default: | ||||
| 		bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix) | ||||
| 		template := s.Template | ||||
| 		for _, graphiteTemplate := range s.Templates { | ||||
| 			if graphiteTemplate.Filter.Match(metric.Name()) { | ||||
| 				template = graphiteTemplate.Value | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		bucket := SerializeBucketName(metric.Name(), metric.Tags(), template, s.Prefix) | ||||
| 		if bucket == "" { | ||||
| 			return out, nil | ||||
| 		} | ||||
|  | @ -185,6 +200,45 @@ func SerializeBucketName( | |||
| 	return prefix + "." + strings.Join(out, ".") | ||||
| } | ||||
| 
 | ||||
| func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, error) { | ||||
| 	var graphiteTemplates []*GraphiteTemplate | ||||
| 	defaultTemplate := "" | ||||
| 
 | ||||
| 	for i, t := range templates { | ||||
| 		parts := strings.Fields(t) | ||||
| 
 | ||||
| 		if len(parts) == 0 { | ||||
| 			return nil, "", fmt.Errorf("missing template at position: %d", i) | ||||
| 		} | ||||
| 		if len(parts) == 1 { | ||||
| 			if parts[0] == "" { | ||||
| 				return nil, "", fmt.Errorf("missing template at position: %d", i) | ||||
| 			} else { | ||||
| 				// Override default template
 | ||||
| 				defaultTemplate = t | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		if len(parts) > 2 { | ||||
| 			return nil, "", fmt.Errorf("invalid template format: '%s'", t) | ||||
| 		} | ||||
| 
 | ||||
| 		tFilter, err := filter.Compile([]string{parts[0]}) | ||||
| 
 | ||||
| 		if err != nil { | ||||
| 			return nil, "", err | ||||
| 		} | ||||
| 
 | ||||
| 		graphiteTemplates = append(graphiteTemplates, &GraphiteTemplate{ | ||||
| 			Filter: tFilter, | ||||
| 			Value:  parts[1], | ||||
| 		}) | ||||
| 	} | ||||
| 
 | ||||
| 	return graphiteTemplates, defaultTemplate, nil | ||||
| } | ||||
| 
 | ||||
| // SerializeBucketNameWithTags will take the given measurement name and tags and
 | ||||
| // produce a graphite bucket. It will use the Graphite11Serializer.
 | ||||
| // http://graphite.readthedocs.io/en/latest/tags.html
 | ||||
|  |  | |||
|  | @ -144,6 +144,97 @@ func TestSerializeMetricHost(t *testing.T) { | |||
| 	assert.Equal(t, expS, mS) | ||||
| } | ||||
| 
 | ||||
| func TestSerializeMetricHostWithMultipleTemplates(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 	tags := map[string]string{ | ||||
| 		"host":       "localhost", | ||||
| 		"cpu":        "cpu0", | ||||
| 		"datacenter": "us-west-2", | ||||
| 	} | ||||
| 	fields := map[string]interface{}{ | ||||
| 		"usage_idle": float64(91.5), | ||||
| 		"usage_busy": float64(8.5), | ||||
| 	} | ||||
| 	m1, err := metric.New("cpu", tags, fields, now) | ||||
| 	m2, err := metric.New("new_cpu", tags, fields, now) | ||||
| 	assert.NoError(t, err) | ||||
| 
 | ||||
| 	templates, defaultTemplate, err := InitGraphiteTemplates([]string{ | ||||
| 		"cp* tags.measurement.host.field", | ||||
| 		"new_cpu tags.host.measurement.field", | ||||
| 	}) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, defaultTemplate, "") | ||||
| 
 | ||||
| 	s := GraphiteSerializer{ | ||||
| 		Templates: templates, | ||||
| 	} | ||||
| 
 | ||||
| 	buf, _ := s.Serialize(m1) | ||||
| 	buf2, _ := s.Serialize(m2) | ||||
| 
 | ||||
| 	buf = append(buf, buf2...) | ||||
| 
 | ||||
| 	mS := strings.Split(strings.TrimSpace(string(buf)), "\n") | ||||
| 	assert.NoError(t, err) | ||||
| 
 | ||||
| 	expS := []string{ | ||||
| 		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()), | ||||
| 	} | ||||
| 	sort.Strings(mS) | ||||
| 	sort.Strings(expS) | ||||
| 	assert.Equal(t, expS, mS) | ||||
| } | ||||
| 
 | ||||
| func TestSerializeMetricHostWithMultipleTemplatesWithDefault(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 	tags := map[string]string{ | ||||
| 		"host":       "localhost", | ||||
| 		"cpu":        "cpu0", | ||||
| 		"datacenter": "us-west-2", | ||||
| 	} | ||||
| 	fields := map[string]interface{}{ | ||||
| 		"usage_idle": float64(91.5), | ||||
| 		"usage_busy": float64(8.5), | ||||
| 	} | ||||
| 	m1, err := metric.New("cpu", tags, fields, now) | ||||
| 	m2, err := metric.New("new_cpu", tags, fields, now) | ||||
| 	assert.NoError(t, err) | ||||
| 
 | ||||
| 	templates, defaultTemplate, err := InitGraphiteTemplates([]string{ | ||||
| 		"cp* tags.measurement.host.field", | ||||
| 		"tags.host.measurement.field", | ||||
| 	}) | ||||
| 	assert.NoError(t, err) | ||||
| 	assert.Equal(t, defaultTemplate, "tags.host.measurement.field") | ||||
| 
 | ||||
| 	s := GraphiteSerializer{ | ||||
| 		Templates: templates, | ||||
| 		Template:  defaultTemplate, | ||||
| 	} | ||||
| 
 | ||||
| 	buf, _ := s.Serialize(m1) | ||||
| 	buf2, _ := s.Serialize(m2) | ||||
| 
 | ||||
| 	buf = append(buf, buf2...) | ||||
| 
 | ||||
| 	mS := strings.Split(strings.TrimSpace(string(buf)), "\n") | ||||
| 	assert.NoError(t, err) | ||||
| 
 | ||||
| 	expS := []string{ | ||||
| 		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_busy 8.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_idle 91.5 %d", now.Unix()), | ||||
| 		fmt.Sprintf("cpu0.us-west-2.localhost.new_cpu.usage_busy 8.5 %d", now.Unix()), | ||||
| 	} | ||||
| 	sort.Strings(mS) | ||||
| 	sort.Strings(expS) | ||||
| 	assert.Equal(t, expS, mS) | ||||
| } | ||||
| 
 | ||||
| func TestSerializeMetricHostWithTagSupport(t *testing.T) { | ||||
| 	now := time.Now() | ||||
| 	tags := map[string]string{ | ||||
|  |  | |||
|  | @ -68,6 +68,9 @@ type Config struct { | |||
| 	// only supports Graphite
 | ||||
| 	Template string `toml:"template"` | ||||
| 
 | ||||
| 	// Templates same Template, but multiple
 | ||||
| 	Templates []string `toml:"templates"` | ||||
| 
 | ||||
| 	// Timestamp units to use for JSON formatted output
 | ||||
| 	TimestampUnits time.Duration `toml:"timestamp_units"` | ||||
| 
 | ||||
|  | @ -104,7 +107,7 @@ func NewSerializer(config *Config) (Serializer, error) { | |||
| 	case "influx": | ||||
| 		serializer, err = NewInfluxSerializerConfig(config) | ||||
| 	case "graphite": | ||||
| 		serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport) | ||||
| 		serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.Templates) | ||||
| 	case "json": | ||||
| 		serializer, err = NewJsonSerializer(config.TimestampUnits) | ||||
| 	case "splunkmetric": | ||||
|  | @ -188,10 +191,21 @@ func NewInfluxSerializer() (Serializer, error) { | |||
| 	return influx.NewSerializer(), nil | ||||
| } | ||||
| 
 | ||||
| func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) { | ||||
| func NewGraphiteSerializer(prefix, template string, tag_support bool, templates []string) (Serializer, error) { | ||||
| 	graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if defaultTemplate != "" { | ||||
| 		template = defaultTemplate | ||||
| 	} | ||||
| 
 | ||||
| 	return &graphite.GraphiteSerializer{ | ||||
| 		Prefix:     prefix, | ||||
| 		Template:   template, | ||||
| 		TagSupport: tag_support, | ||||
| 		Templates:  graphiteTemplates, | ||||
| 	}, nil | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue