diff --git a/agent.go b/agent.go new file mode 100644 index 000000000..2bdaf613e --- /dev/null +++ b/agent.go @@ -0,0 +1,132 @@ +package tivan + +import ( + "fmt" + "log" + "sort" + + "github.com/influxdb/tivan/plugins" + "github.com/vektra/cypress" + "github.com/vektra/cypress/plugins/metrics" +) +import "time" + +type Metrics interface { + Receive(*cypress.Message) error +} + +type Agent struct { + Interval Duration + Debug bool + HTTP string + + Config *Config + + plugins []plugins.Plugin + metrics Metrics + + eachInternal []func() +} + +func NewAgent(config *Config) *Agent { + m := metrics.NewMetricSink() + + agent := &Agent{Config: config, metrics: m} + + err := config.Apply("agent", agent) + if err != nil { + panic(err) + } + + if config.URL != "" { + icfg := metrics.DefaultInfluxConfig() + icfg.URL = config.URL + icfg.Username = config.Username + icfg.Password = config.Password + icfg.Database = config.Database + icfg.UserAgent = config.UserAgent + + agent.eachInternal = append(agent.eachInternal, func() { + if agent.Debug { + log.Printf("flushing to influxdb") + } + + m.FlushInflux(icfg) + }) + } + + return agent +} + +type HTTPInterface interface { + RunHTTP(string) error +} + +func (a *Agent) RunHTTP(addr string) { + a.metrics.(HTTPInterface).RunHTTP(addr) +} + +func (a *Agent) LoadPlugins() ([]string, error) { + var names []string + + for name, creator := range plugins.Plugins { + a.plugins = append(a.plugins, creator()) + names = append(names, name) + } + + sort.Strings(names) + + return names, nil +} + +func (a *Agent) crank() error { + for _, plugin := range a.plugins { + msgs, err := plugin.Read() + if err != nil { + return err + } + + for _, m := range msgs { + for k, v := range a.Config.Tags { + m.AddTag(k, v) + } + + if a.Debug { + fmt.Println(m.KVString()) + } + + err = a.metrics.Receive(m) + if err != nil { + return err + } + } + } + + return nil +} + +func (a *Agent) Run(shutdown chan struct{}) { + if a.HTTP != "" { + go a.RunHTTP(a.HTTP) + } + + ticker := time.NewTicker(a.Interval.Duration) + + for { + err := a.crank() + if err != nil { + log.Printf("Error in plugins: %s", err) + } + + for _, f := range a.eachInternal { + f() + } + + select { + case <-shutdown: + return + case <-ticker.C: + continue + } + } +} diff --git a/agent_test.go b/agent_test.go new file mode 100644 index 000000000..655c608df --- /dev/null +++ b/agent_test.go @@ -0,0 +1,79 @@ +package tivan + +import ( + "testing" + + "github.com/influxdb/tivan/plugins" + "github.com/stretchr/testify/require" + "github.com/vektra/cypress" + "github.com/vektra/neko" +) + +func TestAgent(t *testing.T) { + n := neko.Start(t) + + var ( + plugin plugins.MockPlugin + metrics MockMetrics + ) + + n.CheckMock(&plugin.Mock) + n.CheckMock(&metrics.Mock) + + n.It("drives the plugins and sends them to the metrics", func() { + a := &Agent{ + plugins: []plugins.Plugin{&plugin}, + metrics: &metrics, + Config: &Config{}, + } + + m1 := cypress.Metric() + m1.Add("name", "foo") + m1.Add("value", 1.2) + + m2 := cypress.Metric() + m2.Add("name", "bar") + m2.Add("value", 888) + + msgs := []*cypress.Message{m1, m2} + + plugin.On("Read").Return(msgs, nil) + metrics.On("Receive", m1).Return(nil) + metrics.On("Receive", m2).Return(nil) + + err := a.crank() + require.NoError(t, err) + }) + + n.It("applies tags as the messages pass through", func() { + a := &Agent{ + plugins: []plugins.Plugin{&plugin}, + metrics: &metrics, + Config: &Config{ + Tags: map[string]string{ + "dc": "us-west-1", + }, + }, + } + + m1 := cypress.Metric() + m1.Add("name", "foo") + m1.Add("value", 1.2) + + msgs := []*cypress.Message{m1} + + m2 := cypress.Metric() + m2.Timestamp = m1.Timestamp + m2.Add("name", "foo") + m2.Add("value", 1.2) + m2.AddTag("dc", "us-west-1") + + plugin.On("Read").Return(msgs, nil) + metrics.On("Receive", m2).Return(nil) + + err := a.crank() + require.NoError(t, err) + }) + + n.Meow() +} diff --git a/cmd/influxdb-agent/agent.go b/cmd/influxdb-agent/agent.go new file mode 100644 index 000000000..6e6c8b421 --- /dev/null +++ b/cmd/influxdb-agent/agent.go @@ -0,0 +1,70 @@ +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "strings" + + "github.com/influxdb/tivan" + _ "github.com/influxdb/tivan/plugins/all" +) + +var fDebug = flag.Bool("debug", false, "show metrics as they're generated to stdout") + +var fConfig = flag.String("config", "", "configuration file to load") + +func main() { + flag.Parse() + + var ( + config *tivan.Config + err error + ) + + if *fConfig != "" { + config, err = tivan.LoadConfig(*fConfig) + if err != nil { + log.Fatal(err) + } + } else { + config = tivan.DefaultConfig() + } + + ag := tivan.NewAgent(config) + + if *fDebug { + ag.Debug = true + } + + plugins, err := ag.LoadPlugins() + if err != nil { + log.Fatal(err) + } + + shutdown := make(chan struct{}) + + signals := make(chan os.Signal) + + signal.Notify(signals, os.Interrupt) + + go func() { + <-signals + close(shutdown) + }() + + log.Print("InfluxDB Agent running") + log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) + if ag.Debug { + log.Printf("Debug: enabled") + log.Printf("Agent Config: %#v", ag) + } + + if config.URL != "" { + log.Printf("Sending metrics to: %s", config.URL) + log.Printf("Tags enabled: %v", config.ListTags()) + } + + ag.Run(shutdown) +} diff --git a/config.go b/config.go new file mode 100644 index 000000000..57dbd5af0 --- /dev/null +++ b/config.go @@ -0,0 +1,103 @@ +package tivan + +import ( + "errors" + "fmt" + "io/ioutil" + "sort" + "strings" + "time" + + "github.com/naoina/toml" + "github.com/naoina/toml/ast" +) + +type Duration struct { + time.Duration +} + +func (d *Duration) UnmarshalTOML(b []byte) error { + dur, err := time.ParseDuration(string(b[1 : len(b)-1])) + if err != nil { + return err + } + + d.Duration = dur + + return nil +} + +type Config struct { + URL string + Username string + Password string + Database string + UserAgent string + Tags map[string]string + + plugins map[string]*ast.Table +} + +func (c *Config) Plugins() map[string]*ast.Table { + return c.plugins +} + +func (c *Config) Apply(name string, v interface{}) error { + if tbl, ok := c.plugins[name]; ok { + return toml.UnmarshalTable(tbl, v) + } + + return nil +} + +func DefaultConfig() *Config { + return &Config{} +} + +var ErrInvalidConfig = errors.New("invalid configuration") + +func LoadConfig(path string) (*Config, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + tbl, err := toml.Parse(data) + if err != nil { + return nil, err + } + + c := &Config{ + plugins: make(map[string]*ast.Table), + } + + for name, val := range tbl.Fields { + subtbl, ok := val.(*ast.Table) + if !ok { + return nil, ErrInvalidConfig + } + + if name == "influxdb" { + err := toml.UnmarshalTable(subtbl, c) + if err != nil { + return nil, err + } + } else { + c.plugins[name] = subtbl + } + } + + return c, nil +} + +func (c *Config) ListTags() string { + var tags []string + + for k, v := range c.Tags { + tags = append(tags, fmt.Sprintf("%s=%s", k, v)) + } + + sort.Strings(tags) + + return strings.Join(tags, " ") +} diff --git a/mock_Metrics.go b/mock_Metrics.go new file mode 100644 index 000000000..d3249cd07 --- /dev/null +++ b/mock_Metrics.go @@ -0,0 +1,17 @@ +package tivan + +import "github.com/stretchr/testify/mock" + +import "github.com/vektra/cypress" + +type MockMetrics struct { + mock.Mock +} + +func (m *MockMetrics) Receive(_a0 *cypress.Message) error { + ret := m.Called(_a0) + + r0 := ret.Error(0) + + return r0 +} diff --git a/mock_Plugin.go b/mock_Plugin.go new file mode 100644 index 000000000..f2f36980c --- /dev/null +++ b/mock_Plugin.go @@ -0,0 +1,18 @@ +package tivan + +import "github.com/stretchr/testify/mock" + +import "github.com/vektra/cypress" + +type MockPlugin struct { + mock.Mock +} + +func (m *MockPlugin) Read() ([]*cypress.Message, error) { + ret := m.Called() + + r0 := ret.Get(0).([]*cypress.Message) + r1 := ret.Error(1) + + return r0, r1 +} diff --git a/plugins/all/all.go b/plugins/all/all.go new file mode 100644 index 000000000..602d919d6 --- /dev/null +++ b/plugins/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdb/tivan/plugins/system" +) diff --git a/plugins/mock_Plugin.go b/plugins/mock_Plugin.go new file mode 100644 index 000000000..d775c6a64 --- /dev/null +++ b/plugins/mock_Plugin.go @@ -0,0 +1,18 @@ +package plugins + +import "github.com/stretchr/testify/mock" + +import "github.com/vektra/cypress" + +type MockPlugin struct { + mock.Mock +} + +func (m *MockPlugin) Read() ([]*cypress.Message, error) { + ret := m.Called() + + r0 := ret.Get(0).([]*cypress.Message) + r1 := ret.Error(1) + + return r0, r1 +} diff --git a/plugins/registry.go b/plugins/registry.go new file mode 100644 index 000000000..b2a0850ba --- /dev/null +++ b/plugins/registry.go @@ -0,0 +1,15 @@ +package plugins + +import "github.com/vektra/cypress" + +type Plugin interface { + Read() ([]*cypress.Message, error) +} + +type Creator func() Plugin + +var Plugins = map[string]Creator{} + +func Add(name string, creator Creator) { + Plugins[name] = creator +} diff --git a/plugins/system/mock_PS.go b/plugins/system/mock_PS.go new file mode 100644 index 000000000..957839811 --- /dev/null +++ b/plugins/system/mock_PS.go @@ -0,0 +1,18 @@ +package system + +import "github.com/stretchr/testify/mock" + +import "github.com/shirou/gopsutil/load" + +type MockPS struct { + mock.Mock +} + +func (m *MockPS) LoadAvg() (*load.LoadAvgStat, error) { + ret := m.Called() + + r0 := ret.Get(0).(*load.LoadAvgStat) + r1 := ret.Error(1) + + return r0, r1 +} diff --git a/plugins/system/system.go b/plugins/system/system.go new file mode 100644 index 000000000..6edcfe43c --- /dev/null +++ b/plugins/system/system.go @@ -0,0 +1,64 @@ +package system + +import ( + "github.com/influxdb/tivan/plugins" + "github.com/shirou/gopsutil/load" + "github.com/vektra/cypress" +) + +type PS interface { + LoadAvg() (*load.LoadAvgStat, error) +} + +type SystemStats struct { + ps PS + tags map[string]string +} + +func (s *SystemStats) Read() ([]*cypress.Message, error) { + lv, err := s.ps.LoadAvg() + if err != nil { + return nil, err + } + + m1 := cypress.Metric() + m1.Add("type", "gauge") + m1.Add("name", "load1") + m1.Add("value", lv.Load1) + + for k, v := range s.tags { + m1.AddTag(k, v) + } + + m2 := cypress.Metric() + m2.Add("type", "gauge") + m2.Add("name", "load5") + m2.Add("value", lv.Load5) + + for k, v := range s.tags { + m2.AddTag(k, v) + } + + m3 := cypress.Metric() + m3.Add("type", "gauge") + m3.Add("name", "load15") + m3.Add("value", lv.Load15) + + for k, v := range s.tags { + m3.AddTag(k, v) + } + + return []*cypress.Message{m1, m2, m3}, nil +} + +type systemPS struct{} + +func (s *systemPS) LoadAvg() (*load.LoadAvgStat, error) { + return load.LoadAvg() +} + +func init() { + plugins.Add("system", func() plugins.Plugin { + return &SystemStats{ps: &systemPS{}} + }) +} diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go new file mode 100644 index 000000000..1c136a567 --- /dev/null +++ b/plugins/system/system_test.go @@ -0,0 +1,98 @@ +package system + +import ( + "testing" + + "github.com/shirou/gopsutil/load" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vektra/neko" +) + +func TestSystemStats(t *testing.T) { + n := neko.Start(t) + + var mps MockPS + + n.CheckMock(&mps.Mock) + + n.It("generates metrics from the system information", func() { + ss := &SystemStats{ps: &mps} + + lv := &load.LoadAvgStat{ + Load1: 0.3, + Load5: 1.5, + Load15: 0.8, + } + + mps.On("LoadAvg").Return(lv, nil) + + msgs, err := ss.Read() + require.NoError(t, err) + + name, ok := msgs[0].GetString("name") + require.True(t, ok) + + assert.Equal(t, "load1", name) + + val, ok := msgs[0].GetFloat("value") + require.True(t, ok) + + assert.Equal(t, 0.3, val) + + name, ok = msgs[1].GetString("name") + require.True(t, ok) + + assert.Equal(t, "load5", name) + + val, ok = msgs[1].GetFloat("value") + require.True(t, ok) + + assert.Equal(t, 1.5, val) + + name, ok = msgs[2].GetString("name") + require.True(t, ok) + + assert.Equal(t, "load15", name) + + val, ok = msgs[2].GetFloat("value") + require.True(t, ok) + + assert.Equal(t, 0.8, val) + }) + + n.It("adds any tags registered", func() { + ss := &SystemStats{ + ps: &mps, + tags: map[string]string{ + "host": "my.test", + "dc": "us-west-1", + }, + } + + lv := &load.LoadAvgStat{ + Load1: 0.3, + Load5: 1.5, + Load15: 0.8, + } + + mps.On("LoadAvg").Return(lv, nil) + + msgs, err := ss.Read() + require.NoError(t, err) + + for _, m := range msgs { + val, ok := m.GetTag("host") + require.True(t, ok) + + assert.Equal(t, val, "my.test") + + val, ok = m.GetTag("dc") + require.True(t, ok) + + assert.Equal(t, val, "us-west-1") + } + }) + + n.Meow() +} diff --git a/testdata/influx.toml b/testdata/influx.toml new file mode 100644 index 000000000..00376fa7a --- /dev/null +++ b/testdata/influx.toml @@ -0,0 +1,12 @@ +[agent] +interval = "5s" +http = ":11213" +debug = true + +[influxdb] +url = "http://localhost:8086" +username = "root" +password = "root" +database = "cypress" +tags = { dc = "us-phx-1" } +