convert influxdb output to multiple outputs
This commit is contained in:
parent
e9ad786578
commit
4a12471918
101
agent.go
101
agent.go
|
@ -3,16 +3,20 @@ package telegraf
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/url"
|
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdb/telegraf/outputs"
|
||||||
"github.com/influxdb/telegraf/plugins"
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type runningOutput struct {
|
||||||
|
name string
|
||||||
|
output outputs.Output
|
||||||
|
}
|
||||||
|
|
||||||
type runningPlugin struct {
|
type runningPlugin struct {
|
||||||
name string
|
name string
|
||||||
plugin plugins.Plugin
|
plugin plugins.Plugin
|
||||||
|
@ -26,9 +30,8 @@ type Agent struct {
|
||||||
|
|
||||||
Config *Config
|
Config *Config
|
||||||
|
|
||||||
|
outputs []*runningOutput
|
||||||
plugins []*runningPlugin
|
plugins []*runningPlugin
|
||||||
|
|
||||||
conn *client.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAgent(config *Config) (*Agent, error) {
|
func NewAgent(config *Config) (*Agent, error) {
|
||||||
|
@ -57,30 +60,41 @@ func NewAgent(config *Config) (*Agent, error) {
|
||||||
return agent, nil
|
return agent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (agent *Agent) Connect() error {
|
func (a *Agent) Connect() error {
|
||||||
config := agent.Config
|
for _, o := range a.outputs {
|
||||||
|
err := o.output.Connect()
|
||||||
u, err := url.Parse(config.URL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := client.NewClient(client.Config{
|
|
||||||
URL: *u,
|
|
||||||
Username: config.Username,
|
|
||||||
Password: config.Password,
|
|
||||||
UserAgent: config.UserAgent,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
agent.conn = c
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) LoadOutputs() ([]string, error) {
|
||||||
|
var names []string
|
||||||
|
|
||||||
|
for _, name := range a.Config.OutputsDeclared() {
|
||||||
|
creator, ok := outputs.Outputs[name]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("Undefined but requested output: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
output := creator()
|
||||||
|
|
||||||
|
err := a.Config.ApplyOutput(name, output)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.outputs = append(a.outputs, &runningOutput{name, output})
|
||||||
|
names = append(names, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(names)
|
||||||
|
|
||||||
|
return names, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Agent) LoadPlugins() ([]string, error) {
|
func (a *Agent) LoadPlugins() ([]string, error) {
|
||||||
var names []string
|
var names []string
|
||||||
|
|
||||||
|
@ -135,17 +149,15 @@ func (a *Agent) crankParallel() error {
|
||||||
|
|
||||||
close(points)
|
close(points)
|
||||||
|
|
||||||
var acc BatchPoints
|
var bp BatchPoints
|
||||||
acc.Tags = a.Config.Tags
|
bp.Tags = a.Config.Tags
|
||||||
acc.Time = time.Now()
|
bp.Time = time.Now()
|
||||||
acc.Database = a.Config.Database
|
|
||||||
|
|
||||||
for sub := range points {
|
for sub := range points {
|
||||||
acc.Points = append(acc.Points, sub.Points...)
|
bp.Points = append(bp.Points, sub.Points...)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := a.conn.Write(acc.BatchPoints)
|
return a.flush(bp)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) crank() error {
|
func (a *Agent) crank() error {
|
||||||
|
@ -164,10 +176,8 @@ func (a *Agent) crank() error {
|
||||||
|
|
||||||
acc.Tags = a.Config.Tags
|
acc.Tags = a.Config.Tags
|
||||||
acc.Time = time.Now()
|
acc.Time = time.Now()
|
||||||
acc.Database = a.Config.Database
|
|
||||||
|
|
||||||
_, err := a.conn.Write(acc.BatchPoints)
|
return a.flush(acc)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
|
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
|
||||||
|
@ -187,9 +197,11 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
|
||||||
|
|
||||||
acc.Tags = a.Config.Tags
|
acc.Tags = a.Config.Tags
|
||||||
acc.Time = time.Now()
|
acc.Time = time.Now()
|
||||||
acc.Database = a.Config.Database
|
|
||||||
|
|
||||||
a.conn.Write(acc.BatchPoints)
|
err = a.flush(acc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-shutdown:
|
case <-shutdown:
|
||||||
|
@ -200,6 +212,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) flush(bp BatchPoints) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var outerr error
|
||||||
|
for _, o := range a.outputs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(output *runningOutput) {
|
||||||
|
defer wg.Done()
|
||||||
|
outerr = o.output.Write(bp.BatchPoints)
|
||||||
|
}(o)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return outerr
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Agent) TestAllPlugins() error {
|
func (a *Agent) TestAllPlugins() error {
|
||||||
var names []string
|
var names []string
|
||||||
|
|
||||||
|
@ -253,13 +281,6 @@ func (a *Agent) Test() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) Run(shutdown chan struct{}) error {
|
func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
if a.conn == nil {
|
|
||||||
err := a.Connect()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, plugin := range a.plugins {
|
for _, plugin := range a.plugins {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdb/telegraf"
|
"github.com/influxdb/telegraf"
|
||||||
|
_ "github.com/influxdb/telegraf/outputs/all"
|
||||||
_ "github.com/influxdb/telegraf/plugins/all"
|
_ "github.com/influxdb/telegraf/plugins/all"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -58,6 +59,11 @@ func main() {
|
||||||
ag.Debug = true
|
ag.Debug = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outputs, err := ag.LoadOutputs()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
plugins, err := ag.LoadPlugins()
|
plugins, err := ag.LoadPlugins()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -94,6 +100,7 @@ func main() {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
log.Print("InfluxDB Agent running")
|
log.Print("InfluxDB Agent running")
|
||||||
|
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
|
||||||
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
|
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
|
||||||
if ag.Debug {
|
if ag.Debug {
|
||||||
log.Printf("Debug: enabled")
|
log.Printf("Debug: enabled")
|
||||||
|
@ -101,8 +108,7 @@ func main() {
|
||||||
ag.Interval, ag.Debug, ag.Hostname)
|
ag.Interval, ag.Debug, ag.Hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.URL != "" {
|
if len(outputs) > 0 {
|
||||||
log.Printf("Sending metrics to: %s", config.URL)
|
|
||||||
log.Printf("Tags enabled: %v", config.ListTags())
|
log.Printf("Tags enabled: %v", config.ListTags())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
50
config.go
50
config.go
|
@ -29,21 +29,21 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
URL string
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
Database string
|
|
||||||
UserAgent string
|
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
|
|
||||||
agent *ast.Table
|
agent *ast.Table
|
||||||
plugins map[string]*ast.Table
|
plugins map[string]*ast.Table
|
||||||
|
outputs map[string]*ast.Table
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) Plugins() map[string]*ast.Table {
|
func (c *Config) Plugins() map[string]*ast.Table {
|
||||||
return c.plugins
|
return c.plugins
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Config) Outputs() map[string]*ast.Table {
|
||||||
|
return c.outputs
|
||||||
|
}
|
||||||
|
|
||||||
type ConfiguredPlugin struct {
|
type ConfiguredPlugin struct {
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
|
@ -77,6 +77,14 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Config) ApplyOutput(name string, v interface{}) error {
|
||||||
|
if c.outputs[name] != nil {
|
||||||
|
return toml.UnmarshalTable(c.outputs[name], v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Config) ApplyAgent(v interface{}) error {
|
func (c *Config) ApplyAgent(v interface{}) error {
|
||||||
if c.agent != nil {
|
if c.agent != nil {
|
||||||
return toml.UnmarshalTable(c.agent, v)
|
return toml.UnmarshalTable(c.agent, v)
|
||||||
|
@ -137,15 +145,23 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) PluginsDeclared() []string {
|
func (c *Config) PluginsDeclared() []string {
|
||||||
var plugins []string
|
return declared(c.plugins)
|
||||||
|
}
|
||||||
|
|
||||||
for name, _ := range c.plugins {
|
func (c *Config) OutputsDeclared() []string {
|
||||||
plugins = append(plugins, name)
|
return declared(c.outputs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func declared(endpoints map[string]*ast.Table) []string {
|
||||||
|
var names []string
|
||||||
|
|
||||||
|
for name, _ := range endpoints {
|
||||||
|
names = append(names, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Strings(plugins)
|
sort.Strings(names)
|
||||||
|
|
||||||
return plugins
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultConfig() *Config {
|
func DefaultConfig() *Config {
|
||||||
|
@ -167,6 +183,7 @@ func LoadConfig(path string) (*Config, error) {
|
||||||
|
|
||||||
c := &Config{
|
c := &Config{
|
||||||
plugins: make(map[string]*ast.Table),
|
plugins: make(map[string]*ast.Table),
|
||||||
|
outputs: make(map[string]*ast.Table),
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, val := range tbl.Fields {
|
for name, val := range tbl.Fields {
|
||||||
|
@ -176,13 +193,16 @@ func LoadConfig(path string) (*Config, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
switch name {
|
switch name {
|
||||||
case "influxdb":
|
|
||||||
err := toml.UnmarshalTable(subtbl, c)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
case "agent":
|
case "agent":
|
||||||
c.agent = subtbl
|
c.agent = subtbl
|
||||||
|
case "outputs":
|
||||||
|
for outputName, outputVal := range subtbl.Fields {
|
||||||
|
outputSubtbl, ok := outputVal.(*ast.Table)
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrInvalidConfig
|
||||||
|
}
|
||||||
|
c.outputs[outputName] = outputSubtbl
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
c.plugins[name] = subtbl
|
c.plugins[name] = subtbl
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/influxdb/telegraf/outputs/influxdb"
|
||||||
|
)
|
|
@ -0,0 +1,53 @@
|
||||||
|
package influxdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb/client"
|
||||||
|
"github.com/influxdb/telegraf/outputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type InfluxDB struct {
|
||||||
|
URL string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
Database string
|
||||||
|
UserAgent string
|
||||||
|
|
||||||
|
conn *client.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InfluxDB) Connect() error {
|
||||||
|
u, err := url.Parse(i.URL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := client.NewClient(client.Config{
|
||||||
|
URL: *u,
|
||||||
|
Username: i.Username,
|
||||||
|
Password: i.Password,
|
||||||
|
UserAgent: i.UserAgent,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
i.conn = c
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InfluxDB) Write(bp client.BatchPoints) error {
|
||||||
|
bp.Database = i.Database
|
||||||
|
if _, err := i.conn.Write(bp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("influxdb", func() outputs.Output {
|
||||||
|
return &InfluxDB{}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package outputs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/influxdb/influxdb/client"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Output interface {
|
||||||
|
Connect() error
|
||||||
|
Write(client.BatchPoints) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type Creator func() Output
|
||||||
|
|
||||||
|
var Outputs = map[string]Creator{}
|
||||||
|
|
||||||
|
func Add(name string, creator Creator) {
|
||||||
|
Outputs[name] = creator
|
||||||
|
}
|
Loading…
Reference in New Issue