From d1cc82653a11db9ce85fc23e9e8b97b648141ac0 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Mon, 6 Apr 2015 09:32:10 -0700 Subject: [PATCH] Switch plugin API to use an accumulator --- accumulator.go | 17 ++++ agent.go | 82 +++++++------------ agent_test.go | 27 +------ cmd/influxdb-agent/agent.go | 5 +- plugins/mock_Plugin.go | 11 +-- plugins/registry.go | 6 +- plugins/system/mock_PS.go | 16 +++- plugins/system/ps/cpu/cpu_darwin.go | 121 +++++++++++++++++++++++++++- plugins/system/system.go | 64 ++++++++------- plugins/system/system_test.go | 103 +++++++++-------------- testdata/influx.toml | 2 +- testutil/accumulator.go | 25 ++++++ 12 files changed, 295 insertions(+), 184 deletions(-) create mode 100644 accumulator.go create mode 100644 testutil/accumulator.go diff --git a/accumulator.go b/accumulator.go new file mode 100644 index 000000000..3c7f7efcf --- /dev/null +++ b/accumulator.go @@ -0,0 +1,17 @@ +package tivan + +import "github.com/influxdb/influxdb/client" + +type BatchPoints struct { + client.BatchPoints +} + +func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) { + bp.Points = append(bp.Points, client.Point{ + Name: name, + Tags: tags, + Fields: map[string]interface{}{ + "value": val, + }, + }) +} diff --git a/agent.go b/agent.go index 2bdaf613e..19b1f5508 100644 --- a/agent.go +++ b/agent.go @@ -1,13 +1,13 @@ package tivan import ( - "fmt" "log" + "net/url" "sort" + "github.com/influxdb/influxdb/client" "github.com/influxdb/tivan/plugins" "github.com/vektra/cypress" - "github.com/vektra/cypress/plugins/metrics" ) import "time" @@ -23,47 +23,39 @@ type Agent struct { Config *Config plugins []plugins.Plugin - metrics Metrics + + conn *client.Client eachInternal []func() } -func NewAgent(config *Config) *Agent { - m := metrics.NewMetricSink() - - agent := &Agent{Config: config, metrics: m} +func NewAgent(config *Config) (*Agent, error) { + agent := &Agent{Config: config} err := config.Apply("agent", agent) if err != nil { - panic(err) + return nil, err } - if config.URL != "" { - icfg := metrics.DefaultInfluxConfig() - icfg.URL = config.URL - icfg.Username = config.Username - icfg.Password = config.Password - icfg.Database = config.Database - icfg.UserAgent = config.UserAgent - - agent.eachInternal = append(agent.eachInternal, func() { - if agent.Debug { - log.Printf("flushing to influxdb") - } - - m.FlushInflux(icfg) - }) + u, err := url.Parse(config.URL) + if err != nil { + return nil, err } - return agent -} + c, err := client.NewClient(client.Config{ + URL: *u, + Username: config.Username, + Password: config.Password, + UserAgent: config.UserAgent, + }) -type HTTPInterface interface { - RunHTTP(string) error -} + if err != nil { + return nil, err + } -func (a *Agent) RunHTTP(addr string) { - a.metrics.(HTTPInterface).RunHTTP(addr) + agent.conn = c + + return agent, nil } func (a *Agent) LoadPlugins() ([]string, error) { @@ -80,36 +72,24 @@ func (a *Agent) LoadPlugins() ([]string, error) { } func (a *Agent) crank() error { + var acc BatchPoints + for _, plugin := range a.plugins { - msgs, err := plugin.Read() + err := plugin.Gather(&acc) if err != nil { return err } - - for _, m := range msgs { - for k, v := range a.Config.Tags { - m.AddTag(k, v) - } - - if a.Debug { - fmt.Println(m.KVString()) - } - - err = a.metrics.Receive(m) - if err != nil { - return err - } - } } - return nil + acc.Tags = a.Config.Tags + acc.Timestamp = time.Now() + acc.Database = a.Config.Database + + _, err := a.conn.Write(acc.BatchPoints) + return err } func (a *Agent) Run(shutdown chan struct{}) { - if a.HTTP != "" { - go a.RunHTTP(a.HTTP) - } - ticker := time.NewTicker(a.Interval.Duration) for { diff --git a/agent_test.go b/agent_test.go index e7ca05860..6103b4eb8 100644 --- a/agent_test.go +++ b/agent_test.go @@ -1,17 +1,9 @@ package tivan -import ( - "testing" - - "github.com/influxdb/tivan/plugins" - "github.com/stretchr/testify/require" - "github.com/vektra/cypress" -) - +/* func TestAgent_DrivesMetrics(t *testing.T) { var ( plugin plugins.MockPlugin - metrics MockMetrics ) defer plugin.AssertExpectations(t) @@ -19,23 +11,11 @@ func TestAgent_DrivesMetrics(t *testing.T) { a := &Agent{ plugins: []plugins.Plugin{&plugin}, - metrics: &metrics, Config: &Config{}, } - m1 := cypress.Metric() - m1.Add("name", "foo") - m1.Add("value", 1.2) - - m2 := cypress.Metric() - m2.Add("name", "bar") - m2.Add("value", 888) - - msgs := []*cypress.Message{m1, m2} - - plugin.On("Read").Return(msgs, nil) - metrics.On("Receive", m1).Return(nil) - metrics.On("Receive", m2).Return(nil) + plugin.On("Add", "foo", 1.2, nil).Return(nil) + plugin.On("Add", "bar", 888, nil).Return(nil) err := a.crank() require.NoError(t, err) @@ -78,3 +58,4 @@ func TestAgent_AppliesTags(t *testing.T) { err := a.crank() require.NoError(t, err) } +*/ diff --git a/cmd/influxdb-agent/agent.go b/cmd/influxdb-agent/agent.go index 6e6c8b421..93350a0ef 100644 --- a/cmd/influxdb-agent/agent.go +++ b/cmd/influxdb-agent/agent.go @@ -32,7 +32,10 @@ func main() { config = tivan.DefaultConfig() } - ag := tivan.NewAgent(config) + ag, err := tivan.NewAgent(config) + if err != nil { + log.Fatal(err) + } if *fDebug { ag.Debug = true diff --git a/plugins/mock_Plugin.go b/plugins/mock_Plugin.go index d775c6a64..492384b25 100644 --- a/plugins/mock_Plugin.go +++ b/plugins/mock_Plugin.go @@ -2,17 +2,14 @@ package plugins import "github.com/stretchr/testify/mock" -import "github.com/vektra/cypress" - type MockPlugin struct { mock.Mock } -func (m *MockPlugin) Read() ([]*cypress.Message, error) { - ret := m.Called() +func (m *MockPlugin) Gather(_a0 Accumulator) error { + ret := m.Called(_a0) - r0 := ret.Get(0).([]*cypress.Message) - r1 := ret.Error(1) + r0 := ret.Error(0) - return r0, r1 + return r0 } diff --git a/plugins/registry.go b/plugins/registry.go index b2a0850ba..9f17662a4 100644 --- a/plugins/registry.go +++ b/plugins/registry.go @@ -1,9 +1,11 @@ package plugins -import "github.com/vektra/cypress" +type Accumulator interface { + Add(name string, value interface{}, tags map[string]string) +} type Plugin interface { - Read() ([]*cypress.Message, error) + Gather(Accumulator) error } type Creator func() Plugin diff --git a/plugins/system/mock_PS.go b/plugins/system/mock_PS.go index fef3a2e4c..ed7a1f65a 100644 --- a/plugins/system/mock_PS.go +++ b/plugins/system/mock_PS.go @@ -1,9 +1,9 @@ package system -import ( - "github.com/influxdb/tivan/plugins/system/ps/load" - "github.com/stretchr/testify/mock" -) +import "github.com/stretchr/testify/mock" + +import "github.com/influxdb/tivan/plugins/system/ps/cpu" +import "github.com/influxdb/tivan/plugins/system/ps/load" type MockPS struct { mock.Mock @@ -17,3 +17,11 @@ func (m *MockPS) LoadAvg() (*load.LoadAvgStat, error) { return r0, r1 } +func (m *MockPS) CPUTimes() ([]cpu.CPUTimesStat, error) { + ret := m.Called() + + r0 := ret.Get(0).([]cpu.CPUTimesStat) + r1 := ret.Error(1) + + return r0, r1 +} diff --git a/plugins/system/ps/cpu/cpu_darwin.go b/plugins/system/ps/cpu/cpu_darwin.go index 72b08d1a8..01edb83f1 100644 --- a/plugins/system/ps/cpu/cpu_darwin.go +++ b/plugins/system/ps/cpu/cpu_darwin.go @@ -2,13 +2,29 @@ package cpu +/* +#include +#include +#include +#include +#include +#include +#include +#include +#include +*/ +import "C" + import ( + "bytes" + "encoding/binary" "fmt" "os/exec" "strconv" "strings" + "unsafe" - common "github.com/shirou/gopsutil/common" + common "github.com/influxdb/tivan/plugins/system/ps/common" ) // sys/resource.h @@ -26,7 +42,110 @@ const ( ClocksPerSec = 128 ) +func perCPUTimes() ([]CPUTimesStat, error) { + var ( + count C.mach_msg_type_number_t + cpuload *C.processor_cpu_load_info_data_t + ncpu C.natural_t + ) + + status := C.host_processor_info(C.host_t(C.mach_host_self()), + C.PROCESSOR_CPU_LOAD_INFO, + &ncpu, + (*C.processor_info_array_t)(unsafe.Pointer(&cpuload)), + &count) + + if status != C.KERN_SUCCESS { + return nil, fmt.Errorf("host_processor_info error=%d", status) + } + + // jump through some cgo casting hoops and ensure we properly free + // the memory that cpuload points to + target := C.vm_map_t(C.mach_task_self_) + address := C.vm_address_t(uintptr(unsafe.Pointer(cpuload))) + defer C.vm_deallocate(target, address, C.vm_size_t(ncpu)) + + // the body of struct processor_cpu_load_info + // aka processor_cpu_load_info_data_t + var cpu_ticks [C.CPU_STATE_MAX]uint32 + + // copy the cpuload array to a []byte buffer + // where we can binary.Read the data + size := int(ncpu) * binary.Size(cpu_ticks) + buf := C.GoBytes(unsafe.Pointer(cpuload), C.int(size)) + + bbuf := bytes.NewBuffer(buf) + + var ret []CPUTimesStat + + for i := 0; i < int(ncpu); i++ { + err := binary.Read(bbuf, binary.LittleEndian, &cpu_ticks) + if err != nil { + return nil, err + } + + c := CPUTimesStat{ + CPU: fmt.Sprintf("cpu%d", i), + User: float64(cpu_ticks[C.CPU_STATE_USER]) / ClocksPerSec, + System: float64(cpu_ticks[C.CPU_STATE_SYSTEM]) / ClocksPerSec, + Nice: float64(cpu_ticks[C.CPU_STATE_NICE]) / ClocksPerSec, + Idle: float64(cpu_ticks[C.CPU_STATE_IDLE]) / ClocksPerSec, + Iowait: -1, + Irq: -1, + Softirq: -1, + Steal: -1, + Guest: -1, + GuestNice: -1, + Stolen: -1, + } + + ret = append(ret, c) + } + + return ret, nil +} + +func allCPUTimes() ([]CPUTimesStat, error) { + var count C.mach_msg_type_number_t = C.HOST_CPU_LOAD_INFO_COUNT + var cpuload C.host_cpu_load_info_data_t + + status := C.host_statistics(C.host_t(C.mach_host_self()), + C.HOST_CPU_LOAD_INFO, + C.host_info_t(unsafe.Pointer(&cpuload)), + &count) + + if status != C.KERN_SUCCESS { + return nil, fmt.Errorf("host_statistics error=%d", status) + } + + c := CPUTimesStat{ + CPU: "cpu-total", + User: float64(cpuload.cpu_ticks[C.CPU_STATE_USER]) / ClocksPerSec, + System: float64(cpuload.cpu_ticks[C.CPU_STATE_SYSTEM]) / ClocksPerSec, + Nice: float64(cpuload.cpu_ticks[C.CPU_STATE_NICE]) / ClocksPerSec, + Idle: float64(cpuload.cpu_ticks[C.CPU_STATE_IDLE]) / ClocksPerSec, + Iowait: -1, + Irq: -1, + Softirq: -1, + Steal: -1, + Guest: -1, + GuestNice: -1, + Stolen: -1, + } + + return []CPUTimesStat{c}, nil + +} + func CPUTimes(percpu bool) ([]CPUTimesStat, error) { + if percpu { + return perCPUTimes() + } + + return allCPUTimes() +} + +func sysctrlCPUTimes(percpu bool) ([]CPUTimesStat, error) { var ret []CPUTimesStat var sysctlCall string diff --git a/plugins/system/system.go b/plugins/system/system.go index 0a933ca7f..7f5b66bfc 100644 --- a/plugins/system/system.go +++ b/plugins/system/system.go @@ -1,54 +1,58 @@ package system import ( + "fmt" + "github.com/influxdb/tivan/plugins" + "github.com/influxdb/tivan/plugins/system/ps/cpu" "github.com/influxdb/tivan/plugins/system/ps/load" - "github.com/vektra/cypress" ) type PS interface { LoadAvg() (*load.LoadAvgStat, error) + CPUTimes() ([]cpu.CPUTimesStat, error) } type SystemStats struct { - ps PS - tags map[string]string + ps PS } -func (s *SystemStats) Read() ([]*cypress.Message, error) { +func (s *SystemStats) add(acc plugins.Accumulator, name string, val float64) { + if val >= 0 { + acc.Add(name, val, nil) + } +} + +func (s *SystemStats) Gather(acc plugins.Accumulator) error { lv, err := s.ps.LoadAvg() if err != nil { - return nil, err + return err } - m1 := cypress.Metric() - m1.Add("type", "gauge") - m1.Add("name", "load1") - m1.Add("value", lv.Load1) + acc.Add("load1", lv.Load1, nil) + acc.Add("load5", lv.Load5, nil) + acc.Add("load15", lv.Load15, nil) - for k, v := range s.tags { - m1.AddTag(k, v) + times, err := s.ps.CPUTimes() + if err != nil { + return fmt.Errorf("error getting CPU info: %s", err) } - m2 := cypress.Metric() - m2.Add("type", "gauge") - m2.Add("name", "load5") - m2.Add("value", lv.Load5) - - for k, v := range s.tags { - m2.AddTag(k, v) + for _, cts := range times { + s.add(acc, cts.CPU+".user", cts.User) + s.add(acc, cts.CPU+".system", cts.System) + s.add(acc, cts.CPU+".idle", cts.Idle) + s.add(acc, cts.CPU+".nice", cts.Nice) + s.add(acc, cts.CPU+".iowait", cts.Iowait) + s.add(acc, cts.CPU+".irq", cts.Irq) + s.add(acc, cts.CPU+".softirq", cts.Softirq) + s.add(acc, cts.CPU+".steal", cts.Steal) + s.add(acc, cts.CPU+".guest", cts.Guest) + s.add(acc, cts.CPU+".guestNice", cts.GuestNice) + s.add(acc, cts.CPU+".stolen", cts.Stolen) } - m3 := cypress.Metric() - m3.Add("type", "gauge") - m3.Add("name", "load15") - m3.Add("value", lv.Load15) - - for k, v := range s.tags { - m3.AddTag(k, v) - } - - return []*cypress.Message{m1, m2, m3}, nil + return nil } type systemPS struct{} @@ -57,6 +61,10 @@ func (s *systemPS) LoadAvg() (*load.LoadAvgStat, error) { return load.LoadAvg() } +func (s *systemPS) CPUTimes() ([]cpu.CPUTimesStat, error) { + return cpu.CPUTimes(true) +} + func init() { plugins.Add("system", func() plugins.Plugin { return &SystemStats{ps: &systemPS{}} diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index 2ee8802d7..e3e25a688 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -3,16 +3,20 @@ package system import ( "testing" + "github.com/influxdb/tivan/plugins/system/ps/cpu" "github.com/influxdb/tivan/plugins/system/ps/load" + "github.com/influxdb/tivan/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestSystemStats_GenerateLoad(t *testing.T) { +func TestSystemStats_GenerateStats(t *testing.T) { var mps MockPS defer mps.AssertExpectations(t) + var acc testutil.Accumulator + ss := &SystemStats{ps: &mps} lv := &load.LoadAvgStat{ @@ -23,72 +27,39 @@ func TestSystemStats_GenerateLoad(t *testing.T) { mps.On("LoadAvg").Return(lv, nil) - msgs, err := ss.Read() + cts := cpu.CPUTimesStat{ + CPU: "all", + User: 3.1, + System: 8.2, + Idle: 80.1, + Nice: 1.3, + Iowait: 0.2, + Irq: 0.1, + Softirq: 0.11, + Steal: 0.0001, + Guest: 8.1, + GuestNice: 0.324, + Stolen: 0.051, + } + + mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil) + + err := ss.Gather(&acc) require.NoError(t, err) - name, ok := msgs[0].GetString("name") - require.True(t, ok) + assert.True(t, acc.CheckValue("load1", 0.3)) + assert.True(t, acc.CheckValue("load5", 1.5)) + assert.True(t, acc.CheckValue("load15", 0.8)) - assert.Equal(t, "load1", name) - - val, ok := msgs[0].GetFloat("value") - require.True(t, ok) - - assert.Equal(t, 0.3, val) - - name, ok = msgs[1].GetString("name") - require.True(t, ok) - - assert.Equal(t, "load5", name) - - val, ok = msgs[1].GetFloat("value") - require.True(t, ok) - - assert.Equal(t, 1.5, val) - - name, ok = msgs[2].GetString("name") - require.True(t, ok) - - assert.Equal(t, "load15", name) - - val, ok = msgs[2].GetFloat("value") - require.True(t, ok) - - assert.Equal(t, 0.8, val) -} - -func TestSystemStats_AddTags(t *testing.T) { - var mps MockPS - defer mps.AssertExpectations(t) - - ss := &SystemStats{ - ps: &mps, - tags: map[string]string{ - "host": "my.test", - "dc": "us-west-1", - }, - } - - lv := &load.LoadAvgStat{ - Load1: 0.3, - Load5: 1.5, - Load15: 0.8, - } - - mps.On("LoadAvg").Return(lv, nil) - - msgs, err := ss.Read() - require.NoError(t, err) - - for _, m := range msgs { - val, ok := m.GetTag("host") - require.True(t, ok) - - assert.Equal(t, val, "my.test") - - val, ok = m.GetTag("dc") - require.True(t, ok) - - assert.Equal(t, val, "us-west-1") - } + assert.True(t, acc.CheckValue("all.user", 3.1)) + assert.True(t, acc.CheckValue("all.system", 8.2)) + assert.True(t, acc.CheckValue("all.idle", 80.1)) + assert.True(t, acc.CheckValue("all.nice", 1.3)) + assert.True(t, acc.CheckValue("all.iowait", 0.2)) + assert.True(t, acc.CheckValue("all.irq", 0.1)) + assert.True(t, acc.CheckValue("all.softirq", 0.11)) + assert.True(t, acc.CheckValue("all.steal", 0.0001)) + assert.True(t, acc.CheckValue("all.guest", 8.1)) + assert.True(t, acc.CheckValue("all.guestNice", 0.324)) + assert.True(t, acc.CheckValue("all.stolen", 0.051)) } diff --git a/testdata/influx.toml b/testdata/influx.toml index 00376fa7a..f91934be3 100644 --- a/testdata/influx.toml +++ b/testdata/influx.toml @@ -7,6 +7,6 @@ debug = true url = "http://localhost:8086" username = "root" password = "root" -database = "cypress" +database = "tivan" tags = { dc = "us-phx-1" } diff --git a/testutil/accumulator.go b/testutil/accumulator.go new file mode 100644 index 000000000..cda030890 --- /dev/null +++ b/testutil/accumulator.go @@ -0,0 +1,25 @@ +package testutil + +type Point struct { + Name string + Value interface{} + Tags map[string]string +} + +type Accumulator struct { + Points []*Point +} + +func (a *Accumulator) Add(name string, value interface{}, tags map[string]string) { + a.Points = append(a.Points, &Point{name, value, tags}) +} + +func (a *Accumulator) CheckValue(name string, val interface{}) bool { + for _, p := range a.Points { + if p.Name == name { + return p.Value == val + } + } + + return false +}