Switch plugin API to use an accumulator

This commit is contained in:
Evan Phoenix 2015-04-06 09:32:10 -07:00
parent f9250e8e39
commit d1cc82653a
12 changed files with 295 additions and 184 deletions

17
accumulator.go Normal file
View File

@ -0,0 +1,17 @@
package tivan
import "github.com/influxdb/influxdb/client"
type BatchPoints struct {
client.BatchPoints
}
func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) {
bp.Points = append(bp.Points, client.Point{
Name: name,
Tags: tags,
Fields: map[string]interface{}{
"value": val,
},
})
}

View File

@ -1,13 +1,13 @@
package tivan package tivan
import ( import (
"fmt"
"log" "log"
"net/url"
"sort" "sort"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/tivan/plugins" "github.com/influxdb/tivan/plugins"
"github.com/vektra/cypress" "github.com/vektra/cypress"
"github.com/vektra/cypress/plugins/metrics"
) )
import "time" import "time"
@ -23,47 +23,39 @@ type Agent struct {
Config *Config Config *Config
plugins []plugins.Plugin plugins []plugins.Plugin
metrics Metrics
conn *client.Client
eachInternal []func() eachInternal []func()
} }
func NewAgent(config *Config) *Agent { func NewAgent(config *Config) (*Agent, error) {
m := metrics.NewMetricSink() agent := &Agent{Config: config}
agent := &Agent{Config: config, metrics: m}
err := config.Apply("agent", agent) err := config.Apply("agent", agent)
if err != nil { if err != nil {
panic(err) return nil, err
} }
if config.URL != "" { u, err := url.Parse(config.URL)
icfg := metrics.DefaultInfluxConfig() if err != nil {
icfg.URL = config.URL return nil, err
icfg.Username = config.Username
icfg.Password = config.Password
icfg.Database = config.Database
icfg.UserAgent = config.UserAgent
agent.eachInternal = append(agent.eachInternal, func() {
if agent.Debug {
log.Printf("flushing to influxdb")
} }
m.FlushInflux(icfg) c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
}) })
if err != nil {
return nil, err
} }
return agent agent.conn = c
}
type HTTPInterface interface { return agent, nil
RunHTTP(string) error
}
func (a *Agent) RunHTTP(addr string) {
a.metrics.(HTTPInterface).RunHTTP(addr)
} }
func (a *Agent) LoadPlugins() ([]string, error) { func (a *Agent) LoadPlugins() ([]string, error) {
@ -80,36 +72,24 @@ func (a *Agent) LoadPlugins() ([]string, error) {
} }
func (a *Agent) crank() error { func (a *Agent) crank() error {
var acc BatchPoints
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
msgs, err := plugin.Read() err := plugin.Gather(&acc)
if err != nil {
return err
}
for _, m := range msgs {
for k, v := range a.Config.Tags {
m.AddTag(k, v)
}
if a.Debug {
fmt.Println(m.KVString())
}
err = a.metrics.Receive(m)
if err != nil { if err != nil {
return err return err
} }
} }
}
return nil acc.Tags = a.Config.Tags
acc.Timestamp = time.Now()
acc.Database = a.Config.Database
_, err := a.conn.Write(acc.BatchPoints)
return err
} }
func (a *Agent) Run(shutdown chan struct{}) { func (a *Agent) Run(shutdown chan struct{}) {
if a.HTTP != "" {
go a.RunHTTP(a.HTTP)
}
ticker := time.NewTicker(a.Interval.Duration) ticker := time.NewTicker(a.Interval.Duration)
for { for {

View File

@ -1,17 +1,9 @@
package tivan package tivan
import ( /*
"testing"
"github.com/influxdb/tivan/plugins"
"github.com/stretchr/testify/require"
"github.com/vektra/cypress"
)
func TestAgent_DrivesMetrics(t *testing.T) { func TestAgent_DrivesMetrics(t *testing.T) {
var ( var (
plugin plugins.MockPlugin plugin plugins.MockPlugin
metrics MockMetrics
) )
defer plugin.AssertExpectations(t) defer plugin.AssertExpectations(t)
@ -19,23 +11,11 @@ func TestAgent_DrivesMetrics(t *testing.T) {
a := &Agent{ a := &Agent{
plugins: []plugins.Plugin{&plugin}, plugins: []plugins.Plugin{&plugin},
metrics: &metrics,
Config: &Config{}, Config: &Config{},
} }
m1 := cypress.Metric() plugin.On("Add", "foo", 1.2, nil).Return(nil)
m1.Add("name", "foo") plugin.On("Add", "bar", 888, nil).Return(nil)
m1.Add("value", 1.2)
m2 := cypress.Metric()
m2.Add("name", "bar")
m2.Add("value", 888)
msgs := []*cypress.Message{m1, m2}
plugin.On("Read").Return(msgs, nil)
metrics.On("Receive", m1).Return(nil)
metrics.On("Receive", m2).Return(nil)
err := a.crank() err := a.crank()
require.NoError(t, err) require.NoError(t, err)
@ -78,3 +58,4 @@ func TestAgent_AppliesTags(t *testing.T) {
err := a.crank() err := a.crank()
require.NoError(t, err) require.NoError(t, err)
} }
*/

View File

@ -32,7 +32,10 @@ func main() {
config = tivan.DefaultConfig() config = tivan.DefaultConfig()
} }
ag := tivan.NewAgent(config) ag, err := tivan.NewAgent(config)
if err != nil {
log.Fatal(err)
}
if *fDebug { if *fDebug {
ag.Debug = true ag.Debug = true

View File

@ -2,17 +2,14 @@ package plugins
import "github.com/stretchr/testify/mock" import "github.com/stretchr/testify/mock"
import "github.com/vektra/cypress"
type MockPlugin struct { type MockPlugin struct {
mock.Mock mock.Mock
} }
func (m *MockPlugin) Read() ([]*cypress.Message, error) { func (m *MockPlugin) Gather(_a0 Accumulator) error {
ret := m.Called() ret := m.Called(_a0)
r0 := ret.Get(0).([]*cypress.Message) r0 := ret.Error(0)
r1 := ret.Error(1)
return r0, r1 return r0
} }

View File

@ -1,9 +1,11 @@
package plugins package plugins
import "github.com/vektra/cypress" type Accumulator interface {
Add(name string, value interface{}, tags map[string]string)
}
type Plugin interface { type Plugin interface {
Read() ([]*cypress.Message, error) Gather(Accumulator) error
} }
type Creator func() Plugin type Creator func() Plugin

View File

@ -1,9 +1,9 @@
package system package system
import ( import "github.com/stretchr/testify/mock"
"github.com/influxdb/tivan/plugins/system/ps/load"
"github.com/stretchr/testify/mock" import "github.com/influxdb/tivan/plugins/system/ps/cpu"
) import "github.com/influxdb/tivan/plugins/system/ps/load"
type MockPS struct { type MockPS struct {
mock.Mock mock.Mock
@ -17,3 +17,11 @@ func (m *MockPS) LoadAvg() (*load.LoadAvgStat, error) {
return r0, r1 return r0, r1
} }
func (m *MockPS) CPUTimes() ([]cpu.CPUTimesStat, error) {
ret := m.Called()
r0 := ret.Get(0).([]cpu.CPUTimesStat)
r1 := ret.Error(1)
return r0, r1
}

View File

@ -2,13 +2,29 @@
package cpu package cpu
/*
#include <stdlib.h>
#include <sys/sysctl.h>
#include <sys/mount.h>
#include <mach/mach_init.h>
#include <mach/mach_host.h>
#include <mach/host_info.h>
#include <libproc.h>
#include <mach/processor_info.h>
#include <mach/vm_map.h>
*/
import "C"
import ( import (
"bytes"
"encoding/binary"
"fmt" "fmt"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"unsafe"
common "github.com/shirou/gopsutil/common" common "github.com/influxdb/tivan/plugins/system/ps/common"
) )
// sys/resource.h // sys/resource.h
@ -26,7 +42,110 @@ const (
ClocksPerSec = 128 ClocksPerSec = 128
) )
func perCPUTimes() ([]CPUTimesStat, error) {
var (
count C.mach_msg_type_number_t
cpuload *C.processor_cpu_load_info_data_t
ncpu C.natural_t
)
status := C.host_processor_info(C.host_t(C.mach_host_self()),
C.PROCESSOR_CPU_LOAD_INFO,
&ncpu,
(*C.processor_info_array_t)(unsafe.Pointer(&cpuload)),
&count)
if status != C.KERN_SUCCESS {
return nil, fmt.Errorf("host_processor_info error=%d", status)
}
// jump through some cgo casting hoops and ensure we properly free
// the memory that cpuload points to
target := C.vm_map_t(C.mach_task_self_)
address := C.vm_address_t(uintptr(unsafe.Pointer(cpuload)))
defer C.vm_deallocate(target, address, C.vm_size_t(ncpu))
// the body of struct processor_cpu_load_info
// aka processor_cpu_load_info_data_t
var cpu_ticks [C.CPU_STATE_MAX]uint32
// copy the cpuload array to a []byte buffer
// where we can binary.Read the data
size := int(ncpu) * binary.Size(cpu_ticks)
buf := C.GoBytes(unsafe.Pointer(cpuload), C.int(size))
bbuf := bytes.NewBuffer(buf)
var ret []CPUTimesStat
for i := 0; i < int(ncpu); i++ {
err := binary.Read(bbuf, binary.LittleEndian, &cpu_ticks)
if err != nil {
return nil, err
}
c := CPUTimesStat{
CPU: fmt.Sprintf("cpu%d", i),
User: float64(cpu_ticks[C.CPU_STATE_USER]) / ClocksPerSec,
System: float64(cpu_ticks[C.CPU_STATE_SYSTEM]) / ClocksPerSec,
Nice: float64(cpu_ticks[C.CPU_STATE_NICE]) / ClocksPerSec,
Idle: float64(cpu_ticks[C.CPU_STATE_IDLE]) / ClocksPerSec,
Iowait: -1,
Irq: -1,
Softirq: -1,
Steal: -1,
Guest: -1,
GuestNice: -1,
Stolen: -1,
}
ret = append(ret, c)
}
return ret, nil
}
func allCPUTimes() ([]CPUTimesStat, error) {
var count C.mach_msg_type_number_t = C.HOST_CPU_LOAD_INFO_COUNT
var cpuload C.host_cpu_load_info_data_t
status := C.host_statistics(C.host_t(C.mach_host_self()),
C.HOST_CPU_LOAD_INFO,
C.host_info_t(unsafe.Pointer(&cpuload)),
&count)
if status != C.KERN_SUCCESS {
return nil, fmt.Errorf("host_statistics error=%d", status)
}
c := CPUTimesStat{
CPU: "cpu-total",
User: float64(cpuload.cpu_ticks[C.CPU_STATE_USER]) / ClocksPerSec,
System: float64(cpuload.cpu_ticks[C.CPU_STATE_SYSTEM]) / ClocksPerSec,
Nice: float64(cpuload.cpu_ticks[C.CPU_STATE_NICE]) / ClocksPerSec,
Idle: float64(cpuload.cpu_ticks[C.CPU_STATE_IDLE]) / ClocksPerSec,
Iowait: -1,
Irq: -1,
Softirq: -1,
Steal: -1,
Guest: -1,
GuestNice: -1,
Stolen: -1,
}
return []CPUTimesStat{c}, nil
}
func CPUTimes(percpu bool) ([]CPUTimesStat, error) { func CPUTimes(percpu bool) ([]CPUTimesStat, error) {
if percpu {
return perCPUTimes()
}
return allCPUTimes()
}
func sysctrlCPUTimes(percpu bool) ([]CPUTimesStat, error) {
var ret []CPUTimesStat var ret []CPUTimesStat
var sysctlCall string var sysctlCall string

View File

@ -1,54 +1,58 @@
package system package system
import ( import (
"fmt"
"github.com/influxdb/tivan/plugins" "github.com/influxdb/tivan/plugins"
"github.com/influxdb/tivan/plugins/system/ps/cpu"
"github.com/influxdb/tivan/plugins/system/ps/load" "github.com/influxdb/tivan/plugins/system/ps/load"
"github.com/vektra/cypress"
) )
type PS interface { type PS interface {
LoadAvg() (*load.LoadAvgStat, error) LoadAvg() (*load.LoadAvgStat, error)
CPUTimes() ([]cpu.CPUTimesStat, error)
} }
type SystemStats struct { type SystemStats struct {
ps PS ps PS
tags map[string]string
} }
func (s *SystemStats) Read() ([]*cypress.Message, error) { func (s *SystemStats) add(acc plugins.Accumulator, name string, val float64) {
if val >= 0 {
acc.Add(name, val, nil)
}
}
func (s *SystemStats) Gather(acc plugins.Accumulator) error {
lv, err := s.ps.LoadAvg() lv, err := s.ps.LoadAvg()
if err != nil { if err != nil {
return nil, err return err
} }
m1 := cypress.Metric() acc.Add("load1", lv.Load1, nil)
m1.Add("type", "gauge") acc.Add("load5", lv.Load5, nil)
m1.Add("name", "load1") acc.Add("load15", lv.Load15, nil)
m1.Add("value", lv.Load1)
for k, v := range s.tags { times, err := s.ps.CPUTimes()
m1.AddTag(k, v) if err != nil {
return fmt.Errorf("error getting CPU info: %s", err)
} }
m2 := cypress.Metric() for _, cts := range times {
m2.Add("type", "gauge") s.add(acc, cts.CPU+".user", cts.User)
m2.Add("name", "load5") s.add(acc, cts.CPU+".system", cts.System)
m2.Add("value", lv.Load5) s.add(acc, cts.CPU+".idle", cts.Idle)
s.add(acc, cts.CPU+".nice", cts.Nice)
for k, v := range s.tags { s.add(acc, cts.CPU+".iowait", cts.Iowait)
m2.AddTag(k, v) s.add(acc, cts.CPU+".irq", cts.Irq)
s.add(acc, cts.CPU+".softirq", cts.Softirq)
s.add(acc, cts.CPU+".steal", cts.Steal)
s.add(acc, cts.CPU+".guest", cts.Guest)
s.add(acc, cts.CPU+".guestNice", cts.GuestNice)
s.add(acc, cts.CPU+".stolen", cts.Stolen)
} }
m3 := cypress.Metric() return nil
m3.Add("type", "gauge")
m3.Add("name", "load15")
m3.Add("value", lv.Load15)
for k, v := range s.tags {
m3.AddTag(k, v)
}
return []*cypress.Message{m1, m2, m3}, nil
} }
type systemPS struct{} type systemPS struct{}
@ -57,6 +61,10 @@ func (s *systemPS) LoadAvg() (*load.LoadAvgStat, error) {
return load.LoadAvg() return load.LoadAvg()
} }
func (s *systemPS) CPUTimes() ([]cpu.CPUTimesStat, error) {
return cpu.CPUTimes(true)
}
func init() { func init() {
plugins.Add("system", func() plugins.Plugin { plugins.Add("system", func() plugins.Plugin {
return &SystemStats{ps: &systemPS{}} return &SystemStats{ps: &systemPS{}}

View File

@ -3,16 +3,20 @@ package system
import ( import (
"testing" "testing"
"github.com/influxdb/tivan/plugins/system/ps/cpu"
"github.com/influxdb/tivan/plugins/system/ps/load" "github.com/influxdb/tivan/plugins/system/ps/load"
"github.com/influxdb/tivan/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestSystemStats_GenerateLoad(t *testing.T) { func TestSystemStats_GenerateStats(t *testing.T) {
var mps MockPS var mps MockPS
defer mps.AssertExpectations(t) defer mps.AssertExpectations(t)
var acc testutil.Accumulator
ss := &SystemStats{ps: &mps} ss := &SystemStats{ps: &mps}
lv := &load.LoadAvgStat{ lv := &load.LoadAvgStat{
@ -23,72 +27,39 @@ func TestSystemStats_GenerateLoad(t *testing.T) {
mps.On("LoadAvg").Return(lv, nil) mps.On("LoadAvg").Return(lv, nil)
msgs, err := ss.Read() cts := cpu.CPUTimesStat{
CPU: "all",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0001,
Guest: 8.1,
GuestNice: 0.324,
Stolen: 0.051,
}
mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
err := ss.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
name, ok := msgs[0].GetString("name") assert.True(t, acc.CheckValue("load1", 0.3))
require.True(t, ok) assert.True(t, acc.CheckValue("load5", 1.5))
assert.True(t, acc.CheckValue("load15", 0.8))
assert.Equal(t, "load1", name) assert.True(t, acc.CheckValue("all.user", 3.1))
assert.True(t, acc.CheckValue("all.system", 8.2))
val, ok := msgs[0].GetFloat("value") assert.True(t, acc.CheckValue("all.idle", 80.1))
require.True(t, ok) assert.True(t, acc.CheckValue("all.nice", 1.3))
assert.True(t, acc.CheckValue("all.iowait", 0.2))
assert.Equal(t, 0.3, val) assert.True(t, acc.CheckValue("all.irq", 0.1))
assert.True(t, acc.CheckValue("all.softirq", 0.11))
name, ok = msgs[1].GetString("name") assert.True(t, acc.CheckValue("all.steal", 0.0001))
require.True(t, ok) assert.True(t, acc.CheckValue("all.guest", 8.1))
assert.True(t, acc.CheckValue("all.guestNice", 0.324))
assert.Equal(t, "load5", name) assert.True(t, acc.CheckValue("all.stolen", 0.051))
val, ok = msgs[1].GetFloat("value")
require.True(t, ok)
assert.Equal(t, 1.5, val)
name, ok = msgs[2].GetString("name")
require.True(t, ok)
assert.Equal(t, "load15", name)
val, ok = msgs[2].GetFloat("value")
require.True(t, ok)
assert.Equal(t, 0.8, val)
}
func TestSystemStats_AddTags(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
ss := &SystemStats{
ps: &mps,
tags: map[string]string{
"host": "my.test",
"dc": "us-west-1",
},
}
lv := &load.LoadAvgStat{
Load1: 0.3,
Load5: 1.5,
Load15: 0.8,
}
mps.On("LoadAvg").Return(lv, nil)
msgs, err := ss.Read()
require.NoError(t, err)
for _, m := range msgs {
val, ok := m.GetTag("host")
require.True(t, ok)
assert.Equal(t, val, "my.test")
val, ok = m.GetTag("dc")
require.True(t, ok)
assert.Equal(t, val, "us-west-1")
}
} }

View File

@ -7,6 +7,6 @@ debug = true
url = "http://localhost:8086" url = "http://localhost:8086"
username = "root" username = "root"
password = "root" password = "root"
database = "cypress" database = "tivan"
tags = { dc = "us-phx-1" } tags = { dc = "us-phx-1" }

25
testutil/accumulator.go Normal file
View File

@ -0,0 +1,25 @@
package testutil
type Point struct {
Name string
Value interface{}
Tags map[string]string
}
type Accumulator struct {
Points []*Point
}
func (a *Accumulator) Add(name string, value interface{}, tags map[string]string) {
a.Points = append(a.Points, &Point{name, value, tags})
}
func (a *Accumulator) CheckValue(name string, val interface{}) bool {
for _, p := range a.Points {
if p.Name == name {
return p.Value == val
}
}
return false
}