diff --git a/CHANGELOG.md b/CHANGELOG.md index dc8958de9..74deab7f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ of metrics collected and from how many plugins. - [#240](https://github.com/influxdb/telegraf/pull/240): procstat plugin, thanks @ranjib! - [#244](https://github.com/influxdb/telegraf/pull/244): netstat plugin, thanks @shirou! - [#262](https://github.com/influxdb/telegraf/pull/262): zookeeper plugin, thanks @jrxFive! +- [#237](https://github.com/influxdb/telegraf/pull/237): statsd service plugin, thanks @sparrc ### Bugfixes - [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini! diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 62f026fed..78bd28634 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -5,7 +5,7 @@ which can be found [on our website](http://influxdb.com/community/cla.html) ## Plugins -This section is for developers that want to create new collection plugins. +This section is for developers who want to create new collection plugins. Telegraf is entirely plugin driven. This interface allows for operators to pick and chose what is gathered as well as makes it easy for developers to create new ways of generating metrics. @@ -87,7 +87,7 @@ func Gather(acc plugins.Accumulator) error { } ``` -### Example +### Plugin Example ```go package simple @@ -123,9 +123,109 @@ func init() { } ``` +## Service Plugins + +This section is for developers who want to create new "service" collection +plugins. A service plugin differs from a regular plugin in that it operates +a background service while Telegraf is running. One example would be the `statsd` +plugin, which operates a statsd server. + +Service Plugins are substantially more complicated than a regular plugin, as they +will require threads and locks to verify data integrity. Service Plugins should +be avoided unless there is no way to create their behavior with a regular plugin. + +Their interface is quite similar to a regular plugin, with the addition of `Start()` +and `Stop()` methods. + +### Service Plugin Guidelines + +* Same as the `Plugin` guidelines, except that they must conform to the +`plugins.ServicePlugin` interface. + +### Service Plugin interface + +```go +type ServicePlugin interface { + SampleConfig() string + Description() string + Gather(Accumulator) error + Start() error + Stop() +} +``` + ## Outputs -TODO: this section will describe requirements for contributing an output +This section is for developers who want to create a new output sink. Outputs +are created in a similar manner as collection plugins, and their interface has +similar constructs. + +### Output Guidelines + +* An output must conform to the `outputs.Output` interface. +* Outputs should call `outputs.Add` in their `init` function to register themselves. +See below for a quick example. +* To be available within Telegraf itself, plugins must add themselves to the +`github.com/influxdb/telegraf/outputs/all/all.go` file. +* The `SampleConfig` function should return valid toml that describes how the +output can be configured. This is include in `telegraf -sample-config`. +* The `Description` function should say in one line what this output does. + +### Output interface + +```go +type Output interface { + Connect() error + Close() error + Description() string + SampleConfig() string + Write(client.BatchPoints) error +} +``` + +### Output Example + +```go +package simpleoutput + +// simpleoutput.go + +import "github.com/influxdb/telegraf/outputs" + +type Simple struct { + Ok bool +} + +func (s *Simple) Description() string { + return "a demo output" +} + +func (s *Simple) SampleConfig() string { + return "url = localhost" +} + +func (s *Simple) Connect() error { + // Make a connection to the URL here + return nil +} + +func (s *Simple) Close() error { + // Close connection to the URL here + return nil +} + +func (s *Simple) Write(bp client.BatchPoints) error { + for _, pt := range bp { + // write `pt` to the output sink here + } + return nil +} + +func init() { + outputs.Add("simpleoutput", func() outputs.Output { return &Simple{} }) +} + +``` ## Unit Tests diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index bcfa686ed..44de9ca98 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -102,8 +102,8 @@ }, { "ImportPath": "github.com/influxdb/influxdb", - "Comment": "v0.9.4-rc1-457-g883d32c", - "Rev": "883d32cfd06e8cf14e6d9fc75dbe7b7b92345623" + "Comment": "v0.9.4-rc1-478-g73a630d", + "Rev": "73a630dfa64003c27782a1b0a6b817e839c5c3ea" }, { "ImportPath": "github.com/lib/pq", diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/CHANGELOG.md b/Godeps/_workspace/src/github.com/influxdb/influxdb/CHANGELOG.md index 8003ead0d..f983c4b69 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/CHANGELOG.md +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/CHANGELOG.md @@ -38,6 +38,11 @@ - [#4296](https://github.com/influxdb/influxdb/pull/4296): Reject line protocol ending with '-'. Fixes [#4272](https://github.com/influxdb/influxdb/issues/4272) - [#4333](https://github.com/influxdb/influxdb/pull/4333): Retry monitor storage creation and only on Leader. - [#4276](https://github.com/influxdb/influxdb/issues/4276): Walk DropSeriesStatement & check for empty sources +- [#4342](https://github.com/influxdb/influxdb/pull/4342): Fix mixing aggregates and math with non-aggregates. Thanks @kostya-sh. +- [#4349](https://github.com/influxdb/influxdb/issues/4349): If HH can't unmarshal a block, skip that block. +- [#4354](https://github.com/influxdb/influxdb/pull/4353): Fully lock node queues during hinted handoff. Fixes one cause of missing data on clusters. +- [#4357](https://github.com/influxdb/influxdb/issues/4357): Fix similar float values encoding overflow Thanks @dgryski! +- [#4344](https://github.com/influxdb/influxdb/issues/4344): Make client.Write default to client.precision if none is given. ## v0.9.4 [2015-09-14] diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/balancer.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/balancer.go index 25abbf6f1..d7cadd91f 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/balancer.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/balancer.go @@ -72,7 +72,7 @@ func (b *nodeBalancer) Next() *meta.NodeInfo { } d := &up[b.p] - b.p += 1 + b.p++ return d } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go index 031db0bc5..60908e347 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go @@ -220,10 +220,16 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) { if c.username != "" { req.SetBasicAuth(c.username, c.password) } + + precision := bp.Precision + if precision == "" { + precision = c.precision + } + params := req.URL.Query() params.Set("db", bp.Database) params.Set("rp", bp.RetentionPolicy) - params.Set("precision", bp.Precision) + params.Set("precision", precision) params.Set("consistency", bp.WriteConsistency) req.URL.RawQuery = params.Encode() diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/errors.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/errors.go index ff925d68c..8ae1cdec4 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/errors.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/errors.go @@ -16,15 +16,19 @@ var ( ErrFieldTypeConflict = errors.New("field type conflict") ) +// ErrDatabaseNotFound indicates that a database operation failed on the +// specified database because the specified database does not exist. func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) } +// ErrRetentionPolicyNotFound indicates that the named retention policy could +// not be found in the database. func ErrRetentionPolicyNotFound(name string) error { return fmt.Errorf("retention policy not found: %s", name) } -func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) } +func errMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) } -func Errorf(format string, a ...interface{}) (err error) { +func errorf(format string, a ...interface{}) (err error) { if _, file, line, ok := runtime.Caller(2); ok { a = append(a, file, line) err = fmt.Errorf(format+" (%s:%d)", a...) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go index 3987a505d..a803bf42b 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go @@ -1131,8 +1131,11 @@ func (s *SelectStatement) validSelectWithAggregate() error { calls := map[string]struct{}{} numAggregates := 0 for _, f := range s.Fields { - if c, ok := f.Expr.(*Call); ok { + fieldCalls := walkFunctionCalls(f.Expr) + for _, c := range fieldCalls { calls[c.Name] = struct{}{} + } + if len(fieldCalls) != 0 { numAggregates++ } } @@ -1166,8 +1169,7 @@ func (s *SelectStatement) validSelectWithAggregate() error { func (s *SelectStatement) validateAggregates(tr targetRequirement) error { for _, f := range s.Fields { - switch expr := f.Expr.(type) { - case *Call: + for _, expr := range walkFunctionCalls(f.Expr) { switch expr.Name { case "derivative", "non_negative_derivative": if err := s.validSelectWithAggregate(); err != nil { diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/parser_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/parser_test.go index 47f43e554..9293d765e 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/parser_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/parser_test.go @@ -1486,6 +1486,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT field1 AS`, err: `found EOF, expected identifier at line 1, char 18`}, {s: `SELECT field1 FROM foo group by time(1s)`, err: `GROUP BY requires at least one aggregate function`}, {s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`}, + {s: `SELECT count(value)/10, value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `SELECT count(value) FROM foo group by time(1s)`, err: `aggregate functions with GROUP BY time require a WHERE time clause`}, {s: `SELECT count(value) FROM foo group by time(1s) where host = 'hosta.influxdb.org'`, err: `aggregate functions with GROUP BY time require a WHERE time clause`}, {s: `SELECT count(value) FROM foo group by time`, err: `time() is a function and expects at least one argument`}, diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/nightly.sh b/Godeps/_workspace/src/github.com/influxdb/influxdb/nightly.sh index 992e8fb20..edd50d2a4 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/nightly.sh +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/nightly.sh @@ -21,14 +21,20 @@ function send_failure_notification { --body "The nightly build has failed, version: $version" } -if [ $# -ne 4 ]; then - echo "$0 " +if [ $# -lt 4 ]; then + echo "$0 [RACE_ENABLED]" exit 1 fi SMTP=$1 USER=$2 PASSWORD=$3 TO=$4 +RACE_ENABLED=$5 + +if [ -n "$RACE_ENABLED" ]; then + race="-x" + echo "Race-detection build enabled." +fi REPO_DIR=`mktemp -d` echo "Using $REPO_DIR for all work..." @@ -41,7 +47,7 @@ git clone https://github.com/influxdb/influxdb.git cd $GOPATH/src/github.com/influxdb/influxdb VERSION="$MASTER_VERSION-nightly-`git log --pretty=format:'%h' -n 1`" -NIGHTLY_BUILD=true ./package.sh $VERSION +NIGHTLY_BUILD=true ./package.sh $race $VERSION if [ $? -ne 0 ]; then # Send notification e-mail. diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/package.sh b/Godeps/_workspace/src/github.com/influxdb/influxdb/package.sh index e147590bc..7a6bdd8ec 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/package.sh +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/package.sh @@ -83,6 +83,7 @@ $0 [-h] [-p|-w] [-t ] [-r ] -r release candidate number, if any. Example: -r 7 -p just build packages + -x build with race-detection enabled -w build packages for current working directory imply -p -t @@ -264,7 +265,7 @@ do_build() { fi date=`date -u --iso-8601=seconds` - go install -a -ldflags="-X main.version=$version -X main.branch=$branch -X main.commit=$commit -X main.buildTime='$date'" ./... + go install $RACE -a -ldflags="-X main.version=$version -X main.branch=$branch -X main.commit=$commit -X main.buildTime='$date'" ./... if [ $? -ne 0 ]; then echo "Build failed, unable to create package -- aborting" cleanup_exit 1 @@ -357,6 +358,11 @@ do shift 2 ;; + -x) + RACE="-race" + shift + ;; + -w | --working-directory) PACKAGES_ONLY="PACKAGES_ONLY" WORKING_DIR="WORKING_DIR" @@ -482,19 +488,6 @@ if [ -z "$NIGHTLY_BUILD" -a -z "$PACKAGES_ONLY" ]; then fi fi -if [ $ARCH == "i386" ]; then - rpm_package=influxdb-${VERSION}-1.i686.rpm # RPM packages use 1 for default package release. - debian_package=influxdb_`full_version $VERSION $RC`_i686.deb - deb_args="-a i686" - rpm_args="setarch i686" -elif [ $ARCH == "arm" ]; then - rpm_package=influxdb-${VERSION}-1.armel.rpm - debian_package=influxdb_`full_version $VERSION $RC`_armel.deb -else - rpm_package=influxdb-${VERSION}-1.x86_64.rpm - debian_package=influxdb_`full_version $VERSION $RC`_amd64.deb -fi - COMMON_FPM_ARGS="\ --log error \ -C $TMP_WORK_DIR \ @@ -504,7 +497,7 @@ COMMON_FPM_ARGS="\ --maintainer $MAINTAINER \ --after-install $POST_INSTALL_PATH \ --after-remove $POST_UNINSTALL_PATH \ ---name influxdb \ +--name influxdb${RACE} \ --config-files $CONFIG_ROOT_DIR \ --config-files $LOGROTATE_DIR" @@ -518,7 +511,11 @@ if [ -n "$DEB_WANTED" ]; then fi if [ -n "$TAR_WANTED" ]; then - $FPM -s dir -t tar --prefix influxdb_`full_version $VERSION $RC`_${ARCH} -p influxdb_`full_version $VERSION $RC`_${ARCH}.tar.gz --description "$DESCRIPTION" $COMMON_FPM_ARGS --version `full_version $VERSION $RC ` . + if [ -n "$RACE" ]; then + # Tweak race prefix for tarball. + race="race_" + fi + $FPM -s dir -t tar --prefix influxdb_$race`full_version $VERSION $RC`_${ARCH} -p influxdb_$race`full_version $VERSION $RC`_${ARCH}.tar.gz --description "$DESCRIPTION" $COMMON_FPM_ARGS --version `full_version $VERSION $RC ` . if [ $? -ne 0 ]; then echo "Failed to create Tar package -- aborting." cleanup_exit 1 diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/services/graphite/parser.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/services/graphite/parser.go index 021382b27..374140fe5 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/services/graphite/parser.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/services/graphite/parser.go @@ -157,7 +157,14 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string) { } // decode the name and tags template := p.matcher.Match(fields[0]) - return template.Apply(fields[0]) + name, tags := template.Apply(fields[0]) + // Set the default tags on the point if they are not already set + for k, v := range p.tags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + return name, tags } // template represents a pattern and tags to map a graphite metric string to a influxdb Point diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/services/hh/processor.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/services/hh/processor.go index 769ddec95..1b3b5b323 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/services/hh/processor.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/services/hh/processor.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "expvar" "fmt" + "io" "io/ioutil" "log" "os" @@ -18,9 +19,13 @@ import ( ) const ( - pointsHint = "points_hint" - pointsWrite = "points_write" - bytesWrite = "bytes_write" + pointsHint = "points_hint" + pointsWrite = "points_write" + bytesWrite = "bytes_write" + writeErr = "write_err" + unmarshalErr = "unmarshal_err" + advanceErr = "advance_err" + currentErr = "current_err" ) type Processor struct { @@ -98,10 +103,9 @@ func (p *Processor) loadQueues() error { return nil } +// addQueue adds a hinted-handoff queue for the given node. This function is not thread-safe +// and the caller must ensure this function is not called concurrently. func (p *Processor) addQueue(nodeID uint64) (*queue, error) { - p.mu.Lock() - defer p.mu.Unlock() - path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10)) if err := os.MkdirAll(path, 0700); err != nil { return nil, err @@ -123,11 +127,27 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) { return queue, nil } +// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate +// hinted-handoff queues, and be called concurrently, it takes a lock during queue access. func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error { + p.mu.RLock() queue, ok := p.queues[ownerID] + p.mu.RUnlock() if !ok { - var err error - if queue, err = p.addQueue(ownerID); err != nil { + if err := func() error { + // Check again under write-lock. + p.mu.Lock() + defer p.mu.Unlock() + + queue, ok = p.queues[ownerID] + if !ok { + var err error + if queue, err = p.addQueue(ownerID); err != nil { + return err + } + } + return nil + }(); err != nil { return err } } @@ -162,6 +182,9 @@ func (p *Processor) Process() error { // Get the current block from the queue buf, err := q.Current() if err != nil { + if err != io.EOF { + p.nodeStatMaps[nodeID].Add(currentErr, 1) + } res <- nil break } @@ -169,15 +192,20 @@ func (p *Processor) Process() error { // unmarshal the byte slice back to shard ID and points shardID, points, err := p.unmarshalWrite(buf) if err != nil { + p.nodeStatMaps[nodeID].Add(unmarshalErr, 1) p.Logger.Printf("unmarshal write failed: %v", err) if err := q.Advance(); err != nil { + p.nodeStatMaps[nodeID].Add(advanceErr, 1) res <- err } - return + + // Skip and try the next block. + continue } // Try to send the write to the node if err := p.writer.WriteShard(shardID, nodeID, points); err != nil && tsdb.IsRetryable(err) { + p.nodeStatMaps[nodeID].Add(writeErr, 1) p.Logger.Printf("remote write failed: %v", err) res <- nil break @@ -187,6 +215,7 @@ func (p *Processor) Process() error { // If we get here, the write succeeded so advance the queue to the next item if err := q.Advance(); err != nil { + p.nodeStatMaps[nodeID].Add(advanceErr, 1) res <- err return } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/encoding_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/encoding_test.go index 309b947eb..3a7ff4b43 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/encoding_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/encoding_test.go @@ -49,6 +49,26 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) { } } +func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) { + values := make(tsm1.Values, 5) + values[0] = tsm1.NewValue(time.Unix(0, 1444238178437870000), 6.00065e+06) + values[1] = tsm1.NewValue(time.Unix(0, 1444238185286830000), 6.000656e+06) + values[2] = tsm1.NewValue(time.Unix(0, 1444238188441501000), 6.000657e+06) + values[3] = tsm1.NewValue(time.Unix(0, 1444238195286811000), 6.000659e+06) + values[4] = tsm1.NewValue(time.Unix(0, 1444238198439917000), 6.000661e+06) + + b, err := values.Encode(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + decodedValues := values.DecodeSameTypeBlock(b) + + if !reflect.DeepEqual(decodedValues, values) { + t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values) + } +} + func TestEncoding_IntBlock_Basic(t *testing.T) { valueCount := 1000 times := getTimes(valueCount, 60, time.Second) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float.go index 8961c70f4..6be42352f 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float.go @@ -56,7 +56,7 @@ func (s *FloatEncoder) Bytes() []byte { func (s *FloatEncoder) Finish() { if !s.finished { - // // write an end-of-stream record + // write an end-of-stream record s.Push(math.NaN()) s.bw.Flush(bitstream.Zero) s.finished = true @@ -82,6 +82,12 @@ func (s *FloatEncoder) Push(v float64) { leading := bits.Clz(vDelta) trailing := bits.Ctz(vDelta) + // Clamp number of leading zeros to avoid overflow when encoding + leading &= 0x1F + if leading >= 32 { + leading = 31 + } + // TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead if s.leading != ^uint64(0) && leading >= s.leading && trailing >= s.trailing { s.bw.WriteBit(bitstream.Zero) @@ -92,6 +98,11 @@ func (s *FloatEncoder) Push(v float64) { s.bw.WriteBit(bitstream.One) s.bw.WriteBits(leading, 5) + // Note that if leading == trailing == 0, then sigbits == 64. But that + // value doesn't actually fit into the 6 bits we have. + // Luckily, we never need to encode 0 significant bits, since that would + // put us in the other case (vdelta == 0). So instead we write out a 0 and + // adjust it back to 64 on unpacking. sigbits := 64 - leading - trailing s.bw.WriteBits(sigbits, 6) s.bw.WriteBits(vDelta>>trailing, int(sigbits)) @@ -178,6 +189,10 @@ func (it *FloatDecoder) Next() bool { return false } mbits := bits + // 0 significant bits here means we overflowed and we actually need 64; see comment in encoder + if mbits == 0 { + mbits = 64 + } it.trailing = 64 - it.leading - mbits } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float_test.go index 794d62e5b..fc9ebfd35 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/tsm1/float_test.go @@ -7,7 +7,6 @@ import ( ) func TestFloatEncoder_Simple(t *testing.T) { - // Example from the paper s := tsm1.NewFloatEncoder() @@ -67,6 +66,49 @@ func TestFloatEncoder_Simple(t *testing.T) { } } +func TestFloatEncoder_SimilarFloats(t *testing.T) { + s := tsm1.NewFloatEncoder() + want := []float64{ + 6.00065e+06, + 6.000656e+06, + 6.000657e+06, + + 6.000659e+06, + 6.000661e+06, + } + + for _, v := range want { + s.Push(v) + } + + s.Finish() + + b := s.Bytes() + + it, err := tsm1.NewFloatDecoder(b) + if err != nil { + t.Fatalf("unexpected error creating float decoder: %v", err) + } + + for _, w := range want { + if !it.Next() { + t.Fatalf("Next()=false, want true") + } + vv := it.Values() + if w != vv { + t.Errorf("Values()=(%v), want (%v)\n", vv, w) + } + } + + if it.Next() { + t.Fatalf("Next()=true, want false") + } + + if err := it.Error(); err != nil { + t.Errorf("it.Error()=%v, want nil", err) + } +} + var TwoHoursData = []struct { v float64 }{ diff --git a/README.md b/README.md index b234c5b91..aab01ba2f 100644 --- a/README.md +++ b/README.md @@ -194,6 +194,12 @@ Telegraf currently has support for collecting metrics from * disk * swap +## Service Plugins + +Telegraf can collect metrics via the following services + +* statsd + We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. diff --git a/accumulator.go b/accumulator.go index df510afdf..f7fb7c1e4 100644 --- a/accumulator.go +++ b/accumulator.go @@ -27,8 +27,8 @@ type BatchPoints struct { // deepcopy returns a deep copy of the BatchPoints object. This is primarily so // we can do multithreaded output flushing (see Agent.flush) func (bp *BatchPoints) deepcopy() *BatchPoints { - bp.mu.Lock() - defer bp.mu.Unlock() + bp.Lock() + defer bp.Unlock() var bpc BatchPoints bpc.Time = bp.Time @@ -71,36 +71,9 @@ func (bp *BatchPoints) Add( val interface{}, tags map[string]string, ) { - bp.Lock() - defer bp.Unlock() - - measurement = bp.Prefix + measurement - - if bp.Config != nil { - if !bp.Config.ShouldPass(measurement, tags) { - return - } - } - - if bp.Debug { - var tg []string - - for k, v := range tags { - tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v)) - } - - sort.Strings(tg) - - fmt.Printf("> [%s] %s value=%v\n", strings.Join(tg, " "), measurement, val) - } - - bp.Points = append(bp.Points, client.Point{ - Measurement: measurement, - Tags: tags, - Fields: map[string]interface{}{ - "value": val, - }, - }) + fields := make(map[string]interface{}) + fields["value"] = val + bp.AddFields(measurement, fields, tags) } // AddFieldsWithTime adds a measurement with a provided timestamp @@ -169,6 +142,16 @@ func (bp *BatchPoints) AddFields( } } + // Apply BatchPoints tags to tags passed in, giving precedence to those + // passed in. This is so that plugins have the ability to override global + // tags. + for k, v := range bp.Tags { + _, ok := tags[k] + if !ok { + tags[k] = v + } + } + if bp.Debug { var tg []string diff --git a/agent.go b/agent.go index 3610430f7..1ae7e0f94 100644 --- a/agent.go +++ b/agent.go @@ -194,6 +194,7 @@ func (a *Agent) crankParallel() error { bp.Prefix = plugin.name + "_" bp.Config = plugin.config bp.Precision = a.Precision + bp.Tags = a.Config.Tags if err := plugin.plugin.Gather(&bp); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) @@ -212,7 +213,6 @@ func (a *Agent) crankParallel() error { if a.UTC { bp.Time = bp.Time.UTC() } - bp.Tags = a.Config.Tags bp.Precision = a.Precision for sub := range points { @@ -265,13 +265,13 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err bp.Prefix = plugin.name + "_" bp.Config = plugin.config bp.Precision = a.Precision + bp.Tags = a.Config.Tags if err := plugin.plugin.Gather(&bp); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) outerr = errors.New("Error encountered processing plugins & outputs") } - bp.Tags = a.Config.Tags bp.Time = time.Now() if a.UTC { bp.Time = bp.Time.UTC() diff --git a/plugins/statsd/README.md b/plugins/statsd/README.md index 54544e1ab..6045a5d9f 100644 --- a/plugins/statsd/README.md +++ b/plugins/statsd/README.md @@ -1,59 +1,5 @@ # Telegraf Service Plugin: statsd -#### Plugin arguments: - -- **service_address** string: Address to listen for statsd UDP packets on -- **delete_gauges** boolean: Delete gauges on every collection interval -- **delete_counters** boolean: Delete counters on every collection interval -- **delete_sets** boolean: Delete set counters on every collection interval -- **allowed_pending_messages** integer: Number of messages allowed to queue up -on the UDP listener before the next flush. NOTE: gauge, counter, and set -measurements are aggregated as they arrive, so this is not a straight counter of -the number of total messages that the listener can handle between flushes. - -#### Statsd bucket -> InfluxDB Mapping - -By default, statsd buckets are converted to measurement names with the rules: -- "." -> "_" -- "-" -> "__" - -This plugin also accepts a list of config tables to describe a mapping of a statsd -bucket to an InfluxDB measurement name and tags. - -Each mapping must specify a match glob pattern. It can optionally take a name -for the measurement and a map of bucket indices to tag names. - -For example, the following configuration: - -``` - [[statsd.mappings]] - match = "users.current.*.*" - name = "current_users" - [statsd.mappings.tagmap] - unit = 0 - server = 2 - service = 3 - - [[statsd.mappings]] - match = "deploys.*.*" - name = "service_deploys" - [statsd.mappings.tagmap] - service_type = 1 - service_name = 2 -``` - -Will map statsd -> influx like so: -``` -users.current.den001.myapp:32|g -=> [server="den001" service="myapp" unit="users"] statsd_current_users_gauge value=32 - -deploys.test.myservice:1|c -=> [service_name="myservice" service_type="test"] statsd_service_deploys_counter value=1 - -random.jumping-sheep:10|c -=> [] statsd_random_jumping__sheep_counter value=10 -``` - #### Description The statsd plugin is a special type of plugin which runs a backgrounded statsd @@ -70,10 +16,129 @@ implementation. In short, the telegraf statsd listener will accept: - Counters - `deploys.test.myservice:1|c` <- increments by 1 - `deploys.test.myservice:101|c` <- increments by 101 - - `deploys.test.myservice:1|c|@0.1` <- sample rate, increments by 10 + - `deploys.test.myservice:1|c|@0.1` <- with sample rate, increments by 10 - Sets - `users.unique:101|s` - `users.unique:101|s` - `users.unique:102|s` <- would result in a count of 2 for `users.unique` -- Timings - - TODO +- Timings & Histograms + - `load.time:320|ms` + - `load.time.nanoseconds:1|h` + - `load.time:200|ms|@0.1` <- sampled 1/10 of the time + +#### Influx Statsd + +In order to take advantage of InfluxDB's tagging system, we have made a couple +additions to the standard statsd protocol. First, you can specify +tags in a manner similar to the line-protocol, like this: + +``` +users.current,service=payroll,region=us-west:32|g +``` + +COMING SOON: there will be a way to specify multiple fields. + + +#### Measurements: + +Meta: +- tags: `metric_type=` + +Outputted measurements will depend entirely on the measurements that the user +sends, but here is a brief rundown of what you can expect to find from each +metric type: + +- Gauges + - Gauges are a constant data type. They are not subject to averaging, and they + don’t change unless you change them. That is, once you set a gauge value, it + will be a flat line on the graph until you change it again. +- Counters + - Counters are the most basic type. They are treated as a count of a type of + event. They will continually increase unless you set `delete_counters=true`. +- Sets + - Sets count the number of unique values passed to a key. For example, you + could count the number of users accessing your system using `users:|s`. + No matter how many times the same user_id is sent, the count will only increase + by 1. +- Timings & Histograms + - Timers are meant to track how long something took. They are an invaluable + tool for tracking application performance. + - The following aggregate measurements are made for timers: + - `statsd__lower`: The lower bound is the lowest value statsd saw + for that stat during that interval. + - `statsd__upper`: The upper bound is the highest value statsd saw + for that stat during that interval. + - `statsd__mean`: The mean is the average of all values statsd saw + for that stat during that interval. + - `statsd__stddev`: The stddev is the sample standard deviation + of all values statsd saw for that stat during that interval. + - `statsd__count`: The count is the number of timings statsd saw + for that stat during that interval. It is not averaged. + - `statsd__percentile_

` The `Pth` percentile is a value x such + that `P%` of all the values statsd saw for that stat during that time + period are below x. The most common value that people use for `P` is the + `90`, this is a great number to try to optimize. + +#### Plugin arguments + +- **service_address** string: Address to listen for statsd UDP packets on +- **delete_gauges** boolean: Delete gauges on every collection interval +- **delete_counters** boolean: Delete counters on every collection interval +- **delete_sets** boolean: Delete set counters on every collection interval +- **delete_timings** boolean: Delete timings on every collection interval +- **percentiles** []int: Percentiles to calculate for timing & histogram stats +- **allowed_pending_messages** integer: Number of messages allowed to queue up +waiting to be processed. When this fills, messages will be dropped and logged. +- **percentile_limit** integer: Number of timing/histogram values to track +per-measurement in the calculation of percentiles. Raising this limit increases +the accuracy of percentiles but also increases the memory usage and cpu time. +- **templates** []string: Templates for transforming statsd buckets into influx +measurements and tags. + +#### Statsd bucket -> InfluxDB line-protocol Templates + +The plugin supports specifying templates for transforming statsd buckets into +InfluxDB measurement names and tags. The templates have a _measurement_ keyword, +which can be used to specify parts of the bucket that are to be used in the +measurement name. Other words in the template are used as tag names. For example, +the following template: + +``` +templates = [ + "measurement.measurement.region" +] +``` + +would result in the following transformation: + +``` +cpu.load.us-west:100|g +=> cpu_load,region=us-west 100 +``` + +Users can also filter the template to use based on the name of the bucket, +using glob matching, like so: + +``` +templates = [ + "cpu.* measurement.measurement.region", + "mem.* measurement.measurement.host" +] +``` + +which would result in the following transformation: + +``` +cpu.load.us-west:100|g +=> cpu_load,region=us-west 100 + +mem.cached.localhost:256|g +=> mem_cached,host=localhost 256 +``` + +There are many more options available, +[More details can be found here](https://github.com/influxdb/influxdb/tree/master/services/graphite#templates) diff --git a/plugins/statsd/running_stats.go b/plugins/statsd/running_stats.go new file mode 100644 index 000000000..fd467dfb6 --- /dev/null +++ b/plugins/statsd/running_stats.go @@ -0,0 +1,108 @@ +package statsd + +import ( + "math" + "math/rand" + "sort" +) + +const defaultPercentileLimit = 1000 + +// RunningStats calculates a running mean, variance, standard deviation, +// lower bound, upper bound, count, and can calculate estimated percentiles. +// It is based on the incremental algorithm described here: +// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance +type RunningStats struct { + k float64 + n int64 + ex float64 + ex2 float64 + + // Array used to calculate estimated percentiles + // We will store a maximum of PercLimit values, at which point we will start + // randomly replacing old values, hence it is an estimated percentile. + perc []float64 + PercLimit int + + upper float64 + lower float64 + + // cache if we have sorted the list so that we never re-sort a sorted list, + // which can have very bad performance. + sorted bool +} + +func (rs *RunningStats) AddValue(v float64) { + // Whenever a value is added, the list is no longer sorted. + rs.sorted = false + + if rs.n == 0 { + rs.k = v + rs.upper = v + rs.lower = v + if rs.PercLimit == 0 { + rs.PercLimit = defaultPercentileLimit + } + rs.perc = make([]float64, 0, rs.PercLimit) + } + + // These are used for the running mean and variance + rs.n += 1 + rs.ex += v - rs.k + rs.ex2 += (v - rs.k) * (v - rs.k) + + // track upper and lower bounds + if v > rs.upper { + rs.upper = v + } else if v < rs.lower { + rs.lower = v + } + + if len(rs.perc) < rs.PercLimit { + rs.perc = append(rs.perc, v) + } else { + // Reached limit, choose random index to overwrite in the percentile array + rs.perc[rand.Intn(len(rs.perc))] = v + } +} + +func (rs *RunningStats) Mean() float64 { + return rs.k + rs.ex/float64(rs.n) +} + +func (rs *RunningStats) Variance() float64 { + return (rs.ex2 - (rs.ex*rs.ex)/float64(rs.n)) / float64(rs.n) +} + +func (rs *RunningStats) Stddev() float64 { + return math.Sqrt(rs.Variance()) +} + +func (rs *RunningStats) Upper() float64 { + return rs.upper +} + +func (rs *RunningStats) Lower() float64 { + return rs.lower +} + +func (rs *RunningStats) Count() int64 { + return rs.n +} + +func (rs *RunningStats) Percentile(n int) float64 { + if n > 100 { + n = 100 + } + + if !rs.sorted { + sort.Float64s(rs.perc) + rs.sorted = true + } + + i := int(float64(len(rs.perc)) * float64(n) / float64(100)) + if i < 0 { + i = 0 + } + return rs.perc[i] +} diff --git a/plugins/statsd/running_stats_test.go b/plugins/statsd/running_stats_test.go new file mode 100644 index 000000000..614de9cec --- /dev/null +++ b/plugins/statsd/running_stats_test.go @@ -0,0 +1,136 @@ +package statsd + +import ( + "math" + "testing" +) + +// Test that a single metric is handled correctly +func TestRunningStats_Single(t *testing.T) { + rs := RunningStats{} + values := []float64{10.1} + + for _, v := range values { + rs.AddValue(v) + } + + if rs.Mean() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Mean()) + } + if rs.Upper() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Upper()) + } + if rs.Lower() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Lower()) + } + if rs.Percentile(90) != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90)) + } + if rs.Percentile(50) != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50)) + } + if rs.Count() != 1 { + t.Errorf("Expected %v, got %v", 1, rs.Count()) + } + if rs.Variance() != 0 { + t.Errorf("Expected %v, got %v", 0, rs.Variance()) + } + if rs.Stddev() != 0 { + t.Errorf("Expected %v, got %v", 0, rs.Stddev()) + } +} + +// Test that duplicate values are handled correctly +func TestRunningStats_Duplicate(t *testing.T) { + rs := RunningStats{} + values := []float64{10.1, 10.1, 10.1, 10.1} + + for _, v := range values { + rs.AddValue(v) + } + + if rs.Mean() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Mean()) + } + if rs.Upper() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Upper()) + } + if rs.Lower() != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Lower()) + } + if rs.Percentile(90) != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90)) + } + if rs.Percentile(50) != 10.1 { + t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50)) + } + if rs.Count() != 4 { + t.Errorf("Expected %v, got %v", 4, rs.Count()) + } + if rs.Variance() != 0 { + t.Errorf("Expected %v, got %v", 0, rs.Variance()) + } + if rs.Stddev() != 0 { + t.Errorf("Expected %v, got %v", 0, rs.Stddev()) + } +} + +// Test a list of sample values, returns all correct values +func TestRunningStats(t *testing.T) { + rs := RunningStats{} + values := []float64{10, 20, 10, 30, 20, 11, 12, 32, 45, 9, 5, 5, 5, 10, 23, 8} + + for _, v := range values { + rs.AddValue(v) + } + + if rs.Mean() != 15.9375 { + t.Errorf("Expected %v, got %v", 15.9375, rs.Mean()) + } + if rs.Upper() != 45 { + t.Errorf("Expected %v, got %v", 45, rs.Upper()) + } + if rs.Lower() != 5 { + t.Errorf("Expected %v, got %v", 5, rs.Lower()) + } + if rs.Percentile(90) != 32 { + t.Errorf("Expected %v, got %v", 32, rs.Percentile(90)) + } + if rs.Percentile(50) != 11 { + t.Errorf("Expected %v, got %v", 11, rs.Percentile(50)) + } + if rs.Count() != 16 { + t.Errorf("Expected %v, got %v", 4, rs.Count()) + } + if !fuzzyEqual(rs.Variance(), 124.93359, .00001) { + t.Errorf("Expected %v, got %v", 124.93359, rs.Variance()) + } + if !fuzzyEqual(rs.Stddev(), 11.17736, .00001) { + t.Errorf("Expected %v, got %v", 11.17736, rs.Stddev()) + } +} + +// Test that the percentile limit is respected. +func TestRunningStats_PercentileLimit(t *testing.T) { + rs := RunningStats{} + rs.PercLimit = 10 + values := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + + for _, v := range values { + rs.AddValue(v) + } + + if rs.Count() != 11 { + t.Errorf("Expected %v, got %v", 11, rs.Count()) + } + if len(rs.perc) != 10 { + t.Errorf("Expected %v, got %v", 10, len(rs.perc)) + } +} + +func fuzzyEqual(a, b, epsilon float64) bool { + if math.Abs(a-b) > epsilon { + return false + } + return true +} diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go index 263785137..25fabf91a 100644 --- a/plugins/statsd/statsd.go +++ b/plugins/statsd/statsd.go @@ -26,21 +26,27 @@ type Statsd struct { // fills up, packets will get dropped until the next Gather interval is ran. AllowedPendingMessages int + // Percentiles specifies the percentiles that will be calculated for timing + // and histogram stats. + Percentiles []int + PercentileLimit int + DeleteGauges bool DeleteCounters bool DeleteSets bool + DeleteTimings bool sync.Mutex // Channel for all incoming statsd messages - in chan string - inmetrics chan metric - done chan struct{} + in chan string + done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive gauges map[string]cachedgauge counters map[string]cachedcounter sets map[string]cachedset + timings map[string]cachedtimings // bucket -> influx templates Templates []string @@ -52,10 +58,10 @@ func NewStatsd() *Statsd { // Make data structures s.done = make(chan struct{}) s.in = make(chan string, s.AllowedPendingMessages) - s.inmetrics = make(chan metric, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) + s.timings = make(map[string]cachedtimings) return &s } @@ -91,10 +97,10 @@ type cachedcounter struct { tags map[string]string } -type cachedtiming struct { - name string - timings []float64 - tags map[string]string +type cachedtimings struct { + name string + stats RunningStats + tags map[string]string } func (_ *Statsd) Description() string { @@ -104,16 +110,29 @@ func (_ *Statsd) Description() string { const sampleConfig = ` # Address and port to host UDP listener on service_address = ":8125" - # Delete gauges every interval + # Delete gauges every interval (default=false) delete_gauges = false - # Delete counters every interval + # Delete counters every interval (default=false) delete_counters = false - # Delete sets every interval + # Delete sets every interval (default=false) delete_sets = false + # Delete timings & histograms every interval (default=true) + delete_timings = true + # Percentiles to calculate for timing & histogram stats + percentiles = [90] - # Number of messages allowed to queue up, once filled, + # templates = [ + # "cpu.* measurement*" + # ] + + # Number of UDP messages allowed to queue up, once filled, # the statsd server will start dropping packets allowed_pending_messages = 10000 + + # Number of timing/histogram values to track per-measurement in the + # calculation of percentiles. Raising this limit increases the accuracy + # of percentiles but also increases the memory usage and cpu time. + percentile_limit = 1000 ` func (_ *Statsd) SampleConfig() string { @@ -124,35 +143,37 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error { s.Lock() defer s.Unlock() - items := len(s.inmetrics) - for i := 0; i < items; i++ { - - m := <-s.inmetrics - - switch m.mtype { - case "c", "g", "s": - log.Println("ERROR: Uh oh, this should not have happened") - case "ms", "h": - // TODO + for _, metric := range s.timings { + acc.Add(metric.name+"_mean", metric.stats.Mean(), metric.tags) + acc.Add(metric.name+"_stddev", metric.stats.Stddev(), metric.tags) + acc.Add(metric.name+"_upper", metric.stats.Upper(), metric.tags) + acc.Add(metric.name+"_lower", metric.stats.Lower(), metric.tags) + acc.Add(metric.name+"_count", metric.stats.Count(), metric.tags) + for _, percentile := range s.Percentiles { + name := fmt.Sprintf("%s_percentile_%v", metric.name, percentile) + acc.Add(name, metric.stats.Percentile(percentile), metric.tags) } } + if s.DeleteTimings { + s.timings = make(map[string]cachedtimings) + } - for _, cmetric := range s.gauges { - acc.Add(cmetric.name, cmetric.value, cmetric.tags) + for _, metric := range s.gauges { + acc.Add(metric.name, metric.value, metric.tags) } if s.DeleteGauges { s.gauges = make(map[string]cachedgauge) } - for _, cmetric := range s.counters { - acc.Add(cmetric.name, cmetric.value, cmetric.tags) + for _, metric := range s.counters { + acc.Add(metric.name, metric.value, metric.tags) } if s.DeleteCounters { s.counters = make(map[string]cachedcounter) } - for _, cmetric := range s.sets { - acc.Add(cmetric.name, int64(len(cmetric.set)), cmetric.tags) + for _, metric := range s.sets { + acc.Add(metric.name, int64(len(metric.set)), metric.tags) } if s.DeleteSets { s.sets = make(map[string]cachedset) @@ -167,10 +188,10 @@ func (s *Statsd) Start() error { // Make data structures s.done = make(chan struct{}) s.in = make(chan string, s.AllowedPendingMessages) - s.inmetrics = make(chan metric, s.AllowedPendingMessages) s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) + s.timings = make(map[string]cachedtimings) // Start the UDP listener go s.udpListen() @@ -216,8 +237,7 @@ func (s *Statsd) udpListen() error { } // parser monitors the s.in channel, if there is a line ready, it parses the -// statsd string into a usable metric struct and either aggregates the value -// or pushes it into the s.inmetrics channel. +// statsd string into a usable metric struct and aggregates the value func (s *Statsd) parser() error { for { select { @@ -235,14 +255,15 @@ func (s *Statsd) parseStatsdLine(line string) error { s.Lock() defer s.Unlock() - // Validate splitting the line on "|" m := metric{} - parts1 := strings.Split(line, "|") - if len(parts1) < 2 { + + // Validate splitting the line on "|" + pipesplit := strings.Split(line, "|") + if len(pipesplit) < 2 { log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") - } else if len(parts1) > 2 { - sr := parts1[2] + } else if len(pipesplit) > 2 { + sr := pipesplit[2] errmsg := "Error: parsing sample rate, %s, it must be in format like: " + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" if strings.Contains(sr, "@") && len(sr) > 1 { @@ -250,6 +271,7 @@ func (s *Statsd) parseStatsdLine(line string) error { if err != nil { log.Printf(errmsg, err.Error(), line) } else { + // sample rate successfully parsed m.samplerate = samplerate } } else { @@ -258,24 +280,24 @@ func (s *Statsd) parseStatsdLine(line string) error { } // Validate metric type - switch parts1[1] { + switch pipesplit[1] { case "g", "c", "s", "ms", "h": - m.mtype = parts1[1] + m.mtype = pipesplit[1] default: - log.Printf("Error: Statsd Metric type %s unsupported", parts1[1]) + log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } // Validate splitting the rest of the line on ":" - parts2 := strings.Split(parts1[0], ":") - if len(parts2) != 2 { + colonsplit := strings.Split(pipesplit[0], ":") + if len(colonsplit) != 2 { log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } - m.bucket = parts2[0] + m.bucket = colonsplit[0] // Parse the value - if strings.ContainsAny(parts2[1], "-+") { + if strings.ContainsAny(colonsplit[1], "-+") { if m.mtype != "g" { log.Printf("Error: +- values are only supported for gauges: %s\n", line) return errors.New("Error Parsing statsd line") @@ -285,14 +307,14 @@ func (s *Statsd) parseStatsdLine(line string) error { switch m.mtype { case "g", "ms", "h": - v, err := strconv.ParseFloat(parts2[1], 64) + v, err := strconv.ParseFloat(colonsplit[1], 64) if err != nil { log.Printf("Error: parsing value to float64: %s\n", line) return errors.New("Error Parsing statsd line") } m.floatvalue = v case "c", "s": - v, err := strconv.ParseInt(parts2[1], 10, 64) + v, err := strconv.ParseInt(colonsplit[1], 10, 64) if err != nil { log.Printf("Error: parsing value to int64: %s\n", line) return errors.New("Error Parsing statsd line") @@ -304,8 +326,20 @@ func (s *Statsd) parseStatsdLine(line string) error { m.intvalue = v } - // Parse the name - m.name, m.tags = s.parseName(m) + // Parse the name & tags from bucket + m.name, m.tags = s.parseName(m.bucket) + switch m.mtype { + case "c": + m.tags["metric_type"] = "counter" + case "g": + m.tags["metric_type"] = "gauge" + case "s": + m.tags["metric_type"] = "set" + case "ms": + m.tags["metric_type"] = "timing" + case "h": + m.tags["metric_type"] = "histogram" + } // Make a unique key for the measurement name/tags var tg []string @@ -315,18 +349,7 @@ func (s *Statsd) parseStatsdLine(line string) error { sort.Strings(tg) m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) - switch m.mtype { - // Aggregate gauges, counters and sets as we go - case "g", "c", "s": - s.aggregate(m) - // Timers get processed at flush time - default: - select { - case s.inmetrics <- m: - default: - log.Printf(dropwarn, line) - } - } + s.aggregate(m) return nil } @@ -334,42 +357,79 @@ func (s *Statsd) parseStatsdLine(line string) error { // config file. If there is a match, it will parse the name of the metric and // map of tags. // Return values are (, ) -func (s *Statsd) parseName(m metric) (string, map[string]string) { - name := m.bucket +func (s *Statsd) parseName(bucket string) (string, map[string]string) { tags := make(map[string]string) - o := graphite.Options{ - Separator: "_", - Templates: s.Templates, + bucketparts := strings.Split(bucket, ",") + // Parse out any tags in the bucket + if len(bucketparts) > 1 { + for _, btag := range bucketparts[1:] { + k, v := parseKeyValue(btag) + if k != "" { + tags[k] = v + } + } } + o := graphite.Options{ + Separator: "_", + Templates: s.Templates, + DefaultTags: tags, + } + + name := bucketparts[0] p, err := graphite.NewParserWithOptions(o) if err == nil { - name, tags = p.ApplyTemplate(m.bucket) + name, tags = p.ApplyTemplate(name) } name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, "-", "__", -1) - switch m.mtype { - case "c": - tags["metric_type"] = "counter" - case "g": - tags["metric_type"] = "gauge" - case "s": - tags["metric_type"] = "set" - case "ms", "h": - tags["metric_type"] = "timer" - } - return name, tags } -// aggregate takes in a metric of type "counter", "gauge", or "set". It then -// aggregates and caches the current value. It does not deal with the -// DeleteCounters, DeleteGauges or DeleteSets options, because those are dealt -// with in the Gather function. +// Parse the key,value out of a string that looks like "key=value" +func parseKeyValue(keyvalue string) (string, string) { + var key, val string + + split := strings.Split(keyvalue, "=") + // Must be exactly 2 to get anything meaningful out of them + if len(split) == 2 { + key = split[0] + val = split[1] + } else if len(split) == 1 { + val = split[0] + } + + return key, val +} + +// aggregate takes in a metric. It then +// aggregates and caches the current value(s). It does not deal with the +// Delete* options, because those are dealt with in the Gather function. func (s *Statsd) aggregate(m metric) { switch m.mtype { + case "ms", "h": + cached, ok := s.timings[m.hash] + if !ok { + cached = cachedtimings{ + name: m.name, + tags: m.tags, + stats: RunningStats{ + PercLimit: s.PercentileLimit, + }, + } + } + + if m.samplerate > 0 { + for i := 0; i < int(1.0/m.samplerate); i++ { + cached.stats.AddValue(m.floatvalue) + } + s.timings[m.hash] = cached + } else { + cached.stats.AddValue(m.floatvalue) + s.timings[m.hash] = cached + } case "c": cached, ok := s.counters[m.hash] if !ok { @@ -380,7 +440,6 @@ func (s *Statsd) aggregate(m metric) { } } else { cached.value += m.intvalue - cached.tags = m.tags s.counters[m.hash] = cached } case "g": @@ -397,7 +456,6 @@ func (s *Statsd) aggregate(m metric) { } else { cached.value = m.floatvalue } - cached.tags = m.tags s.gauges[m.hash] = cached } case "s": @@ -422,7 +480,6 @@ func (s *Statsd) Stop() { log.Println("Stopping the statsd service") close(s.done) close(s.in) - close(s.inmetrics) } func init() { diff --git a/plugins/statsd/statsd_test.go b/plugins/statsd/statsd_test.go index 9f47363d0..b7e1f2d93 100644 --- a/plugins/statsd/statsd_test.go +++ b/plugins/statsd/statsd_test.go @@ -121,25 +121,208 @@ func TestParse_DefaultNameParsing(t *testing.T) { } } -// Test that name mappings match and work -func TestParse_NameMap(t *testing.T) { +// Test that template name transformation works +func TestParse_Template(t *testing.T) { + s := NewStatsd() + s.Templates = []string{ + "measurement.measurement.host.service", + } + + lines := []string{ + "cpu.idle.localhost:1|c", + "cpu.busy.host01.myservice:11|c", + } + + for _, line := range lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + validations := []struct { + name string + value int64 + }{ + { + "cpu_idle", + 1, + }, + { + "cpu_busy", + 11, + }, + } + + // Validate counters + for _, test := range validations { + err := test_validate_counter(test.name, test.value, s.counters) + if err != nil { + t.Error(err.Error()) + } + } +} + +// Test that template filters properly +func TestParse_TemplateFilter(t *testing.T) { + s := NewStatsd() + s.Templates = []string{ + "cpu.idle.* measurement.measurement.host", + } + + lines := []string{ + "cpu.idle.localhost:1|c", + "cpu.busy.host01.myservice:11|c", + } + + for _, line := range lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + validations := []struct { + name string + value int64 + }{ + { + "cpu_idle", + 1, + }, + { + "cpu_busy_host01_myservice", + 11, + }, + } + + // Validate counters + for _, test := range validations { + err := test_validate_counter(test.name, test.value, s.counters) + if err != nil { + t.Error(err.Error()) + } + } +} + +// Test that most specific template is chosen +func TestParse_TemplateSpecificity(t *testing.T) { + s := NewStatsd() + s.Templates = []string{ + "cpu.* measurement.foo.host", + "cpu.idle.* measurement.measurement.host", + } + + lines := []string{ + "cpu.idle.localhost:1|c", + } + + for _, line := range lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + validations := []struct { + name string + value int64 + }{ + { + "cpu_idle", + 1, + }, + } + + // Validate counters + for _, test := range validations { + err := test_validate_counter(test.name, test.value, s.counters) + if err != nil { + t.Error(err.Error()) + } + } +} + +// Test that fields are parsed correctly +func TestParse_Fields(t *testing.T) { if false { t.Errorf("TODO") } } -// Test that name map tags are applied properly -func TestParse_NameMapTags(t *testing.T) { - if false { - t.Errorf("TODO") +// Test that tags within the bucket are parsed correctly +func TestParse_Tags(t *testing.T) { + s := NewStatsd() + + tests := []struct { + bucket string + name string + tags map[string]string + }{ + { + "cpu.idle,host=localhost", + "cpu_idle", + map[string]string{ + "host": "localhost", + }, + }, + { + "cpu.idle,host=localhost,region=west", + "cpu_idle", + map[string]string{ + "host": "localhost", + "region": "west", + }, + }, + { + "cpu.idle,host=localhost,color=red,region=west", + "cpu_idle", + map[string]string{ + "host": "localhost", + "region": "west", + "color": "red", + }, + }, + } + + for _, test := range tests { + name, tags := s.parseName(test.bucket) + if name != test.name { + t.Errorf("Expected: %s, got %s", test.name, name) + } + + for k, v := range test.tags { + actual, ok := tags[k] + if !ok { + t.Errorf("Expected key: %s not found", k) + } + if actual != v { + t.Errorf("Expected %s, got %s", v, actual) + } + } } } // Test that measurements with the same name, but different tags, are treated -// as different values in the statsd cache +// as different outputs func TestParse_MeasurementsWithSameName(t *testing.T) { - if false { - t.Errorf("TODO") + s := NewStatsd() + + // Test that counters work + valid_lines := []string{ + "test.counter,host=localhost:1|c", + "test.counter,host=localhost,region=west:1|c", + } + + for _, line := range valid_lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + if len(s.counters) != 2 { + t.Errorf("Expected 2 separate measurements, found %d", len(s.counters)) } } @@ -150,9 +333,8 @@ func TestParse_ValidLines(t *testing.T) { "valid:45|c", "valid:45|s", "valid:45|g", - // TODO(cam): timings - //"valid.timer:45|ms", - //"valid.timer:45|h", + "valid.timer:45|ms", + "valid.timer:45|h", } for _, line := range valid_lines { @@ -163,13 +345,6 @@ func TestParse_ValidLines(t *testing.T) { } } -// Test that floats are handled as expected for all metric types -func TestParse_Floats(t *testing.T) { - if false { - t.Errorf("TODO") - } -} - // Tests low-level functionality of gauges func TestParse_Gauges(t *testing.T) { s := NewStatsd() @@ -340,8 +515,86 @@ func TestParse_Counters(t *testing.T) { // Tests low-level functionality of timings func TestParse_Timings(t *testing.T) { - if false { - t.Errorf("TODO") + s := NewStatsd() + s.Percentiles = []int{90} + testacc := &testutil.Accumulator{} + + // Test that counters work + valid_lines := []string{ + "test.timing:1|ms", + "test.timing:1|ms", + "test.timing:1|ms", + "test.timing:1|ms", + "test.timing:1|ms", + } + + for _, line := range valid_lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + s.Gather(testacc) + + tests := []struct { + name string + value interface{} + }{ + { + "test_timing_mean", + float64(1), + }, + { + "test_timing_stddev", + float64(0), + }, + { + "test_timing_upper", + float64(1), + }, + { + "test_timing_lower", + float64(1), + }, + { + "test_timing_count", + int64(5), + }, + { + "test_timing_percentile_90", + float64(1), + }, + } + + for _, test := range tests { + if !testacc.CheckValue(test.name, test.value) { + t.Errorf("Did not find measurement %s with value %v", + test.name, test.value) + } + } +} + +func TestParse_Timings_Delete(t *testing.T) { + s := NewStatsd() + s.DeleteTimings = true + fakeacc := &testutil.Accumulator{} + var err error + + line := "timing:100|ms" + err = s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + + if len(s.timings) != 1 { + t.Errorf("Should be 1 timing, found %d", len(s.timings)) + } + + s.Gather(fakeacc) + + if len(s.timings) != 0 { + t.Errorf("All timings should have been deleted, found %d", len(s.timings)) } } @@ -423,10 +676,21 @@ func TestParse_Counters_Delete(t *testing.T) { } } -// Integration test the listener starting up and receiving UDP packets -func TestListen(t *testing.T) { - if false { - t.Errorf("TODO") +func TestParseKeyValue(t *testing.T) { + k, v := parseKeyValue("foo=bar") + if k != "foo" { + t.Errorf("Expected %s, got %s", "foo", k) + } + if v != "bar" { + t.Errorf("Expected %s, got %s", "bar", v) + } + + k2, v2 := parseKeyValue("baz") + if k2 != "" { + t.Errorf("Expected %s, got %s", "", k2) + } + if v2 != "baz" { + t.Errorf("Expected %s, got %s", "baz", v2) } }