From c0fa6af51b97cd7c3e79535175077c2b86c3ec83 Mon Sep 17 00:00:00 2001 From: Graham Floyd Date: Fri, 31 Jul 2015 14:46:46 -0500 Subject: [PATCH 01/14] Add disque plugin --- plugins/all/all.go | 1 + plugins/disque/disque.go | 202 ++++++++++++++++++++++++++++ plugins/disque/disque_test.go | 242 ++++++++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 plugins/disque/disque.go create mode 100644 plugins/disque/disque_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 595fdcaf4..dcedf2bf8 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/memcached" diff --git a/plugins/disque/disque.go b/plugins/disque/disque.go new file mode 100644 index 000000000..292e1b363 --- /dev/null +++ b/plugins/disque/disque.go @@ -0,0 +1,202 @@ +package disque + +import ( + "bufio" + "errors" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "github.com/influxdb/telegraf/plugins" +) + +type Disque struct { + Servers []string + + c net.Conn + buf []byte +} + +var sampleConfig = ` +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832, +# 10.0.0.1:10000, etc. +# +# If no servers are specified, then localhost is used as the host. +servers = ["localhost"]` + +func (r *Disque) SampleConfig() string { + return sampleConfig +} + +func (r *Disque) Description() string { + return "Read metrics from one or many disque servers" +} + +var Tracking = map[string]string{ + "uptime_in_seconds": "uptime", + "connected_clients": "clients", + "blocked_clients": "blocked_clients", + "used_memory": "used_memory", + "used_memory_rss": "used_memory_rss", + "used_memory_peak": "used_memory_peak", + "total_connections_received": "total_connections_received", + "total_commands_processed": "total_commands_processed", + "instantaneous_ops_per_sec": "instantaneous_ops_per_sec", + "latest_fork_usec": "latest_fork_usec", + "mem_fragmentation_ratio": "mem_fragmentation_ratio", + "used_cpu_sys": "used_cpu_sys", + "used_cpu_user": "used_cpu_user", + "used_cpu_sys_children": "used_cpu_sys_children", + "used_cpu_user_children": "used_cpu_user_children", + "registered_jobs": "registered_jobs", + "registered_queues": "registered_queues", +} + +var ErrProtocolError = errors.New("disque protocol error") + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (g *Disque) Gather(acc plugins.Accumulator) error { + if len(g.Servers) == 0 { + url := &url.URL{ + Host: ":7711", + } + g.gatherServer(url, acc) + return nil + } + + var wg sync.WaitGroup + + var outerr error + + for _, serv := range g.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Scheme = "tcp" + u.Host = serv + u.Path = "" + } + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = g.gatherServer(u, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +const defaultPort = "7711" + +func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error { + if g.c == nil { + + _, _, err := net.SplitHostPort(addr.Host) + if err != nil { + addr.Host = addr.Host + ":" + defaultPort + } + + c, err := net.Dial("tcp", addr.Host) + if err != nil { + return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err) + } + + if addr.User != nil { + pwd, set := addr.User.Password() + if set && pwd != "" { + c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd))) + + r := bufio.NewReader(c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + if line[0] != '+' { + return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) + } + } + } + + g.c = c + } + + g.c.Write([]byte("info\r\n")) + + r := bufio.NewReader(g.c) + + line, err := r.ReadString('\n') + if err != nil { + return err + } + + if line[0] != '$' { + return fmt.Errorf("bad line start: %s", ErrProtocolError) + } + + line = strings.TrimSpace(line) + + szStr := line[1:] + + sz, err := strconv.Atoi(szStr) + if err != nil { + return fmt.Errorf("bad size string <<%s>>: %s", szStr, ErrProtocolError) + } + + var read int + + for read < sz { + line, err := r.ReadString('\n') + if err != nil { + return err + } + + read += len(line) + + if len(line) == 1 || line[0] == '#' { + continue + } + + parts := strings.SplitN(line, ":", 2) + + name := string(parts[0]) + + metric, ok := Tracking[name] + if !ok { + continue + } + + tags := map[string]string{"host": addr.String()} + val := strings.TrimSpace(parts[1]) + + ival, err := strconv.ParseUint(val, 10, 64) + if err == nil { + acc.Add(metric, ival, tags) + continue + } + + fval, err := strconv.ParseFloat(val, 64) + if err != nil { + return err + } + + acc.Add(metric, fval, tags) + } + + return nil +} + +func init() { + plugins.Add("disque", func() plugins.Plugin { + return &Disque{} + }) +} diff --git a/plugins/disque/disque_test.go b/plugins/disque/disque_test.go new file mode 100644 index 000000000..68228e538 --- /dev/null +++ b/plugins/disque/disque_test.go @@ -0,0 +1,242 @@ +package disque + +import ( + "bufio" + "fmt" + "net" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDisqueGeneratesMetrics(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\r\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := fmt.Sprintf("disque://%s", l.Addr().String()) + + r := &Disque{ + Servers: []string{addr}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"uptime", 1452705}, + {"clients", 31}, + {"blocked_clients", 13}, + {"used_memory", 1840104}, + {"used_memory_rss", 3227648}, + {"used_memory_peak", 89603656}, + {"total_connections_received", 5062777}, + {"total_commands_processed", 12308396}, + {"instantaneous_ops_per_sec", 18}, + {"latest_fork_usec", 1644}, + {"registered_jobs", 360}, + {"registered_queues", 12}, + } + + for _, c := range checkInt { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"mem_fragmentation_ratio", 1.75}, + {"used_cpu_sys", 19585.73}, + {"used_cpu_user", 11255.96}, + {"used_cpu_sys_children", 1.75}, + {"used_cpu_user_children", 1.91}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } +} + +func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\r\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := fmt.Sprintf("disque://%s", l.Addr().String()) + + r := &Disque{ + Servers: []string{addr}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"uptime", 1452705}, + {"clients", 31}, + {"blocked_clients", 13}, + {"used_memory", 1840104}, + {"used_memory_rss", 3227648}, + {"used_memory_peak", 89603656}, + {"total_connections_received", 5062777}, + {"total_commands_processed", 12308396}, + {"instantaneous_ops_per_sec", 18}, + {"latest_fork_usec", 1644}, + {"registered_jobs", 360}, + {"registered_queues", 12}, + } + + for _, c := range checkInt { + assert.True(t, acc.CheckValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"mem_fragmentation_ratio", 1.75}, + {"used_cpu_sys", 19585.73}, + {"used_cpu_user", 11255.96}, + {"used_cpu_sys_children", 1.75}, + {"used_cpu_user_children", 1.91}, + } + + for _, c := range checkFloat { + assert.True(t, acc.CheckValue(c.name, c.value)) + } +} + +const testOutput = `# Server +disque_version:0.0.1 +disque_git_sha1:b5247598 +disque_git_dirty:0 +disque_build_id:379fda78983a60c6 +os:Linux 3.13.0-44-generic x86_64 +arch_bits:64 +multiplexing_api:epoll +gcc_version:4.8.2 +process_id:32420 +run_id:1cfdfa4c6bc3f285182db5427522a8a4c16e42e4 +tcp_port:7711 +uptime_in_seconds:1452705 +uptime_in_days:16 +hz:10 +config_file:/usr/local/etc/disque/disque.conf + +# Clients +connected_clients:31 +client_longest_output_list:0 +client_biggest_input_buf:0 +blocked_clients:13 + +# Memory +used_memory:1840104 +used_memory_human:1.75M +used_memory_rss:3227648 +used_memory_peak:89603656 +used_memory_peak_human:85.45M +mem_fragmentation_ratio:1.75 +mem_allocator:jemalloc-3.6.0 + +# Jobs +registered_jobs:360 + +# Queues +registered_queues:12 + +# Persistence +loading:0 +aof_enabled:1 +aof_state:on +aof_rewrite_in_progress:0 +aof_rewrite_scheduled:0 +aof_last_rewrite_time_sec:0 +aof_current_rewrite_time_sec:-1 +aof_last_bgrewrite_status:ok +aof_last_write_status:ok +aof_current_size:41952430 +aof_base_size:9808 +aof_pending_rewrite:0 +aof_buffer_length:0 +aof_rewrite_buffer_length:0 +aof_pending_bio_fsync:0 +aof_delayed_fsync:1 + +# Stats +total_connections_received:5062777 +total_commands_processed:12308396 +instantaneous_ops_per_sec:18 +total_net_input_bytes:1346996528 +total_net_output_bytes:1967551763 +instantaneous_input_kbps:1.38 +instantaneous_output_kbps:1.78 +rejected_connections:0 +latest_fork_usec:1644 + +# CPU +used_cpu_sys:19585.73 +used_cpu_user:11255.96 +used_cpu_sys_children:1.75 +used_cpu_user_children:1.91 +` From 9ea5a88f84109da953a8a7685e90eda00300cc74 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 3 Aug 2015 17:16:02 -0600 Subject: [PATCH 02/14] Fix GetLocalHost testutil function for mac users (boot2docker) --- testutil/testutil.go | 19 +++++++++++++++++-- testutil/testutil_test.go | 8 ++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/testutil/testutil.go b/testutil/testutil.go index 71c3ce0c2..8735e882d 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -1,12 +1,27 @@ package testutil -import "os" +import ( + "net" + "net/url" + "os" +) var localhost = "localhost" func GetLocalHost() string { if dockerHostVar := os.Getenv("DOCKER_HOST"); dockerHostVar != "" { - return dockerHostVar + u, err := url.Parse(dockerHostVar) + if err != nil { + return dockerHostVar + } + + // split out the ip addr from the port + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + return dockerHostVar + } + + return host } return localhost } diff --git a/testutil/testutil_test.go b/testutil/testutil_test.go index 39f548c0f..52a807514 100644 --- a/testutil/testutil_test.go +++ b/testutil/testutil_test.go @@ -23,4 +23,12 @@ func TestDockerHost(t *testing.T) { t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST")) } + os.Setenv("DOCKER_HOST", "tcp://1.1.1.1:8080") + + host = GetLocalHost() + + if host != "1.1.1.1" { + t.Fatalf("Host should take DOCKER_HOST value when set. Current value is [%s] and DOCKER_HOST is [%s]", host, os.Getenv("DOCKER_HOST")) + } + } From 6b510652edf84dabf5c262087a167b184b6b0293 Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 13:47:50 +0100 Subject: [PATCH 03/14] Add Lustre 2 plugin --- plugins/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/all/all.go b/plugins/all/all.go index dcedf2bf8..d6ebd177a 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" + _ "github.com/influxdb/telegraf/plugins/lustre2" _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mongodb" _ "github.com/influxdb/telegraf/plugins/mysql" From e442d754d0671c33f280491e668d876180b9139d Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 13:48:09 +0100 Subject: [PATCH 04/14] Lustre filesystem plugin (http://lustre.org/) The configuration allows users to override the /proc/ files scanned for data, since that has been known to change with lustre versions. --- plugins/lustre2/lustre2.go | 235 +++++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 plugins/lustre2/lustre2.go diff --git a/plugins/lustre2/lustre2.go b/plugins/lustre2/lustre2.go new file mode 100644 index 000000000..348a4c739 --- /dev/null +++ b/plugins/lustre2/lustre2.go @@ -0,0 +1,235 @@ +// +build linux +/* +Lustre 2.x telegraf plugin + +Lustre (http://lustre.org/) is an open-source, parallel file system +for HPC environments. It stores statistics about its activity in +/proc + +*/ +package lustre2 + +import ( + "path/filepath" + "strconv" + "strings" + + "github.com/influxdb/telegraf/plugins" + common "github.com/influxdb/telegraf/plugins/system/ps/common" +) + +// Lustre proc files can change between versions, so we want to future-proof +// by letting people choose what to look at. +type Lustre2 struct { + Ost_procfiles []string + Mds_procfiles []string +} + +var sampleConfig = ` +# An array of /proc globs to search for Lustre stats +# If not specified, the default will work on Lustre 2.5.x +# +# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"] +# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]` + +/* The wanted fields would be a []string if not for the +lines that start with read_bytes/write_bytes and contain + both the byte count and the function call count +*/ +type mapping struct { + inProc string // What to look for at the start of a line in /proc/fs/lustre/* + field uint32 // which field to extract from that line + reportAs string // What measurement name to use + tag string // Additional tag to add for this metric +} + +var wanted_ost_fields = []*mapping{ + { + inProc: "write_bytes", + field: 6, + reportAs: "write_bytes", + }, + { // line starts with 'write_bytes', but value write_calls is in second column + inProc: "write_bytes", + field: 1, + reportAs: "write_calls", + }, + { + inProc: "read_bytes", + field: 6, + reportAs: "read_bytes", + }, + { // line starts with 'read_bytes', but value read_calls is in second column + inProc: "read_bytes", + field: 1, + reportAs: "read_calls", + }, + { + inProc: "cache_hit", + }, + { + inProc: "cache_miss", + }, + { + inProc: "cache_access", + }, +} + +var wanted_mds_fields = []*mapping{ + { + inProc: "open", + }, + { + inProc: "close", + }, + { + inProc: "mknod", + }, + { + inProc: "link", + }, + { + inProc: "unlink", + }, + { + inProc: "mkdir", + }, + { + inProc: "rmdir", + }, + { + inProc: "rename", + }, + { + inProc: "getattr", + }, + { + inProc: "setattr", + }, + { + inProc: "getxattr", + }, + { + inProc: "setxattr", + }, + { + inProc: "statfs", + }, + { + inProc: "sync", + }, + { + inProc: "samedir_rename", + }, + { + inProc: "crossdir_rename", + }, +} + +func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc plugins.Accumulator) error { + files, err := filepath.Glob(fileglob) + if err != nil { + return err + } + + for _, file := range files { + /* Turn /proc/fs/lustre/obdfilter//stats and similar + * into just the object store target name + * Assumpion: the target name is always second to last, + * which is true in Lustre 2.1->2.5 + */ + path := strings.Split(file, "/") + name := path[len(path)-2] + tags := map[string]string{ + "name": name, + } + + lines, err := common.ReadLines(file) + if err != nil { + return err + } + + for _, line := range lines { + fields := strings.Fields(line) + + for _, wanted := range wanted_fields { + var data uint64 + if fields[0] == wanted.inProc { + wanted_field := wanted.field + // if not set, assume field[1]. Shouldn't be field[0], as + // that's a string + if wanted_field == 0 { + wanted_field = 1 + } + data, err = strconv.ParseUint((fields[wanted_field]), 10, 64) + if err != nil { + return err + } + report_name := wanted.inProc + if wanted.reportAs != "" { + report_name = wanted.reportAs + } + acc.Add(report_name, data, tags) + + } + } + } + } + return nil +} + +// SampleConfig returns sample configuration message +func (l *Lustre2) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Lustre2 plugin +func (l *Lustre2) Description() string { + return "Read metrics from local Lustre service on OST, MDS" +} + +// Gather reads stats from all lustre targets +func (l *Lustre2) Gather(acc plugins.Accumulator) error { + + if len(l.Ost_procfiles) == 0 { + // read/write bytes are in obdfilter//stats + err := l.GetLustreProcStats("/proc/fs/lustre/obdfilter/*/stats", wanted_ost_fields, acc) + if err != nil { + return err + } + // cache counters are in osd-ldiskfs//stats + err = l.GetLustreProcStats("/proc/fs/lustre/osd-ldiskfs/*/stats", wanted_ost_fields, acc) + if err != nil { + return err + } + } + + if len(l.Mds_procfiles) == 0 { + // Metadata server stats + err := l.GetLustreProcStats("/proc/fs/lustre/mdt/*/md_stats", wanted_mds_fields, acc) + if err != nil { + return err + } + } + + for _, procfile := range l.Ost_procfiles { + err := l.GetLustreProcStats(procfile, wanted_ost_fields, acc) + if err != nil { + return err + } + } + for _, procfile := range l.Mds_procfiles { + err := l.GetLustreProcStats(procfile, wanted_mds_fields, acc) + if err != nil { + return err + } + } + + return nil +} + +func init() { + plugins.Add("lustre2", func() plugins.Plugin { + return &Lustre2{} + }) +} From 236459569755bf2464f4d2dab89dbd2bc5942312 Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 14:53:45 +0100 Subject: [PATCH 05/14] Require validation for uint64 as well as int64 --- testutil/accumulator.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 56657e711..107fa8db6 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -107,6 +107,17 @@ func (a *Accumulator) HasIntValue(measurement string) bool { return false } +func (a *Accumulator) HasUIntValue(measurement string) bool { + for _, p := range a.Points { + if p.Measurement == measurement { + _, ok := p.Values["value"].(uint64) + return ok + } + } + + return false +} + func (a *Accumulator) HasFloatValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement { From 8255945ea741deabde73196d8066592a458adcf0 Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 14:54:50 +0100 Subject: [PATCH 06/14] Tests for the lustre plugin, initial commit --- plugins/lustre2/lustre2_test.go | 144 ++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 plugins/lustre2/lustre2_test.go diff --git a/plugins/lustre2/lustre2_test.go b/plugins/lustre2/lustre2_test.go new file mode 100644 index 000000000..850a4ff32 --- /dev/null +++ b/plugins/lustre2/lustre2_test.go @@ -0,0 +1,144 @@ +package lustre2 + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Set config file variables to point to fake directory structure instead of /proc? + +const obdfilterProcContents = `snapshot_time 1438693064.430544 secs.usecs +read_bytes 203238095 samples [bytes] 4096 1048576 78026117632000 +write_bytes 71893382 samples [bytes] 1 1048576 15201500833981 +get_info 1182008495 samples [reqs] +set_info_async 2 samples [reqs] +connect 1117 samples [reqs] +reconnect 1160 samples [reqs] +disconnect 1084 samples [reqs] +statfs 3575885 samples [reqs] +create 698 samples [reqs] +destroy 3190060 samples [reqs] +setattr 605647 samples [reqs] +punch 805187 samples [reqs] +sync 6608753 samples [reqs] +preprw 275131477 samples [reqs] +commitrw 275131477 samples [reqs] +quotactl 229231 samples [reqs] +ping 78020757 samples [reqs] +` + +const osdldiskfsProcContents = `snapshot_time 1438693135.640551 secs.usecs +get_page 275132812 samples [usec] 0 3147 1320420955 22041662259 +cache_access 19047063027 samples [pages] 1 1 19047063027 +cache_hit 7393729777 samples [pages] 1 1 7393729777 +cache_miss 11653333250 samples [pages] 1 1 11653333250 +` + +const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs +open 1024577037 samples [reqs] +close 873243496 samples [reqs] +mknod 349042 samples [reqs] +link 445 samples [reqs] +unlink 3549417 samples [reqs] +mkdir 705499 samples [reqs] +rmdir 227434 samples [reqs] +rename 629196 samples [reqs] +getattr 1503663097 samples [reqs] +setattr 1898364 samples [reqs] +getxattr 6145349681 samples [reqs] +setxattr 83969 samples [reqs] +statfs 2916320 samples [reqs] +sync 434081 samples [reqs] +samedir_rename 259625 samples [reqs] +crossdir_rename 369571 samples [reqs] +` + +type metrics struct { + name string + value uint64 +} + +func TestLustre2GeneratesMetrics(t *testing.T) { + + tempdir := os.TempDir() + "/telegraf/proc/fs/lustre/" + ost_name := "OST0001" + + mdtdir := tempdir + "/mdt/" + err := os.MkdirAll(mdtdir+"/"+ost_name, 0755) + require.NoError(t, err) + + osddir := tempdir + "/osd-ldiskfs/" + err = os.MkdirAll(osddir+"/"+ost_name, 0755) + require.NoError(t, err) + + obddir := tempdir + "/obdfilter/" + err = os.MkdirAll(obddir+"/"+ost_name, 0755) + require.NoError(t, err) + + err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/md_stats", []byte(mdtProcContents), 0644) + require.NoError(t, err) + + err = ioutil.WriteFile(osddir+"/"+ost_name+"/stats", []byte(osdldiskfsProcContents), 0644) + require.NoError(t, err) + + err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644) + require.NoError(t, err) + + m := &Lustre2{ + Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"}, + Mds_procfiles: []string{mdtdir + "/*/md_stats"}, + } + + var acc testutil.Accumulator + + err = m.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "name": ost_name, + } + + intMetrics := []*metrics{ + { + name: "write_bytes", + value: 15201500833981, + }, + { + name: "read_bytes", + value: 78026117632000, + }, + { + name: "write_calls", + value: 71893382, + }, + { + name: "read_calls", + value: 203238095, + }, + { + name: "cache_hit", + value: 7393729777, + }, + { + name: "cache_access", + value: 19047063027, + }, + { + name: "cache_miss", + value: 11653333250, + }, + } + + for _, metric := range intMetrics { + assert.True(t, acc.HasUIntValue(metric.name), metric.name) + assert.True(t, acc.CheckTaggedValue(metric.name, metric.value, tags)) + } + + err = os.RemoveAll(os.TempDir() + "/telegraf") + require.NoError(t, err) +} From 0647666c65fc35c199cd0f4b55706c13cfaf7517 Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 15:30:27 +0100 Subject: [PATCH 07/14] Add default log rotation --- package.sh | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/package.sh b/package.sh index c20e566cb..fca719e8c 100755 --- a/package.sh +++ b/package.sh @@ -35,8 +35,10 @@ AWS_FILE=~/aws.conf INSTALL_ROOT_DIR=/opt/telegraf TELEGRAF_LOG_DIR=/var/log/telegraf CONFIG_ROOT_DIR=/etc/opt/telegraf +LOGROTATE_DIR=/etc/logrotate.d SAMPLE_CONFIGURATION=etc/config.sample.toml +LOGROTATE_CONFIGURATION=etc/logrotate.d/telegraf INITD_SCRIPT=scripts/init.sh TMP_WORK_DIR=`mktemp -d` @@ -144,6 +146,11 @@ make_dir_tree() { echo "Failed to create configuration directory -- aborting." cleanup_exit 1 fi + mkdir -p $work_dir/$LOGROTATE_DIR + if [ $? -ne 0 ]; then + echo "Failed to create configuration directory -- aborting." + cleanup_exit 1 + fi } @@ -251,6 +258,12 @@ if [ $? -ne 0 ]; then cleanup_exit 1 fi +cp $LOGROTATE_CONFIGURATION $TMP_WORK_DIR/$LOGROTATE_DIR/telegraf.conf +if [ $? -ne 0 ]; then + echo "Failed to copy $LOGROTATE_CONFIGURATION to packaging directory -- aborting." + cleanup_exit 1 +fi + generate_postinstall_script $VERSION ########################################################################### From bb7bdffadabc6a939ecc405ebd8499e2fa9f9d87 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 08:58:32 -0600 Subject: [PATCH 08/14] Creating circleci job to just lint and vet code --- accumulator.go | 4 ++++ agent.go | 21 ++++++++++++++++---- circle.yml | 17 ++++++++++++++++ cmd/telegraf/telegraf.go | 2 ++ config.go | 26 +++++++++++++++++++++---- plugins/mongodb/mongostat.go | 2 +- plugins/rethinkdb/rethinkdb_server.go | 4 ++-- plugins/system/ps/disk/disk_test.go | 2 +- testutil/accumulator.go | 28 ++++++++++++++++++++++++--- testutil/testutil.go | 2 ++ 10 files changed, 93 insertions(+), 15 deletions(-) create mode 100644 circle.yml diff --git a/accumulator.go b/accumulator.go index ab5a02dae..b3f7a4511 100644 --- a/accumulator.go +++ b/accumulator.go @@ -10,6 +10,8 @@ import ( "github.com/influxdb/influxdb/client" ) +// BatchPoints is used to send a batch of data in a single write from telegraf +// to influx type BatchPoints struct { mu sync.Mutex @@ -22,6 +24,7 @@ type BatchPoints struct { Config *ConfiguredPlugin } +// Add adds a measurement func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) { bp.mu.Lock() defer bp.mu.Unlock() @@ -55,6 +58,7 @@ func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string] }) } +// AddValuesWithTime adds a measurement with a provided timestamp func (bp *BatchPoints) AddValuesWithTime( measurement string, values map[string]interface{}, diff --git a/agent.go b/agent.go index 7f45e47a2..7e47ff21a 100644 --- a/agent.go +++ b/agent.go @@ -19,8 +19,13 @@ type runningPlugin struct { config *ConfiguredPlugin } +// Agent runs telegraf and collects data based on the given config type Agent struct { + + // Interval at which to gather information Interval Duration + + // Run in debug mode? Debug bool Hostname string @@ -31,6 +36,7 @@ type Agent struct { conn *client.Client } +// NewAgent returns an Agent struct based off the given Config func NewAgent(config *Config) (*Agent, error) { agent := &Agent{Config: config, Interval: Duration{10 * time.Second}} @@ -57,8 +63,9 @@ func NewAgent(config *Config) (*Agent, error) { return agent, nil } -func (agent *Agent) Connect() error { - config := agent.Config +// Connect connects to the agent's config URL +func (a *Agent) Connect() error { + config := a.Config u, err := url.Parse(config.URL) if err != nil { @@ -77,11 +84,12 @@ func (agent *Agent) Connect() error { return err } - agent.conn = c + a.conn = c return nil } +// LoadPlugins loads the agent's plugins func (a *Agent) LoadPlugins() ([]string, error) { var names []string @@ -201,10 +209,12 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } +// TestAllPlugins verifies that we can 'Gather' from all plugins with the +// default configuration func (a *Agent) TestAllPlugins() error { var names []string - for name, _ := range plugins.Plugins { + for name := range plugins.Plugins { names = append(names, name) } @@ -230,6 +240,8 @@ func (a *Agent) TestAllPlugins() error { return nil } +// Test verifies that we can 'Gather' from all plugins with their configured +// Config struct func (a *Agent) Test() error { var acc BatchPoints @@ -253,6 +265,7 @@ func (a *Agent) Test() error { return nil } +// Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { if a.conn == nil { err := a.Connect() diff --git a/circle.yml b/circle.yml new file mode 100644 index 000000000..6c346a360 --- /dev/null +++ b/circle.yml @@ -0,0 +1,17 @@ +dependencies: + post: + # install golint + - go get github.com/golang/lint/golint + # install binaries + - go install ./... + +test: + pre: + # Vet go code for any potential errors + - go vet ./... + override: + # Enforce that testutil, cmd, and main directory are fully linted + - golint . + - golint testutil/... + - golint cmd/... + # TODO run unit tests diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index f36693ebc..04a292d2b 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -19,7 +19,9 @@ var fVersion = flag.Bool("version", false, "display the version") var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration") var fPidfile = flag.String("pidfile", "", "file to write our pid to") +// Telegraf version var Version = "unreleased" +// Telegraf commit var Commit = "" func main() { diff --git a/config.go b/config.go index a94c1c490..4382fc7c0 100644 --- a/config.go +++ b/config.go @@ -13,10 +13,12 @@ import ( "github.com/naoina/toml/ast" ) +// Duration just wraps time.Duration type Duration struct { time.Duration } +// UnmarshalTOML parses the duration from the TOML config file func (d *Duration) UnmarshalTOML(b []byte) error { dur, err := time.ParseDuration(string(b[1 : len(b)-1])) if err != nil { @@ -28,6 +30,9 @@ func (d *Duration) UnmarshalTOML(b []byte) error { return nil } +// Config specifies the URL/user/password for the database that telegraf +// will be logging to, as well as all the plugins that the user has +// specified type Config struct { URL string Username string @@ -41,10 +46,12 @@ type Config struct { plugins map[string]*ast.Table } +// Plugins returns the configured plugins as a map of name -> plugin toml func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } +// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists type ConfiguredPlugin struct { Name string @@ -54,6 +61,7 @@ type ConfiguredPlugin struct { Interval time.Duration } +// ShouldPass returns true if the metric should pass, false if should drop func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool { if cp.Pass != nil { for _, pat := range cp.Pass { @@ -78,6 +86,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool { return true } +// ApplyAgent loads the toml config into the given interface func (c *Config) ApplyAgent(v interface{}) error { if c.agent != nil { return toml.UnmarshalTable(c.agent, v) @@ -86,6 +95,9 @@ func (c *Config) ApplyAgent(v interface{}) error { return nil } +// ApplyPlugin takes defined plugin names and applies them to the given +// interface, returning a ConfiguredPlugin object in the end that can +// be inserted into a runningPlugin by the agent. func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) { cp := &ConfiguredPlugin{Name: name} @@ -137,10 +149,11 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err return cp, nil } +// PluginsDeclared returns the name of all plugins declared in the config. func (c *Config) PluginsDeclared() []string { var plugins []string - for name, _ := range c.plugins { + for name := range c.plugins { plugins = append(plugins, name) } @@ -149,12 +162,14 @@ func (c *Config) PluginsDeclared() []string { return plugins } +// DefaultConfig returns an empty default configuration func DefaultConfig() *Config { return &Config{} } -var ErrInvalidConfig = errors.New("invalid configuration") +var errInvalidConfig = errors.New("invalid configuration") +// LoadConfig loads the given config file and returns a *Config pointer func LoadConfig(path string) (*Config, error) { data, err := ioutil.ReadFile(path) if err != nil { @@ -173,7 +188,7 @@ func LoadConfig(path string) (*Config, error) { for name, val := range tbl.Fields { subtbl, ok := val.(*ast.Table) if !ok { - return nil, ErrInvalidConfig + return nil, errInvalidConfig } switch name { @@ -192,6 +207,8 @@ func LoadConfig(path string) (*Config, error) { return c, nil } +// ListTags returns a string of tags specified in the config, +// line-protocol style func (c *Config) ListTags() string { var tags []string @@ -271,12 +288,13 @@ database = "telegraf" # required. ` +// PrintSampleConfig prints the sample config! func PrintSampleConfig() { fmt.Printf(header) var names []string - for name, _ := range plugins.Plugins { + for name := range plugins.Plugins { names = append(names, name) } diff --git a/plugins/mongodb/mongostat.go b/plugins/mongodb/mongostat.go index ae39a3e3a..b3c990b1a 100644 --- a/plugins/mongodb/mongostat.go +++ b/plugins/mongodb/mongostat.go @@ -457,7 +457,7 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs oldStat.ExtraInfo.PageFaults != nil && newStat.ExtraInfo.PageFaults != nil { returnVal.Faults = diff(*(newStat.ExtraInfo.PageFaults), *(oldStat.ExtraInfo.PageFaults), sampleSecs) } - if !returnVal.IsMongos && oldStat.Locks != nil && oldStat.Locks != nil { + if !returnVal.IsMongos && oldStat.Locks != nil { globalCheck, hasGlobal := oldStat.Locks["Global"] if hasGlobal && globalCheck.AcquireCount != nil { // This appears to be a 3.0+ server so the data in these fields do *not* refer to diff --git a/plugins/rethinkdb/rethinkdb_server.go b/plugins/rethinkdb/rethinkdb_server.go index 43551fe25..9285068bd 100644 --- a/plugins/rethinkdb/rethinkdb_server.go +++ b/plugins/rethinkdb/rethinkdb_server.go @@ -118,7 +118,7 @@ func (s *Server) addClusterStats(acc plugins.Accumulator) error { defer cursor.Close() var clusterStats stats if err := cursor.One(&clusterStats); err != nil { - return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error()) + return fmt.Errorf("failure to parse cluster stats, %s\n", err.Error()) } tags := s.getDefaultTags() @@ -146,7 +146,7 @@ func (s *Server) addMemberStats(acc plugins.Accumulator) error { defer cursor.Close() var memberStats stats if err := cursor.One(&memberStats); err != nil { - return fmt.Errorf("failure to parse member stats, $s\n", err.Error()) + return fmt.Errorf("failure to parse member stats, %s\n", err.Error()) } tags := s.getDefaultTags() diff --git a/plugins/system/ps/disk/disk_test.go b/plugins/system/ps/disk/disk_test.go index 04776b1d8..6a91bae8c 100644 --- a/plugins/system/ps/disk/disk_test.go +++ b/plugins/system/ps/disk/disk_test.go @@ -45,7 +45,7 @@ func TestDisk_io_counters(t *testing.T) { t.Errorf("error %v", err) } if len(ret) == 0 { - t.Errorf("ret is empty", ret) + t.Errorf("ret is empty: %s", ret) } empty := DiskIOCountersStat{} for part, io := range ret { diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 107fa8db6..0f258904c 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -6,6 +6,7 @@ import ( "time" ) +// Point defines a single point measurement type Point struct { Measurement string Tags map[string]string @@ -13,10 +14,12 @@ type Point struct { Time time.Time } +// Accumulator defines a mocked out accumulator type Accumulator struct { Points []*Point } +// Add adds a measurement point to the accumulator func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) { if tags == nil { tags = map[string]string{} @@ -31,6 +34,7 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string ) } +// AddValuesWithTime adds a measurement point with a specified timestamp. func (a *Accumulator) AddValuesWithTime( measurement string, values map[string]interface{}, @@ -48,6 +52,7 @@ func (a *Accumulator) AddValuesWithTime( ) } +// Get gets the specified measurement point from the accumulator func (a *Accumulator) Get(measurement string) (*Point, bool) { for _, p := range a.Points { if p.Measurement == measurement { @@ -58,6 +63,8 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { return nil, false } +// CheckValue checks that the accumulators point for the given measurement +// is the same as the given value. func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { @@ -68,11 +75,22 @@ func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { return false } -func (a *Accumulator) CheckTaggedValue(measurement string, val interface{}, tags map[string]string) bool { +// CheckTaggedValue calls ValidateTaggedValue +func (a *Accumulator) CheckTaggedValue( + measurement string, + val interface{}, + tags map[string]string, +) bool { return a.ValidateTaggedValue(measurement, val, tags) == nil } -func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, tags map[string]string) error { +// ValidateTaggedValue validates that the given measurement and value exist +// in the accumulator and with the given tags. +func (a *Accumulator) ValidateTaggedValue( + measurement string, + val interface{}, + tags map[string]string, +) error { if tags == nil { tags = map[string]string{} } @@ -83,7 +101,8 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t if p.Measurement == measurement { if p.Values["value"] != val { - return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val) + return fmt.Errorf("%v (%T) != %v (%T)", + p.Values["value"], p.Values["value"], val, val) } return nil } @@ -92,10 +111,12 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } +// ValidateValue calls ValidateTaggedValue func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { return a.ValidateTaggedValue(measurement, val, nil) } +// HasIntValue returns true if the measurement has an Int value func (a *Accumulator) HasIntValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement { @@ -118,6 +139,7 @@ func (a *Accumulator) HasUIntValue(measurement string) bool { return false } +// HasFloatValue returns true if the given measurement has a float value func (a *Accumulator) HasFloatValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement { diff --git a/testutil/testutil.go b/testutil/testutil.go index 8735e882d..91eb4b6b9 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -8,6 +8,8 @@ import ( var localhost = "localhost" +// GetLocalHost returns the DOCKER_HOST environment variable, parsing +// out any scheme or ports so that only the IP address is returned. func GetLocalHost() string { if dockerHostVar := os.Getenv("DOCKER_HOST"); dockerHostVar != "" { u, err := url.Parse(dockerHostVar) From 816313de30725d7213f239ac070e14b84248f430 Mon Sep 17 00:00:00 2001 From: Simon Fraser Date: Tue, 4 Aug 2015 21:44:15 +0100 Subject: [PATCH 09/14] Fix 'go vet' error, +build comment must be followed by a blank line --- plugins/lustre2/lustre2.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/lustre2/lustre2.go b/plugins/lustre2/lustre2.go index 348a4c739..72a83e5f1 100644 --- a/plugins/lustre2/lustre2.go +++ b/plugins/lustre2/lustre2.go @@ -1,4 +1,5 @@ // +build linux + /* Lustre 2.x telegraf plugin From b4ef7bb3ed8863c6f854b33fab171851381cddd8 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 15:04:34 -0600 Subject: [PATCH 10/14] Adding circleci build badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3294cc34b..4a607556e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Telegraf - A native agent for InfluxDB +# Telegraf - A native agent for InfluxDB [![Circle CI](https://circleci.com/gh/influxdb/telegraf.svg?style=svg)](https://circleci.com/gh/influxdb/telegraf) Telegraf is an agent written in Go for collecting metrics from the system it's running on or from other services and writing them into InfluxDB. From 260fc43281fc1a8104fddec402478b64c16a93c2 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 16:09:59 -0600 Subject: [PATCH 11/14] go fmt fixes --- cmd/telegraf/telegraf.go | 1 + plugins/mysql/mysql.go | 6 +-- plugins/prometheus/prometheus_test.go | 9 ++-- plugins/system/ps/common/common_freebsd.go | 3 +- plugins/system/ps/host/host_linux_386.go | 50 +++++++++++----------- plugins/system/system_test.go | 4 +- 6 files changed, 36 insertions(+), 37 deletions(-) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 04a292d2b..f34ffd1fd 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -21,6 +21,7 @@ var fPidfile = flag.String("pidfile", "", "file to write our pid to") // Telegraf version var Version = "unreleased" + // Telegraf commit var Commit = "" diff --git a/plugins/mysql/mysql.go b/plugins/mysql/mysql.go index b94b4f9b2..a55006a4d 100644 --- a/plugins/mysql/mysql.go +++ b/plugins/mysql/mysql.go @@ -72,9 +72,9 @@ var mappings = []*mapping{ inExport: "innodb_", }, { - onServer: "Tokudb_", - inExport: "tokudb_", - }, + onServer: "Tokudb_", + inExport: "tokudb_", + }, { onServer: "Threads_", inExport: "threads_", diff --git a/plugins/prometheus/prometheus_test.go b/plugins/prometheus/prometheus_test.go index be2b190e7..4f79822c1 100644 --- a/plugins/prometheus/prometheus_test.go +++ b/plugins/prometheus/prometheus_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" ) - const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. # TYPE go_gc_duration_seconds summary go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 @@ -27,9 +26,9 @@ go_goroutines 15 ` func TestPrometheusGeneratesMetrics(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleTextFormat) - })) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) defer ts.Close() p := &Prometheus{ @@ -44,7 +43,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { expected := []struct { name string value float64 - tags map[string]string + tags map[string]string }{ {"go_gc_duration_seconds_count", 7, map[string]string{}}, {"go_goroutines", 15, map[string]string{}}, diff --git a/plugins/system/ps/common/common_freebsd.go b/plugins/system/ps/common/common_freebsd.go index 3c1124655..8ccd40e90 100644 --- a/plugins/system/ps/common/common_freebsd.go +++ b/plugins/system/ps/common/common_freebsd.go @@ -3,9 +3,9 @@ package common import ( - "syscall" "os/exec" "strings" + "syscall" "unsafe" ) @@ -58,4 +58,3 @@ func CallSyscall(mib []int32) ([]byte, uint64, error) { return buf, length, nil } - diff --git a/plugins/system/ps/host/host_linux_386.go b/plugins/system/ps/host/host_linux_386.go index d8f31c2f6..fb6d7a0f6 100644 --- a/plugins/system/ps/host/host_linux_386.go +++ b/plugins/system/ps/host/host_linux_386.go @@ -6,39 +6,39 @@ package host const ( - sizeofPtr = 0x4 - sizeofShort = 0x2 - sizeofInt = 0x4 - sizeofLong = 0x4 - sizeofLongLong = 0x8 + sizeofPtr = 0x4 + sizeofShort = 0x2 + sizeofInt = 0x4 + sizeofLong = 0x4 + sizeofLongLong = 0x8 ) type ( - _C_short int16 - _C_int int32 - _C_long int32 - _C_long_long int64 + _C_short int16 + _C_int int32 + _C_long int32 + _C_long_long int64 ) type utmp struct { - Type int16 - Pad_cgo_0 [2]byte - Pid int32 - Line [32]int8 - Id [4]int8 - User [32]int8 - Host [256]int8 - Exit exit_status - Session int32 - Tv UtTv - Addr_v6 [4]int32 - X__unused [20]int8 + Type int16 + Pad_cgo_0 [2]byte + Pid int32 + Line [32]int8 + Id [4]int8 + User [32]int8 + Host [256]int8 + Exit exit_status + Session int32 + Tv UtTv + Addr_v6 [4]int32 + X__unused [20]int8 } type exit_status struct { - Termination int16 - Exit int16 + Termination int16 + Exit int16 } type UtTv struct { - TvSec int32 - TvUsec int32 + TvSec int32 + TvUsec int32 } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 1c40b7f97..7a3d13570 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -272,8 +272,8 @@ func TestSystemStats_GenerateStats(t *testing.T) { require.NoError(t, err) dockertags := map[string]string{ - "name": "blah", - "id": "", + "name": "blah", + "id": "", "command": "", } From b3cb8d0f5306925ab8faf455b73c48cc2c1c4ac1 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 16:12:48 -0600 Subject: [PATCH 12/14] Verify proper go formatting in circleci job --- circle.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/circle.yml b/circle.yml index 6c346a360..ccbfae779 100644 --- a/circle.yml +++ b/circle.yml @@ -9,6 +9,8 @@ test: pre: # Vet go code for any potential errors - go vet ./... + # Verify that all files are properly go formatted + - "[ `git ls-files | grep '.go$' | xargs gofmt -l 2>&1 | wc -l` -eq 0 ]" override: # Enforce that testutil, cmd, and main directory are fully linted - golint . From 77dd1e3d45b5eca1e4b1dcfa582ec456bdb05b21 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 08:58:32 -0600 Subject: [PATCH 13/14] Adding Kafka docker container and utilizing it in unit tests --- Makefile | 5 ++++- README.md | 22 ++++++++++++++++--- circle.yml | 4 ++++ docker-compose.yml | 11 ++++++++++ .../kafka_consumer_integration_test.go | 15 ++----------- 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index cefa9ac44..8987e9763 100644 --- a/Makefile +++ b/Makefile @@ -2,9 +2,12 @@ prepare: go get -d -v -t ./... docker-compose up -d --no-recreate -test: prepare +test-short: prepare go test -short ./... +test: prepare + go test ./... + update: go get -u -v -d -t ./... diff --git a/README.md b/README.md index 4a607556e..f4f49376c 100644 --- a/README.md +++ b/README.md @@ -185,10 +185,26 @@ As Telegraf collects metrics from several third-party services it becomes a diff some of them have complicated protocols which would take some time to replicate. To overcome this situation we've decided to use docker containers to provide a fast and reproducible environment -to test those services which require it. For other situations (i.e: https://github.com/influxdb/telegraf/blob/master/plugins/redis/redis_test.go ) a simple mock will suffice. +to test those services which require it. For other situations (i.e: https://github.com/influxdb/telegraf/blob/master/plugins/redis/redis_test.go ) a simple mock will suffice. -To execute Telegraf tests follow this simple steps: +To execute Telegraf tests follow these simple steps: - Install docker compose following [these](https://docs.docker.com/compose/install/) instructions -- execute `make test` + - NOTE: mac users should be able to simply do `brew install boot2docker` and `brew install docker-compose` +### Execute short tests: + +execute `make short-test` + +### Execute long tests: +These tests requre additional docker containers, such as for kafka + +Mac: +execute ``ADVERTISED_HOST=`boot2docker ip` make test`` + +Linux: +execute `ADVERTISED_HOST=localhost make test` + +### Unit test troubleshooting: + +Try killing your docker containers via `docker-compose kill` and then re-running diff --git a/circle.yml b/circle.yml index ccbfae779..83cbd1d4c 100644 --- a/circle.yml +++ b/circle.yml @@ -11,9 +11,13 @@ test: - go vet ./... # Verify that all files are properly go formatted - "[ `git ls-files | grep '.go$' | xargs gofmt -l 2>&1 | wc -l` -eq 0 ]" + # Only docker-compose up kafka, the other services are already running + # see: https://circleci.com/docs/environment#databases + # - docker-compose up -d kafka override: # Enforce that testutil, cmd, and main directory are fully linted - golint . - golint testutil/... - golint cmd/... # TODO run unit tests + # - go test -short ./... diff --git a/docker-compose.yml b/docker-compose.yml index f2c1b2b54..c51a0235b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,3 +14,14 @@ postgres: image: postgres ports: - "5432:5432" + +# advertised host env variable must be set at runtime, ie, +# ADVERTISED_HOST=`boot2docker ip` docker-compose up -d +kafka: + image: spotify/kafka + ports: + - "2181:2181" + - "9092:9092" + environment: + ADVERTISED_HOST: + ADVERTISED_PORT: 9092 diff --git a/plugins/kafka_consumer/kafka_consumer_integration_test.go b/plugins/kafka_consumer/kafka_consumer_integration_test.go index 2a82a2374..325318014 100644 --- a/plugins/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/kafka_consumer/kafka_consumer_integration_test.go @@ -2,8 +2,6 @@ package kafka_consumer import ( "fmt" - "os" - "strings" "testing" "time" @@ -19,17 +17,8 @@ func TestReadsMetricsFromKafka(t *testing.T) { } var zkPeers, brokerPeers []string - if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 { - zkPeers = []string{"localhost:2181"} - } else { - zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",") - } - - if len(os.Getenv("KAFKA_PEERS")) == 0 { - brokerPeers = []string{"localhost:9092"} - } else { - brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",") - } + zkPeers = []string{testutil.GetLocalHost() + ":2181"} + brokerPeers = []string{testutil.GetLocalHost() + ":9092"} k := &Kafka{ ConsumerGroupName: "telegraf_test_consumers", From c698dc97842cf0e63e79d2be11778bfaa431e648 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 4 Aug 2015 16:48:19 -0600 Subject: [PATCH 14/14] Build & unit test fixup --- plugins/lustre2/lustre2.go | 2 -- testutil/accumulator.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/lustre2/lustre2.go b/plugins/lustre2/lustre2.go index 72a83e5f1..95b6bdbf7 100644 --- a/plugins/lustre2/lustre2.go +++ b/plugins/lustre2/lustre2.go @@ -1,5 +1,3 @@ -// +build linux - /* Lustre 2.x telegraf plugin diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 0f258904c..db3a67e66 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -128,6 +128,7 @@ func (a *Accumulator) HasIntValue(measurement string) bool { return false } +// HasUIntValue returns true if the measurement has a UInt value func (a *Accumulator) HasUIntValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement {