From 9540a6532f14fa2cf03adbe107d74532232c4e22 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 6 Oct 2016 13:29:46 +0100 Subject: [PATCH] Update influxdb dependency for new models.Tags --- Godeps | 6 +++--- internal/config/config.go | 14 ++++++++++---- metric.go | 15 +++++++++++---- plugins/aggregators/minmax/minmax.go | 6 +++++- plugins/parsers/influx/parser.go | 10 ++++------ 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/Godeps b/Godeps index 76dc1673e..6dc0cec2d 100644 --- a/Godeps +++ b/Godeps @@ -19,7 +19,7 @@ github.com/eclipse/paho.mqtt.golang 0f7a459f04f13a41b7ed752d47944528d4bf9a86 github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee github.com/gobwas/glob 49571a1557cd20e6a2410adc6421f85b66c730b5 github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032 -github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7 +github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380 github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e @@ -27,7 +27,7 @@ github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hashicorp/consul 5aa90455ce78d4d41578bafc86305e6e6b28d7d2 github.com/hpcloud/tail b2940955ab8b26e19d43a43c4da0475dd81bdb56 github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da -github.com/influxdata/influxdb e094138084855d444195b252314dfee9eae34cab +github.com/influxdata/influxdb fc57c0f7c635df3873f3d64f0ed2100ddc94d5ae github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc @@ -56,7 +56,7 @@ github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 -golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 +golang.org/x/crypto c197bcf24cde29d3f73c7b4ac6fd41f4384e8af6 golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172 golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34 gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef diff --git a/internal/config/config.go b/internal/config/config.go index b6319c7fc..676f833d6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -829,12 +829,15 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } -// buildAggregator TODO doc +// buildAggregator parses Aggregator specific items from the ast.Table, +// builds the filter and returns a +// models.AggregatorConfig to be inserted into models.RunningAggregator func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) { unsupportedFields := []string{"tagexclude", "taginclude"} for _, field := range unsupportedFields { if _, ok := tbl.Fields[field]; ok { - // TODO raise error because field is not supported + return nil, fmt.Errorf("%s is not supported for aggregator plugins (%s).", + field, name) } } @@ -926,13 +929,16 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err return conf, nil } -// buildProcessor TODO doc +// buildProcessor parses Processor specific items from the ast.Table, +// builds the filter and returns a +// models.ProcessorConfig to be inserted into models.RunningProcessor func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { conf := &models.ProcessorConfig{Name: name} unsupportedFields := []string{"tagexclude", "taginclude", "fielddrop", "fieldpass"} for _, field := range unsupportedFields { if _, ok := tbl.Fields[field]; ok { - // TODO raise error because field is not supported + return nil, fmt.Errorf("%s is not supported for processor plugins (%s).", + field, name) } } diff --git a/metric.go b/metric.go index 9209de731..d21330ff6 100644 --- a/metric.go +++ b/metric.go @@ -65,6 +65,13 @@ type metric struct { isaggregate bool } +func NewMetricFromPoint(pt models.Point) Metric { + return &metric{ + pt: pt, + mType: Untyped, + } +} + // NewMetric returns an untyped metric. func NewMetric( name string, @@ -72,7 +79,7 @@ func NewMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := models.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) if err != nil { return nil, err } @@ -91,7 +98,7 @@ func NewGaugeMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := models.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) if err != nil { return nil, err } @@ -110,7 +117,7 @@ func NewCounterMetric( fields map[string]interface{}, t time.Time, ) (Metric, error) { - pt, err := models.NewPoint(name, tags, fields, t) + pt, err := models.NewPoint(name, models.NewTags(tags), fields, t) if err != nil { return nil, err } @@ -125,7 +132,7 @@ func (m *metric) Name() string { } func (m *metric) Tags() map[string]string { - return m.pt.Tags() + return m.pt.Tags().Map() } func (m *metric) Time() time.Time { diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go index 0c88a31b4..a79d7bb01 100644 --- a/plugins/aggregators/minmax/minmax.go +++ b/plugins/aggregators/minmax/minmax.go @@ -24,8 +24,12 @@ type minmax struct { } var sampleConfig = ` - ## TODO doc + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false ` func (m *MinMax) SampleConfig() string { diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 8ab783b0d..68b7497fe 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -26,17 +26,15 @@ func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { points, err := models.ParsePoints(buf) metrics := make([]telegraf.Metric, len(points)) for i, point := range points { - tags := point.Tags() for k, v := range p.DefaultTags { - // Only set tags not in parsed metric - if _, ok := tags[k]; !ok { - tags[k] = v + // only set the default tag if it doesn't already exist: + if tmp := point.Tags().GetString(k); tmp == "" { + point.AddTag(k, v) } } // Ignore error here because it's impossible that a model.Point // wouldn't parse into client.Point properly - metrics[i], _ = telegraf.NewMetric(point.Name(), tags, - point.Fields(), point.Time()) + metrics[i] = telegraf.NewMetricFromPoint(point) } return metrics, err }