Statsd plugin, tags and timings

Closes #237
Closes #39
This commit is contained in:
Cameron Sparr 2015-10-07 16:11:52 -06:00
parent 52be516fa3
commit 6977119f1e
24 changed files with 1096 additions and 242 deletions

View File

@ -16,6 +16,7 @@ of metrics collected and from how many plugins.
- [#240](https://github.com/influxdb/telegraf/pull/240): procstat plugin, thanks @ranjib! - [#240](https://github.com/influxdb/telegraf/pull/240): procstat plugin, thanks @ranjib!
- [#244](https://github.com/influxdb/telegraf/pull/244): netstat plugin, thanks @shirou! - [#244](https://github.com/influxdb/telegraf/pull/244): netstat plugin, thanks @shirou!
- [#262](https://github.com/influxdb/telegraf/pull/262): zookeeper plugin, thanks @jrxFive! - [#262](https://github.com/influxdb/telegraf/pull/262): zookeeper plugin, thanks @jrxFive!
- [#237](https://github.com/influxdb/telegraf/pull/237): statsd service plugin, thanks @sparrc
### Bugfixes ### Bugfixes
- [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini! - [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini!

View File

@ -5,7 +5,7 @@ which can be found [on our website](http://influxdb.com/community/cla.html)
## Plugins ## Plugins
This section is for developers that want to create new collection plugins. This section is for developers who want to create new collection plugins.
Telegraf is entirely plugin driven. This interface allows for operators to Telegraf is entirely plugin driven. This interface allows for operators to
pick and chose what is gathered as well as makes it easy for developers pick and chose what is gathered as well as makes it easy for developers
to create new ways of generating metrics. to create new ways of generating metrics.
@ -87,7 +87,7 @@ func Gather(acc plugins.Accumulator) error {
} }
``` ```
### Example ### Plugin Example
```go ```go
package simple package simple
@ -123,9 +123,109 @@ func init() {
} }
``` ```
## Service Plugins
This section is for developers who want to create new "service" collection
plugins. A service plugin differs from a regular plugin in that it operates
a background service while Telegraf is running. One example would be the `statsd`
plugin, which operates a statsd server.
Service Plugins are substantially more complicated than a regular plugin, as they
will require threads and locks to verify data integrity. Service Plugins should
be avoided unless there is no way to create their behavior with a regular plugin.
Their interface is quite similar to a regular plugin, with the addition of `Start()`
and `Stop()` methods.
### Service Plugin Guidelines
* Same as the `Plugin` guidelines, except that they must conform to the
`plugins.ServicePlugin` interface.
### Service Plugin interface
```go
type ServicePlugin interface {
SampleConfig() string
Description() string
Gather(Accumulator) error
Start() error
Stop()
}
```
## Outputs ## Outputs
TODO: this section will describe requirements for contributing an output This section is for developers who want to create a new output sink. Outputs
are created in a similar manner as collection plugins, and their interface has
similar constructs.
### Output Guidelines
* An output must conform to the `outputs.Output` interface.
* Outputs should call `outputs.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdb/telegraf/outputs/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
output can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this output does.
### Output interface
```go
type Output interface {
Connect() error
Close() error
Description() string
SampleConfig() string
Write(client.BatchPoints) error
}
```
### Output Example
```go
package simpleoutput
// simpleoutput.go
import "github.com/influxdb/telegraf/outputs"
type Simple struct {
Ok bool
}
func (s *Simple) Description() string {
return "a demo output"
}
func (s *Simple) SampleConfig() string {
return "url = localhost"
}
func (s *Simple) Connect() error {
// Make a connection to the URL here
return nil
}
func (s *Simple) Close() error {
// Close connection to the URL here
return nil
}
func (s *Simple) Write(bp client.BatchPoints) error {
for _, pt := range bp {
// write `pt` to the output sink here
}
return nil
}
func init() {
outputs.Add("simpleoutput", func() outputs.Output { return &Simple{} })
}
```
## Unit Tests ## Unit Tests

4
Godeps/Godeps.json generated
View File

@ -102,8 +102,8 @@
}, },
{ {
"ImportPath": "github.com/influxdb/influxdb", "ImportPath": "github.com/influxdb/influxdb",
"Comment": "v0.9.4-rc1-457-g883d32c", "Comment": "v0.9.4-rc1-478-g73a630d",
"Rev": "883d32cfd06e8cf14e6d9fc75dbe7b7b92345623" "Rev": "73a630dfa64003c27782a1b0a6b817e839c5c3ea"
}, },
{ {
"ImportPath": "github.com/lib/pq", "ImportPath": "github.com/lib/pq",

View File

@ -38,6 +38,11 @@
- [#4296](https://github.com/influxdb/influxdb/pull/4296): Reject line protocol ending with '-'. Fixes [#4272](https://github.com/influxdb/influxdb/issues/4272) - [#4296](https://github.com/influxdb/influxdb/pull/4296): Reject line protocol ending with '-'. Fixes [#4272](https://github.com/influxdb/influxdb/issues/4272)
- [#4333](https://github.com/influxdb/influxdb/pull/4333): Retry monitor storage creation and only on Leader. - [#4333](https://github.com/influxdb/influxdb/pull/4333): Retry monitor storage creation and only on Leader.
- [#4276](https://github.com/influxdb/influxdb/issues/4276): Walk DropSeriesStatement & check for empty sources - [#4276](https://github.com/influxdb/influxdb/issues/4276): Walk DropSeriesStatement & check for empty sources
- [#4342](https://github.com/influxdb/influxdb/pull/4342): Fix mixing aggregates and math with non-aggregates. Thanks @kostya-sh.
- [#4349](https://github.com/influxdb/influxdb/issues/4349): If HH can't unmarshal a block, skip that block.
- [#4354](https://github.com/influxdb/influxdb/pull/4353): Fully lock node queues during hinted handoff. Fixes one cause of missing data on clusters.
- [#4357](https://github.com/influxdb/influxdb/issues/4357): Fix similar float values encoding overflow Thanks @dgryski!
- [#4344](https://github.com/influxdb/influxdb/issues/4344): Make client.Write default to client.precision if none is given.
## v0.9.4 [2015-09-14] ## v0.9.4 [2015-09-14]

View File

@ -72,7 +72,7 @@ func (b *nodeBalancer) Next() *meta.NodeInfo {
} }
d := &up[b.p] d := &up[b.p]
b.p += 1 b.p++
return d return d
} }

View File

@ -220,10 +220,16 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
if c.username != "" { if c.username != "" {
req.SetBasicAuth(c.username, c.password) req.SetBasicAuth(c.username, c.password)
} }
precision := bp.Precision
if precision == "" {
precision = c.precision
}
params := req.URL.Query() params := req.URL.Query()
params.Set("db", bp.Database) params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy) params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision) params.Set("precision", precision)
params.Set("consistency", bp.WriteConsistency) params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode() req.URL.RawQuery = params.Encode()

View File

@ -16,15 +16,19 @@ var (
ErrFieldTypeConflict = errors.New("field type conflict") ErrFieldTypeConflict = errors.New("field type conflict")
) )
// ErrDatabaseNotFound indicates that a database operation failed on the
// specified database because the specified database does not exist.
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) } func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }
// ErrRetentionPolicyNotFound indicates that the named retention policy could
// not be found in the database.
func ErrRetentionPolicyNotFound(name string) error { func ErrRetentionPolicyNotFound(name string) error {
return fmt.Errorf("retention policy not found: %s", name) return fmt.Errorf("retention policy not found: %s", name)
} }
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) } func errMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }
func Errorf(format string, a ...interface{}) (err error) { func errorf(format string, a ...interface{}) (err error) {
if _, file, line, ok := runtime.Caller(2); ok { if _, file, line, ok := runtime.Caller(2); ok {
a = append(a, file, line) a = append(a, file, line)
err = fmt.Errorf(format+" (%s:%d)", a...) err = fmt.Errorf(format+" (%s:%d)", a...)

View File

@ -1131,8 +1131,11 @@ func (s *SelectStatement) validSelectWithAggregate() error {
calls := map[string]struct{}{} calls := map[string]struct{}{}
numAggregates := 0 numAggregates := 0
for _, f := range s.Fields { for _, f := range s.Fields {
if c, ok := f.Expr.(*Call); ok { fieldCalls := walkFunctionCalls(f.Expr)
for _, c := range fieldCalls {
calls[c.Name] = struct{}{} calls[c.Name] = struct{}{}
}
if len(fieldCalls) != 0 {
numAggregates++ numAggregates++
} }
} }
@ -1166,8 +1169,7 @@ func (s *SelectStatement) validSelectWithAggregate() error {
func (s *SelectStatement) validateAggregates(tr targetRequirement) error { func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields { for _, f := range s.Fields {
switch expr := f.Expr.(type) { for _, expr := range walkFunctionCalls(f.Expr) {
case *Call:
switch expr.Name { switch expr.Name {
case "derivative", "non_negative_derivative": case "derivative", "non_negative_derivative":
if err := s.validSelectWithAggregate(); err != nil { if err := s.validSelectWithAggregate(); err != nil {

View File

@ -1486,6 +1486,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 AS`, err: `found EOF, expected identifier at line 1, char 18`}, {s: `SELECT field1 AS`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `SELECT field1 FROM foo group by time(1s)`, err: `GROUP BY requires at least one aggregate function`}, {s: `SELECT field1 FROM foo group by time(1s)`, err: `GROUP BY requires at least one aggregate function`},
{s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT count(value)/10, value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT count(value) FROM foo group by time(1s)`, err: `aggregate functions with GROUP BY time require a WHERE time clause`}, {s: `SELECT count(value) FROM foo group by time(1s)`, err: `aggregate functions with GROUP BY time require a WHERE time clause`},
{s: `SELECT count(value) FROM foo group by time(1s) where host = 'hosta.influxdb.org'`, err: `aggregate functions with GROUP BY time require a WHERE time clause`}, {s: `SELECT count(value) FROM foo group by time(1s) where host = 'hosta.influxdb.org'`, err: `aggregate functions with GROUP BY time require a WHERE time clause`},
{s: `SELECT count(value) FROM foo group by time`, err: `time() is a function and expects at least one argument`}, {s: `SELECT count(value) FROM foo group by time`, err: `time() is a function and expects at least one argument`},

View File

@ -21,14 +21,20 @@ function send_failure_notification {
--body "The nightly build has failed, version: $version" --body "The nightly build has failed, version: $version"
} }
if [ $# -ne 4 ]; then if [ $# -lt 4 ]; then
echo "$0 <smtp server> <user> <password> <to>" echo "$0 <smtp server> <user> <password> <to> [RACE_ENABLED]"
exit 1 exit 1
fi fi
SMTP=$1 SMTP=$1
USER=$2 USER=$2
PASSWORD=$3 PASSWORD=$3
TO=$4 TO=$4
RACE_ENABLED=$5
if [ -n "$RACE_ENABLED" ]; then
race="-x"
echo "Race-detection build enabled."
fi
REPO_DIR=`mktemp -d` REPO_DIR=`mktemp -d`
echo "Using $REPO_DIR for all work..." echo "Using $REPO_DIR for all work..."
@ -41,7 +47,7 @@ git clone https://github.com/influxdb/influxdb.git
cd $GOPATH/src/github.com/influxdb/influxdb cd $GOPATH/src/github.com/influxdb/influxdb
VERSION="$MASTER_VERSION-nightly-`git log --pretty=format:'%h' -n 1`" VERSION="$MASTER_VERSION-nightly-`git log --pretty=format:'%h' -n 1`"
NIGHTLY_BUILD=true ./package.sh $VERSION NIGHTLY_BUILD=true ./package.sh $race $VERSION
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
# Send notification e-mail. # Send notification e-mail.

View File

@ -83,6 +83,7 @@ $0 [-h] [-p|-w] [-t <dist>] [-r <number>] <version>
-r release candidate number, if any. -r release candidate number, if any.
Example: -r 7 Example: -r 7
-p just build packages -p just build packages
-x build with race-detection enabled
-w build packages for current working directory -w build packages for current working directory
imply -p imply -p
-t <dist> -t <dist>
@ -264,7 +265,7 @@ do_build() {
fi fi
date=`date -u --iso-8601=seconds` date=`date -u --iso-8601=seconds`
go install -a -ldflags="-X main.version=$version -X main.branch=$branch -X main.commit=$commit -X main.buildTime='$date'" ./... go install $RACE -a -ldflags="-X main.version=$version -X main.branch=$branch -X main.commit=$commit -X main.buildTime='$date'" ./...
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
echo "Build failed, unable to create package -- aborting" echo "Build failed, unable to create package -- aborting"
cleanup_exit 1 cleanup_exit 1
@ -357,6 +358,11 @@ do
shift 2 shift 2
;; ;;
-x)
RACE="-race"
shift
;;
-w | --working-directory) -w | --working-directory)
PACKAGES_ONLY="PACKAGES_ONLY" PACKAGES_ONLY="PACKAGES_ONLY"
WORKING_DIR="WORKING_DIR" WORKING_DIR="WORKING_DIR"
@ -482,19 +488,6 @@ if [ -z "$NIGHTLY_BUILD" -a -z "$PACKAGES_ONLY" ]; then
fi fi
fi fi
if [ $ARCH == "i386" ]; then
rpm_package=influxdb-${VERSION}-1.i686.rpm # RPM packages use 1 for default package release.
debian_package=influxdb_`full_version $VERSION $RC`_i686.deb
deb_args="-a i686"
rpm_args="setarch i686"
elif [ $ARCH == "arm" ]; then
rpm_package=influxdb-${VERSION}-1.armel.rpm
debian_package=influxdb_`full_version $VERSION $RC`_armel.deb
else
rpm_package=influxdb-${VERSION}-1.x86_64.rpm
debian_package=influxdb_`full_version $VERSION $RC`_amd64.deb
fi
COMMON_FPM_ARGS="\ COMMON_FPM_ARGS="\
--log error \ --log error \
-C $TMP_WORK_DIR \ -C $TMP_WORK_DIR \
@ -504,7 +497,7 @@ COMMON_FPM_ARGS="\
--maintainer $MAINTAINER \ --maintainer $MAINTAINER \
--after-install $POST_INSTALL_PATH \ --after-install $POST_INSTALL_PATH \
--after-remove $POST_UNINSTALL_PATH \ --after-remove $POST_UNINSTALL_PATH \
--name influxdb \ --name influxdb${RACE} \
--config-files $CONFIG_ROOT_DIR \ --config-files $CONFIG_ROOT_DIR \
--config-files $LOGROTATE_DIR" --config-files $LOGROTATE_DIR"
@ -518,7 +511,11 @@ if [ -n "$DEB_WANTED" ]; then
fi fi
if [ -n "$TAR_WANTED" ]; then if [ -n "$TAR_WANTED" ]; then
$FPM -s dir -t tar --prefix influxdb_`full_version $VERSION $RC`_${ARCH} -p influxdb_`full_version $VERSION $RC`_${ARCH}.tar.gz --description "$DESCRIPTION" $COMMON_FPM_ARGS --version `full_version $VERSION $RC ` . if [ -n "$RACE" ]; then
# Tweak race prefix for tarball.
race="race_"
fi
$FPM -s dir -t tar --prefix influxdb_$race`full_version $VERSION $RC`_${ARCH} -p influxdb_$race`full_version $VERSION $RC`_${ARCH}.tar.gz --description "$DESCRIPTION" $COMMON_FPM_ARGS --version `full_version $VERSION $RC ` .
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
echo "Failed to create Tar package -- aborting." echo "Failed to create Tar package -- aborting."
cleanup_exit 1 cleanup_exit 1

View File

@ -157,7 +157,14 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string) {
} }
// decode the name and tags // decode the name and tags
template := p.matcher.Match(fields[0]) template := p.matcher.Match(fields[0])
return template.Apply(fields[0]) name, tags := template.Apply(fields[0])
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return name, tags
} }
// template represents a pattern and tags to map a graphite metric string to a influxdb Point // template represents a pattern and tags to map a graphite metric string to a influxdb Point

View File

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"expvar" "expvar"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
@ -21,6 +22,10 @@ const (
pointsHint = "points_hint" pointsHint = "points_hint"
pointsWrite = "points_write" pointsWrite = "points_write"
bytesWrite = "bytes_write" bytesWrite = "bytes_write"
writeErr = "write_err"
unmarshalErr = "unmarshal_err"
advanceErr = "advance_err"
currentErr = "current_err"
) )
type Processor struct { type Processor struct {
@ -98,10 +103,9 @@ func (p *Processor) loadQueues() error {
return nil return nil
} }
// addQueue adds a hinted-handoff queue for the given node. This function is not thread-safe
// and the caller must ensure this function is not called concurrently.
func (p *Processor) addQueue(nodeID uint64) (*queue, error) { func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
p.mu.Lock()
defer p.mu.Unlock()
path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10)) path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10))
if err := os.MkdirAll(path, 0700); err != nil { if err := os.MkdirAll(path, 0700); err != nil {
return nil, err return nil, err
@ -123,14 +127,30 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
return queue, nil return queue, nil
} }
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error { func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error {
p.mu.RLock()
queue, ok := p.queues[ownerID] queue, ok := p.queues[ownerID]
p.mu.RUnlock()
if !ok {
if err := func() error {
// Check again under write-lock.
p.mu.Lock()
defer p.mu.Unlock()
queue, ok = p.queues[ownerID]
if !ok { if !ok {
var err error var err error
if queue, err = p.addQueue(ownerID); err != nil { if queue, err = p.addQueue(ownerID); err != nil {
return err return err
} }
} }
return nil
}(); err != nil {
return err
}
}
// Update stats // Update stats
p.updateShardStats(shardID, pointsHint, int64(len(points))) p.updateShardStats(shardID, pointsHint, int64(len(points)))
@ -162,6 +182,9 @@ func (p *Processor) Process() error {
// Get the current block from the queue // Get the current block from the queue
buf, err := q.Current() buf, err := q.Current()
if err != nil { if err != nil {
if err != io.EOF {
p.nodeStatMaps[nodeID].Add(currentErr, 1)
}
res <- nil res <- nil
break break
} }
@ -169,15 +192,20 @@ func (p *Processor) Process() error {
// unmarshal the byte slice back to shard ID and points // unmarshal the byte slice back to shard ID and points
shardID, points, err := p.unmarshalWrite(buf) shardID, points, err := p.unmarshalWrite(buf)
if err != nil { if err != nil {
p.nodeStatMaps[nodeID].Add(unmarshalErr, 1)
p.Logger.Printf("unmarshal write failed: %v", err) p.Logger.Printf("unmarshal write failed: %v", err)
if err := q.Advance(); err != nil { if err := q.Advance(); err != nil {
p.nodeStatMaps[nodeID].Add(advanceErr, 1)
res <- err res <- err
} }
return
// Skip and try the next block.
continue
} }
// Try to send the write to the node // Try to send the write to the node
if err := p.writer.WriteShard(shardID, nodeID, points); err != nil && tsdb.IsRetryable(err) { if err := p.writer.WriteShard(shardID, nodeID, points); err != nil && tsdb.IsRetryable(err) {
p.nodeStatMaps[nodeID].Add(writeErr, 1)
p.Logger.Printf("remote write failed: %v", err) p.Logger.Printf("remote write failed: %v", err)
res <- nil res <- nil
break break
@ -187,6 +215,7 @@ func (p *Processor) Process() error {
// If we get here, the write succeeded so advance the queue to the next item // If we get here, the write succeeded so advance the queue to the next item
if err := q.Advance(); err != nil { if err := q.Advance(); err != nil {
p.nodeStatMaps[nodeID].Add(advanceErr, 1)
res <- err res <- err
return return
} }

View File

@ -49,6 +49,26 @@ func TestEncoding_FloatBlock_ZeroTime(t *testing.T) {
} }
} }
func TestEncoding_FloatBlock_SimilarFloats(t *testing.T) {
values := make(tsm1.Values, 5)
values[0] = tsm1.NewValue(time.Unix(0, 1444238178437870000), 6.00065e+06)
values[1] = tsm1.NewValue(time.Unix(0, 1444238185286830000), 6.000656e+06)
values[2] = tsm1.NewValue(time.Unix(0, 1444238188441501000), 6.000657e+06)
values[3] = tsm1.NewValue(time.Unix(0, 1444238195286811000), 6.000659e+06)
values[4] = tsm1.NewValue(time.Unix(0, 1444238198439917000), 6.000661e+06)
b, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
decodedValues := values.DecodeSameTypeBlock(b)
if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}
func TestEncoding_IntBlock_Basic(t *testing.T) { func TestEncoding_IntBlock_Basic(t *testing.T) {
valueCount := 1000 valueCount := 1000
times := getTimes(valueCount, 60, time.Second) times := getTimes(valueCount, 60, time.Second)

View File

@ -56,7 +56,7 @@ func (s *FloatEncoder) Bytes() []byte {
func (s *FloatEncoder) Finish() { func (s *FloatEncoder) Finish() {
if !s.finished { if !s.finished {
// // write an end-of-stream record // write an end-of-stream record
s.Push(math.NaN()) s.Push(math.NaN())
s.bw.Flush(bitstream.Zero) s.bw.Flush(bitstream.Zero)
s.finished = true s.finished = true
@ -82,6 +82,12 @@ func (s *FloatEncoder) Push(v float64) {
leading := bits.Clz(vDelta) leading := bits.Clz(vDelta)
trailing := bits.Ctz(vDelta) trailing := bits.Ctz(vDelta)
// Clamp number of leading zeros to avoid overflow when encoding
leading &= 0x1F
if leading >= 32 {
leading = 31
}
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead // TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
if s.leading != ^uint64(0) && leading >= s.leading && trailing >= s.trailing { if s.leading != ^uint64(0) && leading >= s.leading && trailing >= s.trailing {
s.bw.WriteBit(bitstream.Zero) s.bw.WriteBit(bitstream.Zero)
@ -92,6 +98,11 @@ func (s *FloatEncoder) Push(v float64) {
s.bw.WriteBit(bitstream.One) s.bw.WriteBit(bitstream.One)
s.bw.WriteBits(leading, 5) s.bw.WriteBits(leading, 5)
// Note that if leading == trailing == 0, then sigbits == 64. But that
// value doesn't actually fit into the 6 bits we have.
// Luckily, we never need to encode 0 significant bits, since that would
// put us in the other case (vdelta == 0). So instead we write out a 0 and
// adjust it back to 64 on unpacking.
sigbits := 64 - leading - trailing sigbits := 64 - leading - trailing
s.bw.WriteBits(sigbits, 6) s.bw.WriteBits(sigbits, 6)
s.bw.WriteBits(vDelta>>trailing, int(sigbits)) s.bw.WriteBits(vDelta>>trailing, int(sigbits))
@ -178,6 +189,10 @@ func (it *FloatDecoder) Next() bool {
return false return false
} }
mbits := bits mbits := bits
// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder
if mbits == 0 {
mbits = 64
}
it.trailing = 64 - it.leading - mbits it.trailing = 64 - it.leading - mbits
} }

View File

@ -7,7 +7,6 @@ import (
) )
func TestFloatEncoder_Simple(t *testing.T) { func TestFloatEncoder_Simple(t *testing.T) {
// Example from the paper // Example from the paper
s := tsm1.NewFloatEncoder() s := tsm1.NewFloatEncoder()
@ -67,6 +66,49 @@ func TestFloatEncoder_Simple(t *testing.T) {
} }
} }
func TestFloatEncoder_SimilarFloats(t *testing.T) {
s := tsm1.NewFloatEncoder()
want := []float64{
6.00065e+06,
6.000656e+06,
6.000657e+06,
6.000659e+06,
6.000661e+06,
}
for _, v := range want {
s.Push(v)
}
s.Finish()
b := s.Bytes()
it, err := tsm1.NewFloatDecoder(b)
if err != nil {
t.Fatalf("unexpected error creating float decoder: %v", err)
}
for _, w := range want {
if !it.Next() {
t.Fatalf("Next()=false, want true")
}
vv := it.Values()
if w != vv {
t.Errorf("Values()=(%v), want (%v)\n", vv, w)
}
}
if it.Next() {
t.Fatalf("Next()=true, want false")
}
if err := it.Error(); err != nil {
t.Errorf("it.Error()=%v, want nil", err)
}
}
var TwoHoursData = []struct { var TwoHoursData = []struct {
v float64 v float64
}{ }{

View File

@ -194,6 +194,12 @@ Telegraf currently has support for collecting metrics from
* disk * disk
* swap * swap
## Service Plugins
Telegraf can collect metrics via the following services
* statsd
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API. want to add support for another service or third-party API.

View File

@ -27,8 +27,8 @@ type BatchPoints struct {
// deepcopy returns a deep copy of the BatchPoints object. This is primarily so // deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush) // we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints { func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.mu.Lock() bp.Lock()
defer bp.mu.Unlock() defer bp.Unlock()
var bpc BatchPoints var bpc BatchPoints
bpc.Time = bp.Time bpc.Time = bp.Time
@ -71,36 +71,9 @@ func (bp *BatchPoints) Add(
val interface{}, val interface{},
tags map[string]string, tags map[string]string,
) { ) {
bp.Lock() fields := make(map[string]interface{})
defer bp.Unlock() fields["value"] = val
bp.AddFields(measurement, fields, tags)
measurement = bp.Prefix + measurement
if bp.Config != nil {
if !bp.Config.ShouldPass(measurement, tags) {
return
}
}
if bp.Debug {
var tg []string
for k, v := range tags {
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
}
sort.Strings(tg)
fmt.Printf("> [%s] %s value=%v\n", strings.Join(tg, " "), measurement, val)
}
bp.Points = append(bp.Points, client.Point{
Measurement: measurement,
Tags: tags,
Fields: map[string]interface{}{
"value": val,
},
})
} }
// AddFieldsWithTime adds a measurement with a provided timestamp // AddFieldsWithTime adds a measurement with a provided timestamp
@ -169,6 +142,16 @@ func (bp *BatchPoints) AddFields(
} }
} }
// Apply BatchPoints tags to tags passed in, giving precedence to those
// passed in. This is so that plugins have the ability to override global
// tags.
for k, v := range bp.Tags {
_, ok := tags[k]
if !ok {
tags[k] = v
}
}
if bp.Debug { if bp.Debug {
var tg []string var tg []string

View File

@ -194,6 +194,7 @@ func (a *Agent) crankParallel() error {
bp.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
bp.Config = plugin.config bp.Config = plugin.config
bp.Precision = a.Precision bp.Precision = a.Precision
bp.Tags = a.Config.Tags
if err := plugin.plugin.Gather(&bp); err != nil { if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err) log.Printf("Error in plugin [%s]: %s", plugin.name, err)
@ -212,7 +213,6 @@ func (a *Agent) crankParallel() error {
if a.UTC { if a.UTC {
bp.Time = bp.Time.UTC() bp.Time = bp.Time.UTC()
} }
bp.Tags = a.Config.Tags
bp.Precision = a.Precision bp.Precision = a.Precision
for sub := range points { for sub := range points {
@ -265,13 +265,13 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
bp.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
bp.Config = plugin.config bp.Config = plugin.config
bp.Precision = a.Precision bp.Precision = a.Precision
bp.Tags = a.Config.Tags
if err := plugin.plugin.Gather(&bp); err != nil { if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err) log.Printf("Error in plugin [%s]: %s", plugin.name, err)
outerr = errors.New("Error encountered processing plugins & outputs") outerr = errors.New("Error encountered processing plugins & outputs")
} }
bp.Tags = a.Config.Tags
bp.Time = time.Now() bp.Time = time.Now()
if a.UTC { if a.UTC {
bp.Time = bp.Time.UTC() bp.Time = bp.Time.UTC()

View File

@ -1,59 +1,5 @@
# Telegraf Service Plugin: statsd # Telegraf Service Plugin: statsd
#### Plugin arguments:
- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
- **delete_sets** boolean: Delete set counters on every collection interval
- **allowed_pending_messages** integer: Number of messages allowed to queue up
on the UDP listener before the next flush. NOTE: gauge, counter, and set
measurements are aggregated as they arrive, so this is not a straight counter of
the number of total messages that the listener can handle between flushes.
#### Statsd bucket -> InfluxDB Mapping
By default, statsd buckets are converted to measurement names with the rules:
- "." -> "_"
- "-" -> "__"
This plugin also accepts a list of config tables to describe a mapping of a statsd
bucket to an InfluxDB measurement name and tags.
Each mapping must specify a match glob pattern. It can optionally take a name
for the measurement and a map of bucket indices to tag names.
For example, the following configuration:
```
[[statsd.mappings]]
match = "users.current.*.*"
name = "current_users"
[statsd.mappings.tagmap]
unit = 0
server = 2
service = 3
[[statsd.mappings]]
match = "deploys.*.*"
name = "service_deploys"
[statsd.mappings.tagmap]
service_type = 1
service_name = 2
```
Will map statsd -> influx like so:
```
users.current.den001.myapp:32|g
=> [server="den001" service="myapp" unit="users"] statsd_current_users_gauge value=32
deploys.test.myservice:1|c
=> [service_name="myservice" service_type="test"] statsd_service_deploys_counter value=1
random.jumping-sheep:10|c
=> [] statsd_random_jumping__sheep_counter value=10
```
#### Description #### Description
The statsd plugin is a special type of plugin which runs a backgrounded statsd The statsd plugin is a special type of plugin which runs a backgrounded statsd
@ -70,10 +16,129 @@ implementation. In short, the telegraf statsd listener will accept:
- Counters - Counters
- `deploys.test.myservice:1|c` <- increments by 1 - `deploys.test.myservice:1|c` <- increments by 1
- `deploys.test.myservice:101|c` <- increments by 101 - `deploys.test.myservice:101|c` <- increments by 101
- `deploys.test.myservice:1|c|@0.1` <- sample rate, increments by 10 - `deploys.test.myservice:1|c|@0.1` <- with sample rate, increments by 10
- Sets - Sets
- `users.unique:101|s` - `users.unique:101|s`
- `users.unique:101|s` - `users.unique:101|s`
- `users.unique:102|s` <- would result in a count of 2 for `users.unique` - `users.unique:102|s` <- would result in a count of 2 for `users.unique`
- Timings - Timings & Histograms
- TODO - `load.time:320|ms`
- `load.time.nanoseconds:1|h`
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time
#### Influx Statsd
In order to take advantage of InfluxDB's tagging system, we have made a couple
additions to the standard statsd protocol. First, you can specify
tags in a manner similar to the line-protocol, like this:
```
users.current,service=payroll,region=us-west:32|g
```
COMING SOON: there will be a way to specify multiple fields.
<!-- TODO Second, you can specify multiple fields within a measurement:
```
current.users,service=payroll,server=host01:west=10,east=10,central=2,south=10|g
``` -->
#### Measurements:
Meta:
- tags: `metric_type=<gauge|set|counter|timing|histogram>`
Outputted measurements will depend entirely on the measurements that the user
sends, but here is a brief rundown of what you can expect to find from each
metric type:
- Gauges
- Gauges are a constant data type. They are not subject to averaging, and they
dont change unless you change them. That is, once you set a gauge value, it
will be a flat line on the graph until you change it again.
- Counters
- Counters are the most basic type. They are treated as a count of a type of
event. They will continually increase unless you set `delete_counters=true`.
- Sets
- Sets count the number of unique values passed to a key. For example, you
could count the number of users accessing your system using `users:<user_id>|s`.
No matter how many times the same user_id is sent, the count will only increase
by 1.
- Timings & Histograms
- Timers are meant to track how long something took. They are an invaluable
tool for tracking application performance.
- The following aggregate measurements are made for timers:
- `statsd_<name>_lower`: The lower bound is the lowest value statsd saw
for that stat during that interval.
- `statsd_<name>_upper`: The upper bound is the highest value statsd saw
for that stat during that interval.
- `statsd_<name>_mean`: The mean is the average of all values statsd saw
for that stat during that interval.
- `statsd_<name>_stddev`: The stddev is the sample standard deviation
of all values statsd saw for that stat during that interval.
- `statsd_<name>_count`: The count is the number of timings statsd saw
for that stat during that interval. It is not averaged.
- `statsd_<name>_percentile_<P>` The `Pth` percentile is a value x such
that `P%` of all the values statsd saw for that stat during that time
period are below x. The most common value that people use for `P` is the
`90`, this is a great number to try to optimize.
#### Plugin arguments
- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
- **delete_sets** boolean: Delete set counters on every collection interval
- **delete_timings** boolean: Delete timings on every collection interval
- **percentiles** []int: Percentiles to calculate for timing & histogram stats
- **allowed_pending_messages** integer: Number of messages allowed to queue up
waiting to be processed. When this fills, messages will be dropped and logged.
- **percentile_limit** integer: Number of timing/histogram values to track
per-measurement in the calculation of percentiles. Raising this limit increases
the accuracy of percentiles but also increases the memory usage and cpu time.
- **templates** []string: Templates for transforming statsd buckets into influx
measurements and tags.
#### Statsd bucket -> InfluxDB line-protocol Templates
The plugin supports specifying templates for transforming statsd buckets into
InfluxDB measurement names and tags. The templates have a _measurement_ keyword,
which can be used to specify parts of the bucket that are to be used in the
measurement name. Other words in the template are used as tag names. For example,
the following template:
```
templates = [
"measurement.measurement.region"
]
```
would result in the following transformation:
```
cpu.load.us-west:100|g
=> cpu_load,region=us-west 100
```
Users can also filter the template to use based on the name of the bucket,
using glob matching, like so:
```
templates = [
"cpu.* measurement.measurement.region",
"mem.* measurement.measurement.host"
]
```
which would result in the following transformation:
```
cpu.load.us-west:100|g
=> cpu_load,region=us-west 100
mem.cached.localhost:256|g
=> mem_cached,host=localhost 256
```
There are many more options available,
[More details can be found here](https://github.com/influxdb/influxdb/tree/master/services/graphite#templates)

View File

@ -0,0 +1,108 @@
package statsd
import (
"math"
"math/rand"
"sort"
)
const defaultPercentileLimit = 1000
// RunningStats calculates a running mean, variance, standard deviation,
// lower bound, upper bound, count, and can calculate estimated percentiles.
// It is based on the incremental algorithm described here:
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
type RunningStats struct {
k float64
n int64
ex float64
ex2 float64
// Array used to calculate estimated percentiles
// We will store a maximum of PercLimit values, at which point we will start
// randomly replacing old values, hence it is an estimated percentile.
perc []float64
PercLimit int
upper float64
lower float64
// cache if we have sorted the list so that we never re-sort a sorted list,
// which can have very bad performance.
sorted bool
}
func (rs *RunningStats) AddValue(v float64) {
// Whenever a value is added, the list is no longer sorted.
rs.sorted = false
if rs.n == 0 {
rs.k = v
rs.upper = v
rs.lower = v
if rs.PercLimit == 0 {
rs.PercLimit = defaultPercentileLimit
}
rs.perc = make([]float64, 0, rs.PercLimit)
}
// These are used for the running mean and variance
rs.n += 1
rs.ex += v - rs.k
rs.ex2 += (v - rs.k) * (v - rs.k)
// track upper and lower bounds
if v > rs.upper {
rs.upper = v
} else if v < rs.lower {
rs.lower = v
}
if len(rs.perc) < rs.PercLimit {
rs.perc = append(rs.perc, v)
} else {
// Reached limit, choose random index to overwrite in the percentile array
rs.perc[rand.Intn(len(rs.perc))] = v
}
}
func (rs *RunningStats) Mean() float64 {
return rs.k + rs.ex/float64(rs.n)
}
func (rs *RunningStats) Variance() float64 {
return (rs.ex2 - (rs.ex*rs.ex)/float64(rs.n)) / float64(rs.n)
}
func (rs *RunningStats) Stddev() float64 {
return math.Sqrt(rs.Variance())
}
func (rs *RunningStats) Upper() float64 {
return rs.upper
}
func (rs *RunningStats) Lower() float64 {
return rs.lower
}
func (rs *RunningStats) Count() int64 {
return rs.n
}
func (rs *RunningStats) Percentile(n int) float64 {
if n > 100 {
n = 100
}
if !rs.sorted {
sort.Float64s(rs.perc)
rs.sorted = true
}
i := int(float64(len(rs.perc)) * float64(n) / float64(100))
if i < 0 {
i = 0
}
return rs.perc[i]
}

View File

@ -0,0 +1,136 @@
package statsd
import (
"math"
"testing"
)
// Test that a single metric is handled correctly
func TestRunningStats_Single(t *testing.T) {
rs := RunningStats{}
values := []float64{10.1}
for _, v := range values {
rs.AddValue(v)
}
if rs.Mean() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Mean())
}
if rs.Upper() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Upper())
}
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Count() != 1 {
t.Errorf("Expected %v, got %v", 1, rs.Count())
}
if rs.Variance() != 0 {
t.Errorf("Expected %v, got %v", 0, rs.Variance())
}
if rs.Stddev() != 0 {
t.Errorf("Expected %v, got %v", 0, rs.Stddev())
}
}
// Test that duplicate values are handled correctly
func TestRunningStats_Duplicate(t *testing.T) {
rs := RunningStats{}
values := []float64{10.1, 10.1, 10.1, 10.1}
for _, v := range values {
rs.AddValue(v)
}
if rs.Mean() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Mean())
}
if rs.Upper() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Upper())
}
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Count() != 4 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}
if rs.Variance() != 0 {
t.Errorf("Expected %v, got %v", 0, rs.Variance())
}
if rs.Stddev() != 0 {
t.Errorf("Expected %v, got %v", 0, rs.Stddev())
}
}
// Test a list of sample values, returns all correct values
func TestRunningStats(t *testing.T) {
rs := RunningStats{}
values := []float64{10, 20, 10, 30, 20, 11, 12, 32, 45, 9, 5, 5, 5, 10, 23, 8}
for _, v := range values {
rs.AddValue(v)
}
if rs.Mean() != 15.9375 {
t.Errorf("Expected %v, got %v", 15.9375, rs.Mean())
}
if rs.Upper() != 45 {
t.Errorf("Expected %v, got %v", 45, rs.Upper())
}
if rs.Lower() != 5 {
t.Errorf("Expected %v, got %v", 5, rs.Lower())
}
if rs.Percentile(90) != 32 {
t.Errorf("Expected %v, got %v", 32, rs.Percentile(90))
}
if rs.Percentile(50) != 11 {
t.Errorf("Expected %v, got %v", 11, rs.Percentile(50))
}
if rs.Count() != 16 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}
if !fuzzyEqual(rs.Variance(), 124.93359, .00001) {
t.Errorf("Expected %v, got %v", 124.93359, rs.Variance())
}
if !fuzzyEqual(rs.Stddev(), 11.17736, .00001) {
t.Errorf("Expected %v, got %v", 11.17736, rs.Stddev())
}
}
// Test that the percentile limit is respected.
func TestRunningStats_PercentileLimit(t *testing.T) {
rs := RunningStats{}
rs.PercLimit = 10
values := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
for _, v := range values {
rs.AddValue(v)
}
if rs.Count() != 11 {
t.Errorf("Expected %v, got %v", 11, rs.Count())
}
if len(rs.perc) != 10 {
t.Errorf("Expected %v, got %v", 10, len(rs.perc))
}
}
func fuzzyEqual(a, b, epsilon float64) bool {
if math.Abs(a-b) > epsilon {
return false
}
return true
}

View File

@ -26,21 +26,27 @@ type Statsd struct {
// fills up, packets will get dropped until the next Gather interval is ran. // fills up, packets will get dropped until the next Gather interval is ran.
AllowedPendingMessages int AllowedPendingMessages int
// Percentiles specifies the percentiles that will be calculated for timing
// and histogram stats.
Percentiles []int
PercentileLimit int
DeleteGauges bool DeleteGauges bool
DeleteCounters bool DeleteCounters bool
DeleteSets bool DeleteSets bool
DeleteTimings bool
sync.Mutex sync.Mutex
// Channel for all incoming statsd messages // Channel for all incoming statsd messages
in chan string in chan string
inmetrics chan metric
done chan struct{} done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive // Cache gauges, counters & sets so they can be aggregated as they arrive
gauges map[string]cachedgauge gauges map[string]cachedgauge
counters map[string]cachedcounter counters map[string]cachedcounter
sets map[string]cachedset sets map[string]cachedset
timings map[string]cachedtimings
// bucket -> influx templates // bucket -> influx templates
Templates []string Templates []string
@ -52,10 +58,10 @@ func NewStatsd() *Statsd {
// Make data structures // Make data structures
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages) s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
return &s return &s
} }
@ -91,9 +97,9 @@ type cachedcounter struct {
tags map[string]string tags map[string]string
} }
type cachedtiming struct { type cachedtimings struct {
name string name string
timings []float64 stats RunningStats
tags map[string]string tags map[string]string
} }
@ -104,16 +110,29 @@ func (_ *Statsd) Description() string {
const sampleConfig = ` const sampleConfig = `
# Address and port to host UDP listener on # Address and port to host UDP listener on
service_address = ":8125" service_address = ":8125"
# Delete gauges every interval # Delete gauges every interval (default=false)
delete_gauges = false delete_gauges = false
# Delete counters every interval # Delete counters every interval (default=false)
delete_counters = false delete_counters = false
# Delete sets every interval # Delete sets every interval (default=false)
delete_sets = false delete_sets = false
# Delete timings & histograms every interval (default=true)
delete_timings = true
# Percentiles to calculate for timing & histogram stats
percentiles = [90]
# Number of messages allowed to queue up, once filled, # templates = [
# "cpu.* measurement*"
# ]
# Number of UDP messages allowed to queue up, once filled,
# the statsd server will start dropping packets # the statsd server will start dropping packets
allowed_pending_messages = 10000 allowed_pending_messages = 10000
# Number of timing/histogram values to track per-measurement in the
# calculation of percentiles. Raising this limit increases the accuracy
# of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
` `
func (_ *Statsd) SampleConfig() string { func (_ *Statsd) SampleConfig() string {
@ -124,35 +143,37 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
items := len(s.inmetrics) for _, metric := range s.timings {
for i := 0; i < items; i++ { acc.Add(metric.name+"_mean", metric.stats.Mean(), metric.tags)
acc.Add(metric.name+"_stddev", metric.stats.Stddev(), metric.tags)
m := <-s.inmetrics acc.Add(metric.name+"_upper", metric.stats.Upper(), metric.tags)
acc.Add(metric.name+"_lower", metric.stats.Lower(), metric.tags)
switch m.mtype { acc.Add(metric.name+"_count", metric.stats.Count(), metric.tags)
case "c", "g", "s": for _, percentile := range s.Percentiles {
log.Println("ERROR: Uh oh, this should not have happened") name := fmt.Sprintf("%s_percentile_%v", metric.name, percentile)
case "ms", "h": acc.Add(name, metric.stats.Percentile(percentile), metric.tags)
// TODO
} }
} }
if s.DeleteTimings {
s.timings = make(map[string]cachedtimings)
}
for _, cmetric := range s.gauges { for _, metric := range s.gauges {
acc.Add(cmetric.name, cmetric.value, cmetric.tags) acc.Add(metric.name, metric.value, metric.tags)
} }
if s.DeleteGauges { if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
} }
for _, cmetric := range s.counters { for _, metric := range s.counters {
acc.Add(cmetric.name, cmetric.value, cmetric.tags) acc.Add(metric.name, metric.value, metric.tags)
} }
if s.DeleteCounters { if s.DeleteCounters {
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
} }
for _, cmetric := range s.sets { for _, metric := range s.sets {
acc.Add(cmetric.name, int64(len(cmetric.set)), cmetric.tags) acc.Add(metric.name, int64(len(metric.set)), metric.tags)
} }
if s.DeleteSets { if s.DeleteSets {
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
@ -167,10 +188,10 @@ func (s *Statsd) Start() error {
// Make data structures // Make data structures
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages) s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
// Start the UDP listener // Start the UDP listener
go s.udpListen() go s.udpListen()
@ -216,8 +237,7 @@ func (s *Statsd) udpListen() error {
} }
// parser monitors the s.in channel, if there is a line ready, it parses the // parser monitors the s.in channel, if there is a line ready, it parses the
// statsd string into a usable metric struct and either aggregates the value // statsd string into a usable metric struct and aggregates the value
// or pushes it into the s.inmetrics channel.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
for { for {
select { select {
@ -235,14 +255,15 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
// Validate splitting the line on "|"
m := metric{} m := metric{}
parts1 := strings.Split(line, "|")
if len(parts1) < 2 { // Validate splitting the line on "|"
pipesplit := strings.Split(line, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
} else if len(parts1) > 2 { } else if len(pipesplit) > 2 {
sr := parts1[2] sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " + errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 { if strings.Contains(sr, "@") && len(sr) > 1 {
@ -250,6 +271,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
if err != nil { if err != nil {
log.Printf(errmsg, err.Error(), line) log.Printf(errmsg, err.Error(), line)
} else { } else {
// sample rate successfully parsed
m.samplerate = samplerate m.samplerate = samplerate
} }
} else { } else {
@ -258,24 +280,24 @@ func (s *Statsd) parseStatsdLine(line string) error {
} }
// Validate metric type // Validate metric type
switch parts1[1] { switch pipesplit[1] {
case "g", "c", "s", "ms", "h": case "g", "c", "s", "ms", "h":
m.mtype = parts1[1] m.mtype = pipesplit[1]
default: default:
log.Printf("Error: Statsd Metric type %s unsupported", parts1[1]) log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
} }
// Validate splitting the rest of the line on ":" // Validate splitting the rest of the line on ":"
parts2 := strings.Split(parts1[0], ":") colonsplit := strings.Split(pipesplit[0], ":")
if len(parts2) != 2 { if len(colonsplit) != 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
} }
m.bucket = parts2[0] m.bucket = colonsplit[0]
// Parse the value // Parse the value
if strings.ContainsAny(parts2[1], "-+") { if strings.ContainsAny(colonsplit[1], "-+") {
if m.mtype != "g" { if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line) log.Printf("Error: +- values are only supported for gauges: %s\n", line)
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
@ -285,14 +307,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
switch m.mtype { switch m.mtype {
case "g", "ms", "h": case "g", "ms", "h":
v, err := strconv.ParseFloat(parts2[1], 64) v, err := strconv.ParseFloat(colonsplit[1], 64)
if err != nil { if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line) log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
} }
m.floatvalue = v m.floatvalue = v
case "c", "s": case "c", "s":
v, err := strconv.ParseInt(parts2[1], 10, 64) v, err := strconv.ParseInt(colonsplit[1], 10, 64)
if err != nil { if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line) log.Printf("Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line") return errors.New("Error Parsing statsd line")
@ -304,8 +326,20 @@ func (s *Statsd) parseStatsdLine(line string) error {
m.intvalue = v m.intvalue = v
} }
// Parse the name // Parse the name & tags from bucket
m.name, m.tags = s.parseName(m) m.name, m.tags = s.parseName(m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
case "g":
m.tags["metric_type"] = "gauge"
case "s":
m.tags["metric_type"] = "set"
case "ms":
m.tags["metric_type"] = "timing"
case "h":
m.tags["metric_type"] = "histogram"
}
// Make a unique key for the measurement name/tags // Make a unique key for the measurement name/tags
var tg []string var tg []string
@ -315,18 +349,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
sort.Strings(tg) sort.Strings(tg)
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)
switch m.mtype {
// Aggregate gauges, counters and sets as we go
case "g", "c", "s":
s.aggregate(m) s.aggregate(m)
// Timers get processed at flush time
default:
select {
case s.inmetrics <- m:
default:
log.Printf(dropwarn, line)
}
}
return nil return nil
} }
@ -334,42 +357,79 @@ func (s *Statsd) parseStatsdLine(line string) error {
// config file. If there is a match, it will parse the name of the metric and // config file. If there is a match, it will parse the name of the metric and
// map of tags. // map of tags.
// Return values are (<name>, <tags>) // Return values are (<name>, <tags>)
func (s *Statsd) parseName(m metric) (string, map[string]string) { func (s *Statsd) parseName(bucket string) (string, map[string]string) {
name := m.bucket
tags := make(map[string]string) tags := make(map[string]string)
bucketparts := strings.Split(bucket, ",")
// Parse out any tags in the bucket
if len(bucketparts) > 1 {
for _, btag := range bucketparts[1:] {
k, v := parseKeyValue(btag)
if k != "" {
tags[k] = v
}
}
}
o := graphite.Options{ o := graphite.Options{
Separator: "_", Separator: "_",
Templates: s.Templates, Templates: s.Templates,
DefaultTags: tags,
} }
name := bucketparts[0]
p, err := graphite.NewParserWithOptions(o) p, err := graphite.NewParserWithOptions(o)
if err == nil { if err == nil {
name, tags = p.ApplyTemplate(m.bucket) name, tags = p.ApplyTemplate(name)
} }
name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1) name = strings.Replace(name, "-", "__", -1)
switch m.mtype {
case "c":
tags["metric_type"] = "counter"
case "g":
tags["metric_type"] = "gauge"
case "s":
tags["metric_type"] = "set"
case "ms", "h":
tags["metric_type"] = "timer"
}
return name, tags return name, tags
} }
// aggregate takes in a metric of type "counter", "gauge", or "set". It then // Parse the key,value out of a string that looks like "key=value"
// aggregates and caches the current value. It does not deal with the func parseKeyValue(keyvalue string) (string, string) {
// DeleteCounters, DeleteGauges or DeleteSets options, because those are dealt var key, val string
// with in the Gather function.
split := strings.Split(keyvalue, "=")
// Must be exactly 2 to get anything meaningful out of them
if len(split) == 2 {
key = split[0]
val = split[1]
} else if len(split) == 1 {
val = split[0]
}
return key, val
}
// aggregate takes in a metric. It then
// aggregates and caches the current value(s). It does not deal with the
// Delete* options, because those are dealt with in the Gather function.
func (s *Statsd) aggregate(m metric) { func (s *Statsd) aggregate(m metric) {
switch m.mtype { switch m.mtype {
case "ms", "h":
cached, ok := s.timings[m.hash]
if !ok {
cached = cachedtimings{
name: m.name,
tags: m.tags,
stats: RunningStats{
PercLimit: s.PercentileLimit,
},
}
}
if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ {
cached.stats.AddValue(m.floatvalue)
}
s.timings[m.hash] = cached
} else {
cached.stats.AddValue(m.floatvalue)
s.timings[m.hash] = cached
}
case "c": case "c":
cached, ok := s.counters[m.hash] cached, ok := s.counters[m.hash]
if !ok { if !ok {
@ -380,7 +440,6 @@ func (s *Statsd) aggregate(m metric) {
} }
} else { } else {
cached.value += m.intvalue cached.value += m.intvalue
cached.tags = m.tags
s.counters[m.hash] = cached s.counters[m.hash] = cached
} }
case "g": case "g":
@ -397,7 +456,6 @@ func (s *Statsd) aggregate(m metric) {
} else { } else {
cached.value = m.floatvalue cached.value = m.floatvalue
} }
cached.tags = m.tags
s.gauges[m.hash] = cached s.gauges[m.hash] = cached
} }
case "s": case "s":
@ -422,7 +480,6 @@ func (s *Statsd) Stop() {
log.Println("Stopping the statsd service") log.Println("Stopping the statsd service")
close(s.done) close(s.done)
close(s.in) close(s.in)
close(s.inmetrics)
} }
func init() { func init() {

View File

@ -121,25 +121,208 @@ func TestParse_DefaultNameParsing(t *testing.T) {
} }
} }
// Test that name mappings match and work // Test that template name transformation works
func TestParse_NameMap(t *testing.T) { func TestParse_Template(t *testing.T) {
s := NewStatsd()
s.Templates = []string{
"measurement.measurement.host.service",
}
lines := []string{
"cpu.idle.localhost:1|c",
"cpu.busy.host01.myservice:11|c",
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
validations := []struct {
name string
value int64
}{
{
"cpu_idle",
1,
},
{
"cpu_busy",
11,
},
}
// Validate counters
for _, test := range validations {
err := test_validate_counter(test.name, test.value, s.counters)
if err != nil {
t.Error(err.Error())
}
}
}
// Test that template filters properly
func TestParse_TemplateFilter(t *testing.T) {
s := NewStatsd()
s.Templates = []string{
"cpu.idle.* measurement.measurement.host",
}
lines := []string{
"cpu.idle.localhost:1|c",
"cpu.busy.host01.myservice:11|c",
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
validations := []struct {
name string
value int64
}{
{
"cpu_idle",
1,
},
{
"cpu_busy_host01_myservice",
11,
},
}
// Validate counters
for _, test := range validations {
err := test_validate_counter(test.name, test.value, s.counters)
if err != nil {
t.Error(err.Error())
}
}
}
// Test that most specific template is chosen
func TestParse_TemplateSpecificity(t *testing.T) {
s := NewStatsd()
s.Templates = []string{
"cpu.* measurement.foo.host",
"cpu.idle.* measurement.measurement.host",
}
lines := []string{
"cpu.idle.localhost:1|c",
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
validations := []struct {
name string
value int64
}{
{
"cpu_idle",
1,
},
}
// Validate counters
for _, test := range validations {
err := test_validate_counter(test.name, test.value, s.counters)
if err != nil {
t.Error(err.Error())
}
}
}
// Test that fields are parsed correctly
func TestParse_Fields(t *testing.T) {
if false { if false {
t.Errorf("TODO") t.Errorf("TODO")
} }
} }
// Test that name map tags are applied properly // Test that tags within the bucket are parsed correctly
func TestParse_NameMapTags(t *testing.T) { func TestParse_Tags(t *testing.T) {
if false { s := NewStatsd()
t.Errorf("TODO")
tests := []struct {
bucket string
name string
tags map[string]string
}{
{
"cpu.idle,host=localhost",
"cpu_idle",
map[string]string{
"host": "localhost",
},
},
{
"cpu.idle,host=localhost,region=west",
"cpu_idle",
map[string]string{
"host": "localhost",
"region": "west",
},
},
{
"cpu.idle,host=localhost,color=red,region=west",
"cpu_idle",
map[string]string{
"host": "localhost",
"region": "west",
"color": "red",
},
},
}
for _, test := range tests {
name, tags := s.parseName(test.bucket)
if name != test.name {
t.Errorf("Expected: %s, got %s", test.name, name)
}
for k, v := range test.tags {
actual, ok := tags[k]
if !ok {
t.Errorf("Expected key: %s not found", k)
}
if actual != v {
t.Errorf("Expected %s, got %s", v, actual)
}
}
} }
} }
// Test that measurements with the same name, but different tags, are treated // Test that measurements with the same name, but different tags, are treated
// as different values in the statsd cache // as different outputs
func TestParse_MeasurementsWithSameName(t *testing.T) { func TestParse_MeasurementsWithSameName(t *testing.T) {
if false { s := NewStatsd()
t.Errorf("TODO")
// Test that counters work
valid_lines := []string{
"test.counter,host=localhost:1|c",
"test.counter,host=localhost,region=west:1|c",
}
for _, line := range valid_lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
if len(s.counters) != 2 {
t.Errorf("Expected 2 separate measurements, found %d", len(s.counters))
} }
} }
@ -150,9 +333,8 @@ func TestParse_ValidLines(t *testing.T) {
"valid:45|c", "valid:45|c",
"valid:45|s", "valid:45|s",
"valid:45|g", "valid:45|g",
// TODO(cam): timings "valid.timer:45|ms",
//"valid.timer:45|ms", "valid.timer:45|h",
//"valid.timer:45|h",
} }
for _, line := range valid_lines { for _, line := range valid_lines {
@ -163,13 +345,6 @@ func TestParse_ValidLines(t *testing.T) {
} }
} }
// Test that floats are handled as expected for all metric types
func TestParse_Floats(t *testing.T) {
if false {
t.Errorf("TODO")
}
}
// Tests low-level functionality of gauges // Tests low-level functionality of gauges
func TestParse_Gauges(t *testing.T) { func TestParse_Gauges(t *testing.T) {
s := NewStatsd() s := NewStatsd()
@ -340,8 +515,86 @@ func TestParse_Counters(t *testing.T) {
// Tests low-level functionality of timings // Tests low-level functionality of timings
func TestParse_Timings(t *testing.T) { func TestParse_Timings(t *testing.T) {
if false { s := NewStatsd()
t.Errorf("TODO") s.Percentiles = []int{90}
testacc := &testutil.Accumulator{}
// Test that counters work
valid_lines := []string{
"test.timing:1|ms",
"test.timing:1|ms",
"test.timing:1|ms",
"test.timing:1|ms",
"test.timing:1|ms",
}
for _, line := range valid_lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
s.Gather(testacc)
tests := []struct {
name string
value interface{}
}{
{
"test_timing_mean",
float64(1),
},
{
"test_timing_stddev",
float64(0),
},
{
"test_timing_upper",
float64(1),
},
{
"test_timing_lower",
float64(1),
},
{
"test_timing_count",
int64(5),
},
{
"test_timing_percentile_90",
float64(1),
},
}
for _, test := range tests {
if !testacc.CheckValue(test.name, test.value) {
t.Errorf("Did not find measurement %s with value %v",
test.name, test.value)
}
}
}
func TestParse_Timings_Delete(t *testing.T) {
s := NewStatsd()
s.DeleteTimings = true
fakeacc := &testutil.Accumulator{}
var err error
line := "timing:100|ms"
err = s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
if len(s.timings) != 1 {
t.Errorf("Should be 1 timing, found %d", len(s.timings))
}
s.Gather(fakeacc)
if len(s.timings) != 0 {
t.Errorf("All timings should have been deleted, found %d", len(s.timings))
} }
} }
@ -423,10 +676,21 @@ func TestParse_Counters_Delete(t *testing.T) {
} }
} }
// Integration test the listener starting up and receiving UDP packets func TestParseKeyValue(t *testing.T) {
func TestListen(t *testing.T) { k, v := parseKeyValue("foo=bar")
if false { if k != "foo" {
t.Errorf("TODO") t.Errorf("Expected %s, got %s", "foo", k)
}
if v != "bar" {
t.Errorf("Expected %s, got %s", "bar", v)
}
k2, v2 := parseKeyValue("baz")
if k2 != "" {
t.Errorf("Expected %s, got %s", "", k2)
}
if v2 != "baz" {
t.Errorf("Expected %s, got %s", "baz", v2)
} }
} }