Initial spike

This is mostly to solidify the overall structure with the agent,
plugins, and testing philosphy.
This commit is contained in:
Evan Phoenix 2015-04-01 09:34:32 -07:00
parent b8e13890a9
commit db74acb86d
13 changed files with 649 additions and 0 deletions

132
agent.go Normal file
View File

@ -0,0 +1,132 @@
package tivan
import (
"fmt"
"log"
"sort"
"github.com/influxdb/tivan/plugins"
"github.com/vektra/cypress"
"github.com/vektra/cypress/plugins/metrics"
)
import "time"
type Metrics interface {
Receive(*cypress.Message) error
}
type Agent struct {
Interval Duration
Debug bool
HTTP string
Config *Config
plugins []plugins.Plugin
metrics Metrics
eachInternal []func()
}
func NewAgent(config *Config) *Agent {
m := metrics.NewMetricSink()
agent := &Agent{Config: config, metrics: m}
err := config.Apply("agent", agent)
if err != nil {
panic(err)
}
if config.URL != "" {
icfg := metrics.DefaultInfluxConfig()
icfg.URL = config.URL
icfg.Username = config.Username
icfg.Password = config.Password
icfg.Database = config.Database
icfg.UserAgent = config.UserAgent
agent.eachInternal = append(agent.eachInternal, func() {
if agent.Debug {
log.Printf("flushing to influxdb")
}
m.FlushInflux(icfg)
})
}
return agent
}
type HTTPInterface interface {
RunHTTP(string) error
}
func (a *Agent) RunHTTP(addr string) {
a.metrics.(HTTPInterface).RunHTTP(addr)
}
func (a *Agent) LoadPlugins() ([]string, error) {
var names []string
for name, creator := range plugins.Plugins {
a.plugins = append(a.plugins, creator())
names = append(names, name)
}
sort.Strings(names)
return names, nil
}
func (a *Agent) crank() error {
for _, plugin := range a.plugins {
msgs, err := plugin.Read()
if err != nil {
return err
}
for _, m := range msgs {
for k, v := range a.Config.Tags {
m.AddTag(k, v)
}
if a.Debug {
fmt.Println(m.KVString())
}
err = a.metrics.Receive(m)
if err != nil {
return err
}
}
}
return nil
}
func (a *Agent) Run(shutdown chan struct{}) {
if a.HTTP != "" {
go a.RunHTTP(a.HTTP)
}
ticker := time.NewTicker(a.Interval.Duration)
for {
err := a.crank()
if err != nil {
log.Printf("Error in plugins: %s", err)
}
for _, f := range a.eachInternal {
f()
}
select {
case <-shutdown:
return
case <-ticker.C:
continue
}
}
}

79
agent_test.go Normal file
View File

@ -0,0 +1,79 @@
package tivan
import (
"testing"
"github.com/influxdb/tivan/plugins"
"github.com/stretchr/testify/require"
"github.com/vektra/cypress"
"github.com/vektra/neko"
)
func TestAgent(t *testing.T) {
n := neko.Start(t)
var (
plugin plugins.MockPlugin
metrics MockMetrics
)
n.CheckMock(&plugin.Mock)
n.CheckMock(&metrics.Mock)
n.It("drives the plugins and sends them to the metrics", func() {
a := &Agent{
plugins: []plugins.Plugin{&plugin},
metrics: &metrics,
Config: &Config{},
}
m1 := cypress.Metric()
m1.Add("name", "foo")
m1.Add("value", 1.2)
m2 := cypress.Metric()
m2.Add("name", "bar")
m2.Add("value", 888)
msgs := []*cypress.Message{m1, m2}
plugin.On("Read").Return(msgs, nil)
metrics.On("Receive", m1).Return(nil)
metrics.On("Receive", m2).Return(nil)
err := a.crank()
require.NoError(t, err)
})
n.It("applies tags as the messages pass through", func() {
a := &Agent{
plugins: []plugins.Plugin{&plugin},
metrics: &metrics,
Config: &Config{
Tags: map[string]string{
"dc": "us-west-1",
},
},
}
m1 := cypress.Metric()
m1.Add("name", "foo")
m1.Add("value", 1.2)
msgs := []*cypress.Message{m1}
m2 := cypress.Metric()
m2.Timestamp = m1.Timestamp
m2.Add("name", "foo")
m2.Add("value", 1.2)
m2.AddTag("dc", "us-west-1")
plugin.On("Read").Return(msgs, nil)
metrics.On("Receive", m2).Return(nil)
err := a.crank()
require.NoError(t, err)
})
n.Meow()
}

View File

@ -0,0 +1,70 @@
package main
import (
"flag"
"log"
"os"
"os/signal"
"strings"
"github.com/influxdb/tivan"
_ "github.com/influxdb/tivan/plugins/all"
)
var fDebug = flag.Bool("debug", false, "show metrics as they're generated to stdout")
var fConfig = flag.String("config", "", "configuration file to load")
func main() {
flag.Parse()
var (
config *tivan.Config
err error
)
if *fConfig != "" {
config, err = tivan.LoadConfig(*fConfig)
if err != nil {
log.Fatal(err)
}
} else {
config = tivan.DefaultConfig()
}
ag := tivan.NewAgent(config)
if *fDebug {
ag.Debug = true
}
plugins, err := ag.LoadPlugins()
if err != nil {
log.Fatal(err)
}
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt)
go func() {
<-signals
close(shutdown)
}()
log.Print("InfluxDB Agent running")
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: %#v", ag)
}
if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}
ag.Run(shutdown)
}

103
config.go Normal file
View File

@ -0,0 +1,103 @@
package tivan
import (
"errors"
"fmt"
"io/ioutil"
"sort"
"strings"
"time"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
)
type Duration struct {
time.Duration
}
func (d *Duration) UnmarshalTOML(b []byte) error {
dur, err := time.ParseDuration(string(b[1 : len(b)-1]))
if err != nil {
return err
}
d.Duration = dur
return nil
}
type Config struct {
URL string
Username string
Password string
Database string
UserAgent string
Tags map[string]string
plugins map[string]*ast.Table
}
func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}
func (c *Config) Apply(name string, v interface{}) error {
if tbl, ok := c.plugins[name]; ok {
return toml.UnmarshalTable(tbl, v)
}
return nil
}
func DefaultConfig() *Config {
return &Config{}
}
var ErrInvalidConfig = errors.New("invalid configuration")
func LoadConfig(path string) (*Config, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
tbl, err := toml.Parse(data)
if err != nil {
return nil, err
}
c := &Config{
plugins: make(map[string]*ast.Table),
}
for name, val := range tbl.Fields {
subtbl, ok := val.(*ast.Table)
if !ok {
return nil, ErrInvalidConfig
}
if name == "influxdb" {
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
} else {
c.plugins[name] = subtbl
}
}
return c, nil
}
func (c *Config) ListTags() string {
var tags []string
for k, v := range c.Tags {
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(tags)
return strings.Join(tags, " ")
}

17
mock_Metrics.go Normal file
View File

@ -0,0 +1,17 @@
package tivan
import "github.com/stretchr/testify/mock"
import "github.com/vektra/cypress"
type MockMetrics struct {
mock.Mock
}
func (m *MockMetrics) Receive(_a0 *cypress.Message) error {
ret := m.Called(_a0)
r0 := ret.Error(0)
return r0
}

18
mock_Plugin.go Normal file
View File

@ -0,0 +1,18 @@
package tivan
import "github.com/stretchr/testify/mock"
import "github.com/vektra/cypress"
type MockPlugin struct {
mock.Mock
}
func (m *MockPlugin) Read() ([]*cypress.Message, error) {
ret := m.Called()
r0 := ret.Get(0).([]*cypress.Message)
r1 := ret.Error(1)
return r0, r1
}

5
plugins/all/all.go Normal file
View File

@ -0,0 +1,5 @@
package all
import (
_ "github.com/influxdb/tivan/plugins/system"
)

18
plugins/mock_Plugin.go Normal file
View File

@ -0,0 +1,18 @@
package plugins
import "github.com/stretchr/testify/mock"
import "github.com/vektra/cypress"
type MockPlugin struct {
mock.Mock
}
func (m *MockPlugin) Read() ([]*cypress.Message, error) {
ret := m.Called()
r0 := ret.Get(0).([]*cypress.Message)
r1 := ret.Error(1)
return r0, r1
}

15
plugins/registry.go Normal file
View File

@ -0,0 +1,15 @@
package plugins
import "github.com/vektra/cypress"
type Plugin interface {
Read() ([]*cypress.Message, error)
}
type Creator func() Plugin
var Plugins = map[string]Creator{}
func Add(name string, creator Creator) {
Plugins[name] = creator
}

18
plugins/system/mock_PS.go Normal file
View File

@ -0,0 +1,18 @@
package system
import "github.com/stretchr/testify/mock"
import "github.com/shirou/gopsutil/load"
type MockPS struct {
mock.Mock
}
func (m *MockPS) LoadAvg() (*load.LoadAvgStat, error) {
ret := m.Called()
r0 := ret.Get(0).(*load.LoadAvgStat)
r1 := ret.Error(1)
return r0, r1
}

64
plugins/system/system.go Normal file
View File

@ -0,0 +1,64 @@
package system
import (
"github.com/influxdb/tivan/plugins"
"github.com/shirou/gopsutil/load"
"github.com/vektra/cypress"
)
type PS interface {
LoadAvg() (*load.LoadAvgStat, error)
}
type SystemStats struct {
ps PS
tags map[string]string
}
func (s *SystemStats) Read() ([]*cypress.Message, error) {
lv, err := s.ps.LoadAvg()
if err != nil {
return nil, err
}
m1 := cypress.Metric()
m1.Add("type", "gauge")
m1.Add("name", "load1")
m1.Add("value", lv.Load1)
for k, v := range s.tags {
m1.AddTag(k, v)
}
m2 := cypress.Metric()
m2.Add("type", "gauge")
m2.Add("name", "load5")
m2.Add("value", lv.Load5)
for k, v := range s.tags {
m2.AddTag(k, v)
}
m3 := cypress.Metric()
m3.Add("type", "gauge")
m3.Add("name", "load15")
m3.Add("value", lv.Load15)
for k, v := range s.tags {
m3.AddTag(k, v)
}
return []*cypress.Message{m1, m2, m3}, nil
}
type systemPS struct{}
func (s *systemPS) LoadAvg() (*load.LoadAvgStat, error) {
return load.LoadAvg()
}
func init() {
plugins.Add("system", func() plugins.Plugin {
return &SystemStats{ps: &systemPS{}}
})
}

View File

@ -0,0 +1,98 @@
package system
import (
"testing"
"github.com/shirou/gopsutil/load"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vektra/neko"
)
func TestSystemStats(t *testing.T) {
n := neko.Start(t)
var mps MockPS
n.CheckMock(&mps.Mock)
n.It("generates metrics from the system information", func() {
ss := &SystemStats{ps: &mps}
lv := &load.LoadAvgStat{
Load1: 0.3,
Load5: 1.5,
Load15: 0.8,
}
mps.On("LoadAvg").Return(lv, nil)
msgs, err := ss.Read()
require.NoError(t, err)
name, ok := msgs[0].GetString("name")
require.True(t, ok)
assert.Equal(t, "load1", name)
val, ok := msgs[0].GetFloat("value")
require.True(t, ok)
assert.Equal(t, 0.3, val)
name, ok = msgs[1].GetString("name")
require.True(t, ok)
assert.Equal(t, "load5", name)
val, ok = msgs[1].GetFloat("value")
require.True(t, ok)
assert.Equal(t, 1.5, val)
name, ok = msgs[2].GetString("name")
require.True(t, ok)
assert.Equal(t, "load15", name)
val, ok = msgs[2].GetFloat("value")
require.True(t, ok)
assert.Equal(t, 0.8, val)
})
n.It("adds any tags registered", func() {
ss := &SystemStats{
ps: &mps,
tags: map[string]string{
"host": "my.test",
"dc": "us-west-1",
},
}
lv := &load.LoadAvgStat{
Load1: 0.3,
Load5: 1.5,
Load15: 0.8,
}
mps.On("LoadAvg").Return(lv, nil)
msgs, err := ss.Read()
require.NoError(t, err)
for _, m := range msgs {
val, ok := m.GetTag("host")
require.True(t, ok)
assert.Equal(t, val, "my.test")
val, ok = m.GetTag("dc")
require.True(t, ok)
assert.Equal(t, val, "us-west-1")
}
})
n.Meow()
}

12
testdata/influx.toml vendored Normal file
View File

@ -0,0 +1,12 @@
[agent]
interval = "5s"
http = ":11213"
debug = true
[influxdb]
url = "http://localhost:8086"
username = "root"
password = "root"
database = "cypress"
tags = { dc = "us-phx-1" }