Merge branch 'master' of https://github.com/influxdb/telegraf into lustre2-plugin

Conflicts:
	testutil/accumulator.go
This commit is contained in:
Simon Fraser 2015-08-04 21:39:17 +01:00
commit a4f7ffea3f
20 changed files with 224 additions and 36 deletions

11
Makefile Normal file
View File

@ -0,0 +1,11 @@
prepare:
go get -d -v -t ./...
docker-compose up -d --no-recreate
test: prepare
go test -short ./...
update:
go get -u -v -d -t ./...
.PHONY: test

View File

@ -179,3 +179,16 @@ func init() {
}
```
## Testing
As Telegraf collects metrics from several third-party services it becomes a difficult task to mock each service as
some of them have complicated protocols which would take some time to replicate.
To overcome this situation we've decided to use docker containers to provide a fast and reproducible environment
to test those services which require it. For other situations (i.e: https://github.com/influxdb/telegraf/blob/master/plugins/redis/redis_test.go ) a simple mock will suffice.
To execute Telegraf tests follow this simple steps:
- Install docker compose following [these](https://docs.docker.com/compose/install/) instructions
- execute `make test`

View File

@ -10,6 +10,8 @@ import (
"github.com/influxdb/influxdb/client"
)
// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
mu sync.Mutex
@ -22,6 +24,7 @@ type BatchPoints struct {
Config *ConfiguredPlugin
}
// Add adds a measurement
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
bp.mu.Lock()
defer bp.mu.Unlock()
@ -55,6 +58,7 @@ func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]
})
}
// AddValuesWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddValuesWithTime(
measurement string,
values map[string]interface{},

View File

@ -19,8 +19,13 @@ type runningPlugin struct {
config *ConfiguredPlugin
}
// Agent runs telegraf and collects data based on the given config
type Agent struct {
// Interval at which to gather information
Interval Duration
// Run in debug mode?
Debug bool
Hostname string
@ -31,6 +36,7 @@ type Agent struct {
conn *client.Client
}
// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{Config: config, Interval: Duration{10 * time.Second}}
@ -57,8 +63,9 @@ func NewAgent(config *Config) (*Agent, error) {
return agent, nil
}
func (agent *Agent) Connect() error {
config := agent.Config
// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
config := a.Config
u, err := url.Parse(config.URL)
if err != nil {
@ -77,11 +84,12 @@ func (agent *Agent) Connect() error {
return err
}
agent.conn = c
a.conn = c
return nil
}
// LoadPlugins loads the agent's plugins
func (a *Agent) LoadPlugins() ([]string, error) {
var names []string
@ -201,10 +209,12 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}
// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
var names []string
for name, _ := range plugins.Plugins {
for name := range plugins.Plugins {
names = append(names, name)
}
@ -230,6 +240,8 @@ func (a *Agent) TestAllPlugins() error {
return nil
}
// Test verifies that we can 'Gather' from all plugins with their configured
// Config struct
func (a *Agent) Test() error {
var acc BatchPoints
@ -253,6 +265,7 @@ func (a *Agent) Test() error {
return nil
}
// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()

17
circle.yml Normal file
View File

@ -0,0 +1,17 @@
dependencies:
post:
# install golint
- go get github.com/golang/lint/golint
# install binaries
- go install ./...
test:
pre:
# Vet go code for any potential errors
- go vet ./...
override:
# Enforce that testutil, cmd, and main directory are fully linted
- golint .
- golint testutil/...
- golint cmd/...
# TODO run unit tests

View File

@ -19,7 +19,9 @@ var fVersion = flag.Bool("version", false, "display the version")
var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration")
var fPidfile = flag.String("pidfile", "", "file to write our pid to")
// Telegraf version
var Version = "unreleased"
// Telegraf commit
var Commit = ""
func main() {

View File

@ -13,10 +13,12 @@ import (
"github.com/naoina/toml/ast"
)
// Duration just wraps time.Duration
type Duration struct {
time.Duration
}
// UnmarshalTOML parses the duration from the TOML config file
func (d *Duration) UnmarshalTOML(b []byte) error {
dur, err := time.ParseDuration(string(b[1 : len(b)-1]))
if err != nil {
@ -28,6 +30,9 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}
// Config specifies the URL/user/password for the database that telegraf
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
URL string
Username string
@ -41,10 +46,12 @@ type Config struct {
plugins map[string]*ast.Table
}
// Plugins returns the configured plugins as a map of name -> plugin toml
func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}
// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists
type ConfiguredPlugin struct {
Name string
@ -54,6 +61,7 @@ type ConfiguredPlugin struct {
Interval time.Duration
}
// ShouldPass returns true if the metric should pass, false if should drop
func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
@ -78,6 +86,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
return true
}
// ApplyAgent loads the toml config into the given interface
func (c *Config) ApplyAgent(v interface{}) error {
if c.agent != nil {
return toml.UnmarshalTable(c.agent, v)
@ -86,6 +95,9 @@ func (c *Config) ApplyAgent(v interface{}) error {
return nil
}
// ApplyPlugin takes defined plugin names and applies them to the given
// interface, returning a ConfiguredPlugin object in the end that can
// be inserted into a runningPlugin by the agent.
func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) {
cp := &ConfiguredPlugin{Name: name}
@ -137,10 +149,11 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err
return cp, nil
}
// PluginsDeclared returns the name of all plugins declared in the config.
func (c *Config) PluginsDeclared() []string {
var plugins []string
for name, _ := range c.plugins {
for name := range c.plugins {
plugins = append(plugins, name)
}
@ -149,12 +162,14 @@ func (c *Config) PluginsDeclared() []string {
return plugins
}
// DefaultConfig returns an empty default configuration
func DefaultConfig() *Config {
return &Config{}
}
var ErrInvalidConfig = errors.New("invalid configuration")
var errInvalidConfig = errors.New("invalid configuration")
// LoadConfig loads the given config file and returns a *Config pointer
func LoadConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
@ -173,7 +188,7 @@ func LoadConfig(path string) (*Config, error) {
for name, val := range tbl.Fields {
subtbl, ok := val.(*ast.Table)
if !ok {
return nil, ErrInvalidConfig
return nil, errInvalidConfig
}
switch name {
@ -192,6 +207,8 @@ func LoadConfig(path string) (*Config, error) {
return c, nil
}
// ListTags returns a string of tags specified in the config,
// line-protocol style
func (c *Config) ListTags() string {
var tags []string
@ -271,12 +288,13 @@ database = "telegraf" # required.
`
// PrintSampleConfig prints the sample config!
func PrintSampleConfig() {
fmt.Printf(header)
var names []string
for name, _ := range plugins.Plugins {
for name := range plugins.Plugins {
names = append(names, name)
}

16
docker-compose.yml Normal file
View File

@ -0,0 +1,16 @@
mysql:
image: mysql
ports:
- "3306:3306"
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
memcached:
image: memcached
ports:
- "11211:11211"
postgres:
image: postgres
ports:
- "5432:5432"

View File

@ -35,8 +35,10 @@ AWS_FILE=~/aws.conf
INSTALL_ROOT_DIR=/opt/telegraf
TELEGRAF_LOG_DIR=/var/log/telegraf
CONFIG_ROOT_DIR=/etc/opt/telegraf
LOGROTATE_DIR=/etc/logrotate.d
SAMPLE_CONFIGURATION=etc/config.sample.toml
LOGROTATE_CONFIGURATION=etc/logrotate.d/telegraf
INITD_SCRIPT=scripts/init.sh
TMP_WORK_DIR=`mktemp -d`
@ -144,6 +146,11 @@ make_dir_tree() {
echo "Failed to create configuration directory -- aborting."
cleanup_exit 1
fi
mkdir -p $work_dir/$LOGROTATE_DIR
if [ $? -ne 0 ]; then
echo "Failed to create configuration directory -- aborting."
cleanup_exit 1
fi
}
@ -251,6 +258,12 @@ if [ $? -ne 0 ]; then
cleanup_exit 1
fi
cp $LOGROTATE_CONFIGURATION $TMP_WORK_DIR/$LOGROTATE_DIR/telegraf.conf
if [ $? -ne 0 ]; then
echo "Failed to copy $LOGROTATE_CONFIGURATION to packaging directory -- aborting."
cleanup_exit 1
fi
generate_postinstall_script $VERSION
###########################################################################

View File

@ -14,6 +14,9 @@ import (
)
func TestReadsMetricsFromKafka(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
var zkPeers, brokerPeers []string
if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {

View File

@ -10,7 +10,7 @@ import (
func TestMemcachedGeneratesMetrics(t *testing.T) {
m := &Memcached{
Servers: []string{"localhost"},
Servers: []string{testutil.GetLocalHost()},
}
var acc testutil.Accumulator

View File

@ -457,7 +457,7 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
oldStat.ExtraInfo.PageFaults != nil && newStat.ExtraInfo.PageFaults != nil {
returnVal.Faults = diff(*(newStat.ExtraInfo.PageFaults), *(oldStat.ExtraInfo.PageFaults), sampleSecs)
}
if !returnVal.IsMongos && oldStat.Locks != nil && oldStat.Locks != nil {
if !returnVal.IsMongos && oldStat.Locks != nil {
globalCheck, hasGlobal := oldStat.Locks["Global"]
if hasGlobal && globalCheck.AcquireCount != nil {
// This appears to be a 3.0+ server so the data in these fields do *not* refer to

View File

@ -1,6 +1,7 @@
package mysql
import (
"fmt"
"strings"
"testing"
@ -11,7 +12,7 @@ import (
func TestMysqlGeneratesMetrics(t *testing.T) {
m := &Mysql{
Servers: []string{""},
Servers: []string{fmt.Sprintf("root@tcp(%s:3306)/", testutil.GetLocalHost())},
}
var acc testutil.Accumulator
@ -53,7 +54,9 @@ func TestMysqlGeneratesMetrics(t *testing.T) {
}
func TestMysqlDefaultsToLocal(t *testing.T) {
m := &Mysql{}
m := &Mysql{
Servers: []string{fmt.Sprintf("root@tcp(%s:3306)/", testutil.GetLocalHost())},
}
var acc testutil.Accumulator

View File

@ -1,6 +1,7 @@
package postgresql
import (
"fmt"
"testing"
"github.com/influxdb/telegraf/testutil"
@ -12,7 +13,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
p := &Postgresql{
Servers: []*Server{
{
Address: "sslmode=disable",
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", testutil.GetLocalHost()),
Databases: []string{"postgres"},
},
},
@ -57,7 +58,7 @@ func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) {
p := &Postgresql{
Servers: []*Server{
{
Address: "sslmode=disable",
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", testutil.GetLocalHost()),
Databases: []string{"postgres"},
},
},
@ -78,7 +79,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) {
p := &Postgresql{
Servers: []*Server{
{
Address: "sslmode=disable",
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", testutil.GetLocalHost()),
},
},
}

View File

@ -83,7 +83,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
}
for _, c := range checkInt {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
assert.True(t, acc.CheckValue(c.name, c.value))
}
checkFloat := []struct {
@ -98,7 +98,7 @@ func TestRedisGeneratesMetrics(t *testing.T) {
}
for _, c := range checkFloat {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
assert.True(t, acc.CheckValue(c.name, c.value))
}
}
@ -174,7 +174,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
}
for _, c := range checkInt {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
assert.True(t, acc.CheckValue(c.name, c.value))
}
checkFloat := []struct {
@ -189,7 +189,7 @@ func TestRedisCanPullStatsFromMultipleServers(t *testing.T) {
}
for _, c := range checkFloat {
assert.NoError(t, acc.ValidateValue(c.name, c.value))
assert.True(t, acc.CheckValue(c.name, c.value))
}
}

View File

@ -118,7 +118,7 @@ func (s *Server) addClusterStats(acc plugins.Accumulator) error {
defer cursor.Close()
var clusterStats stats
if err := cursor.One(&clusterStats); err != nil {
return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error())
return fmt.Errorf("failure to parse cluster stats, %s\n", err.Error())
}
tags := s.getDefaultTags()
@ -146,7 +146,7 @@ func (s *Server) addMemberStats(acc plugins.Accumulator) error {
defer cursor.Close()
var memberStats stats
if err := cursor.One(&memberStats); err != nil {
return fmt.Errorf("failure to parse member stats, $s\n", err.Error())
return fmt.Errorf("failure to parse member stats, %s\n", err.Error())
}
tags := s.getDefaultTags()

View File

@ -45,7 +45,7 @@ func TestDisk_io_counters(t *testing.T) {
t.Errorf("error %v", err)
}
if len(ret) == 0 {
t.Errorf("ret is empty", ret)
t.Errorf("ret is empty: %s", ret)
}
empty := DiskIOCountersStat{}
for part, io := range ret {

View File

@ -6,6 +6,7 @@ import (
"time"
)
// Point defines a single point measurement
type Point struct {
Measurement string
Tags map[string]string
@ -13,10 +14,12 @@ type Point struct {
Time time.Time
}
// Accumulator defines a mocked out accumulator
type Accumulator struct {
Points []*Point
}
// Add adds a measurement point to the accumulator
func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) {
if tags == nil {
tags = map[string]string{}
@ -31,6 +34,7 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
)
}
// AddValuesWithTime adds a measurement point with a specified timestamp.
func (a *Accumulator) AddValuesWithTime(
measurement string,
values map[string]interface{},
@ -48,6 +52,7 @@ func (a *Accumulator) AddValuesWithTime(
)
}
// Get gets the specified measurement point from the accumulator
func (a *Accumulator) Get(measurement string) (*Point, bool) {
for _, p := range a.Points {
if p.Measurement == measurement {
@ -58,6 +63,8 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) {
return nil, false
}
// CheckValue checks that the accumulators point for the given measurement
// is the same as the given value.
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
@ -68,11 +75,22 @@ func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
return false
}
func (a *Accumulator) CheckTaggedValue(measurement string, val interface{}, tags map[string]string) bool {
// CheckTaggedValue calls ValidateTaggedValue
func (a *Accumulator) CheckTaggedValue(
measurement string,
val interface{},
tags map[string]string,
) bool {
return a.ValidateTaggedValue(measurement, val, tags) == nil
}
func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, tags map[string]string) error {
// ValidateTaggedValue validates that the given measurement and value exist
// in the accumulator and with the given tags.
func (a *Accumulator) ValidateTaggedValue(
measurement string,
val interface{},
tags map[string]string,
) error {
if tags == nil {
tags = map[string]string{}
}
@ -83,7 +101,8 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t
if p.Measurement == measurement {
if p.Values["value"] != val {
return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val)
return fmt.Errorf("%v (%T) != %v (%T)",
p.Values["value"], p.Values["value"], val, val)
}
return nil
}
@ -92,10 +111,12 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t
return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags)
}
// ValidateValue calls ValidateTaggedValue
func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
return a.ValidateTaggedValue(measurement, val, nil)
}
// HasIntValue returns true if the measurement has an Int value
func (a *Accumulator) HasIntValue(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
@ -107,17 +128,7 @@ func (a *Accumulator) HasIntValue(measurement string) bool {
return false
}
func (a *Accumulator) HasUIntValue(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
_, ok := p.Values["value"].(uint64)
return ok
}
}
return false
}
// HasFloatValue returns true if the given measurement has a float value
func (a *Accumulator) HasFloatValue(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {

29
testutil/testutil.go Normal file
View File

@ -0,0 +1,29 @@
package testutil
import (
"net"
"net/url"
"os"
)
var localhost = "localhost"
// GetLocalHost returns the DOCKER_HOST environment variable, parsing
// out any scheme or ports so that only the IP address is returned.
func GetLocalHost() string {
if dockerHostVar := os.Getenv("DOCKER_HOST"); dockerHostVar != "" {
u, err := url.Parse(dockerHostVar)
if err != nil {
return dockerHostVar
}
// split out the ip addr from the port
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
return dockerHostVar
}
return host
}
return localhost
}

34
testutil/testutil_test.go Normal file
View File

@ -0,0 +1,34 @@
package testutil
import (
"os"
"testing"
)
func TestDockerHost(t *testing.T) {
os.Unsetenv("DOCKER_HOST")
host := GetLocalHost()
if host != localhost {
t.Fatalf("Host should be localhost when DOCKER_HOST is not set. Current value [%s]", host)
}
os.Setenv("DOCKER_HOST", "1.1.1.1")
host = GetLocalHost()
if host != "1.1.1.1" {
t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST"))
}
os.Setenv("DOCKER_HOST", "tcp://1.1.1.1:8080")
host = GetLocalHost()
if host != "1.1.1.1" {
t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST"))
}
}