reverse graphite host

This commit is contained in:
Sebastian Brandt 2016-11-10 22:05:50 +01:00
parent 1c7715780e
commit fbb017e124
5 changed files with 95 additions and 65 deletions

View File

@ -15,9 +15,10 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite"
)
// Instrumental struct
type Instrumental struct {
Host string
ApiToken string
APIToken string
Prefix string
DataFormat string
Template string
@ -27,6 +28,7 @@ type Instrumental struct {
conn net.Conn
}
// constants
const (
DefaultHost = "collector.instrumentalapp.com"
HelloMessage = "hello version go/telegraf/1.1\n"
@ -34,6 +36,7 @@ const (
HandshakeFormat = HelloMessage + AuthFormat
)
// vars
var (
ValueIncludesBadChar = regexp.MustCompile("[^[:digit:].]")
MetricNameReplacer = regexp.MustCompile("[^-[:alnum:]_.]+")
@ -53,6 +56,7 @@ var sampleConfig = `
debug = false
`
// Connect func
func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", i.Timeout.Duration)
@ -70,6 +74,7 @@ func (i *Instrumental) Connect() error {
return nil
}
// Close func
func (i *Instrumental) Close() error {
i.conn.Close()
i.conn = nil
@ -139,10 +144,10 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
time := splitStat[2]
// replace invalid components of metric name with underscore
clean_metric := MetricNameReplacer.ReplaceAllString(metric, "_")
cleanMetric := MetricNameReplacer.ReplaceAllString(metric, "_")
if !ValueIncludesBadChar.MatchString(value) {
points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time))
points = append(points, fmt.Sprintf("%s %s %s %s", metricType, cleanMetric, value, time))
} else if i.Debug {
log.Printf("E! Instrumental unable to send bad stat: %s", stat)
}
@ -170,16 +175,18 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
return nil
}
// Description func
func (i *Instrumental) Description() string {
return "Configuration for sending metrics to an Instrumental project"
}
// SampleConfig func
func (i *Instrumental) SampleConfig() string {
return sampleConfig
}
func (i *Instrumental) authenticate(conn net.Conn) error {
_, err := fmt.Fprintf(conn, HandshakeFormat, i.ApiToken)
_, err := fmt.Fprintf(conn, HandshakeFormat, i.APIToken)
if err != nil {
return err
}
@ -202,7 +209,7 @@ func init() {
outputs.Add("instrumental", func() telegraf.Output {
return &Instrumental{
Host: DefaultHost,
Template: graphite.DEFAULT_TEMPLATE,
Template: graphite.DefaultTemplate,
}
})
}

View File

@ -21,7 +21,7 @@ func TestWrite(t *testing.T) {
i := Instrumental{
Host: "127.0.0.1",
ApiToken: "abc123token",
APIToken: "abc123token",
Prefix: "my.prefix",
}
@ -91,9 +91,9 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
conn.Write([]byte("ok\nok\n"))
data1, _ := tp.ReadLine()
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
assert.Equal(t, "gauge my.prefix.1.0.168.192.mymeasurement.myfield 3.14 1289430000", data1)
data2, _ := tp.ReadLine()
assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
assert.Equal(t, "gauge my.prefix.1.0.168.192.mymeasurement 3.14 1289430000", data2)
conn, _ = tcpServer.Accept()
conn.SetDeadline(time.Now().Add(1 * time.Second))
@ -107,13 +107,13 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
conn.Write([]byte("ok\nok\n"))
data3, _ := tp.ReadLine()
assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
assert.Equal(t, "increment my.prefix.1.0.168.192.my_histogram 3.14 1289430000", data3)
data4, _ := tp.ReadLine()
assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4)
assert.Equal(t, "increment my.prefix.1.0.168.192_8888_123.bad_metric_name 1 1289430000", data4)
data5, _ := tp.ReadLine()
assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5)
assert.Equal(t, "increment my.prefix.1.0.168.192.my_counter 3.14 1289430000", data5)
data6, _ := tp.ReadLine()
assert.Equal(t, "", data6)

View File

@ -8,19 +8,22 @@ import (
"github.com/influxdata/telegraf"
)
const DEFAULT_TEMPLATE = "host.tags.measurement.field"
// DefaultTemplate const
const DefaultTemplate = "host.tags.measurement.field"
var (
fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
sanitizedChars = strings.NewReplacer("/", "-", "@", "-", "*", "-", " ", "_", "..", ".", `\`, "", ")", "_", "(", "_")
)
type GraphiteSerializer struct {
// SerializerGraphite struct
type SerializerGraphite struct {
Prefix string
Template string
}
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
// Serialize ([]string, error)
func (s *SerializerGraphite) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
// Convert UnixNano to Unix timestamps
@ -45,13 +48,13 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error)
}
// SerializeBucketName will take the given measurement name and tags and
// produce a graphite bucket. It will use the GraphiteSerializer.Template
// to generate this, or DEFAULT_TEMPLATE.
// produce a graphite bucket. It will use the Serializer.Template
// to generate this, or DefaultTemplate.
//
// NOTE: SerializeBucketName replaces the "field" portion of the template with
// FIELDNAME. It is up to the user to replace this. This is so that
// SerializeBucketName can be called just once per measurement, rather than
// once per field. See GraphiteSerializer.InsertField() function.
// once per field. See Serializer.InsertField() function.
func SerializeBucketName(
measurement string,
tags map[string]string,
@ -59,7 +62,7 @@ func SerializeBucketName(
prefix string,
) string {
if template == "" {
template = DEFAULT_TEMPLATE
template = DefaultTemplate
}
tagsCopy := make(map[string]string)
for k, v := range tags {
@ -81,7 +84,14 @@ func SerializeBucketName(
default:
// This is a tag being applied
if tagvalue, ok := tagsCopy[templatePart]; ok {
if templatePart == "host" {
hostSplit := strings.Split(tagvalue, ".")
for i := len(hostSplit) - 1; i >= 0; i-- {
out = append(out, hostSplit[i])
}
} else {
out = append(out, strings.Replace(tagvalue, ".", "_", -1))
}
delete(tagsCopy, templatePart)
}
}
@ -123,14 +133,24 @@ func buildTags(tags map[string]string) string {
}
sort.Strings(keys)
var tag_str string
var tagStr string
var tagValue string
var reversedHost []string
for i, k := range keys {
tag_value := strings.Replace(tags[k], ".", "_", -1)
if i == 0 {
tag_str += tag_value
if k == "host" {
hostSplit := strings.Split(tags[k], ".")
for i := len(hostSplit) - 1; i >= 0; i-- {
reversedHost = append(reversedHost, hostSplit[i])
}
tagValue = strings.Join(reversedHost, ".")
} else {
tag_str += "." + tag_value
tagValue = strings.Replace(tags[k], ".", "_", -1)
}
if i == 0 {
tagStr += tagValue
} else {
tagStr += "." + tagValue
}
}
return tag_str
return tagStr
}

View File

@ -12,7 +12,7 @@ import (
)
var defaultTags = map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -52,8 +52,8 @@ func TestGraphiteTags(t *testing.T) {
tags2 := buildTags(m2.Tags())
tags3 := buildTags(m3.Tags())
assert.Equal(t, "192_168_0_1", tags1)
assert.Equal(t, "first.second.192_168_0_1", tags2)
assert.Equal(t, "1.0.168.192", tags1)
assert.Equal(t, "first.second.1.0.168.192", tags2)
assert.Equal(t, "first.second", tags3)
}
@ -70,7 +70,7 @@ func TestSerializeMetricNoHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
s := SerializerGraphite{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
@ -86,7 +86,7 @@ func TestSerializeMetricNoHost(t *testing.T) {
func TestSerializeMetricHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -97,13 +97,13 @@ func TestSerializeMetricHost(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
s := SerializerGraphite{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
@ -114,7 +114,7 @@ func TestSerializeMetricHost(t *testing.T) {
func TestSerializeValueField(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -124,12 +124,12 @@ func TestSerializeValueField(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
s := SerializerGraphite{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -138,7 +138,7 @@ func TestSerializeValueField(t *testing.T) {
func TestSerializeValueField2(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -148,14 +148,14 @@ func TestSerializeValueField2(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
s := SerializerGraphite{
Template: "host.field.tags.measurement",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -164,7 +164,7 @@ func TestSerializeValueField2(t *testing.T) {
func TestSerializeFieldWithSpaces(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -174,14 +174,14 @@ func TestSerializeFieldWithSpaces(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
s := SerializerGraphite{
Template: "host.tags.measurement.field",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu.field_with_spaces 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu.field_with_spaces 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -190,7 +190,7 @@ func TestSerializeFieldWithSpaces(t *testing.T) {
func TestSerializeTagWithSpaces(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": `cpu\ 0`,
"datacenter": "us-west-2",
}
@ -200,14 +200,14 @@ func TestSerializeTagWithSpaces(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
s := SerializerGraphite{
Template: "host.tags.measurement.field",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu_0.us-west-2.cpu.field_with_spaces 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu_0.us-west-2.cpu.field_with_spaces 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -216,7 +216,7 @@ func TestSerializeTagWithSpaces(t *testing.T) {
func TestSerializeValueField3(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -226,14 +226,14 @@ func TestSerializeValueField3(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
s := SerializerGraphite{
Template: "field.host.tags.measurement",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -242,7 +242,7 @@ func TestSerializeValueField3(t *testing.T) {
func TestSerializeValueField5(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -252,14 +252,14 @@ func TestSerializeValueField5(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
s := SerializerGraphite{
Template: template5,
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.us-west-2.cpu0.cpu 91.5 %d", now.Unix()),
fmt.Sprintf("local.localhost.us-west-2.cpu0.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
@ -267,7 +267,7 @@ func TestSerializeValueField5(t *testing.T) {
func TestSerializeMetricPrefix(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"host": "localhost.local",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
@ -278,13 +278,13 @@ func TestSerializeMetricPrefix(t *testing.T) {
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Prefix: "prefix"}
s := SerializerGraphite{Prefix: "prefix"}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
fmt.Sprintf("prefix.local.localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("prefix.local.localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
@ -319,7 +319,7 @@ func TestSerializeBucketNameHost(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), "", "")
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
expS := "local.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
@ -333,7 +333,7 @@ func TestSerializeBucketNamePrefix(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), "", "prefix")
expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME"
expS := "prefix.local.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
@ -347,7 +347,7 @@ func TestTemplate1(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), template1, "")
expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME"
expS := "cpu0.us-west-2.local.localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
@ -361,7 +361,7 @@ func TestTemplate2(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), template2, "")
expS := "localhost.cpu.FIELDNAME"
expS := "local.localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
@ -375,7 +375,7 @@ func TestTemplate3(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), template3, "")
expS := "localhost.cpu0.us-west-2.FIELDNAME"
expS := "local.localhost.cpu0.us-west-2.FIELDNAME"
assert.Equal(t, expS, mS)
}
@ -389,7 +389,7 @@ func TestTemplate4(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), template4, "")
expS := "localhost.cpu0.us-west-2.cpu"
expS := "local.localhost.cpu0.us-west-2.cpu"
assert.Equal(t, expS, mS)
}
@ -403,6 +403,6 @@ func TestTemplate6(t *testing.T) {
mS := SerializeBucketName(m.Name(), m.Tags(), template6, "")
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
expS := "local.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}

View File

@ -46,21 +46,24 @@ func NewSerializer(config *Config) (Serializer, error) {
case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template)
case "json":
serializer, err = NewJsonSerializer()
serializer, err = NewJSONSerializer()
}
return serializer, err
}
func NewJsonSerializer() (Serializer, error) {
// NewJSONSerializer func
func NewJSONSerializer() (Serializer, error) {
return &json.JsonSerializer{}, nil
}
// NewInfluxSerializer func
func NewInfluxSerializer() (Serializer, error) {
return &influx.InfluxSerializer{}, nil
}
// NewGraphiteSerializer func
func NewGraphiteSerializer(prefix, template string) (Serializer, error) {
return &graphite.GraphiteSerializer{
return &graphite.SerializerGraphite{
Prefix: prefix,
Template: template,
}, nil