diff --git a/plugins/inputs/README.md b/plugins/inputs/README.md new file mode 100644 index 000000000..a73ed5bd5 --- /dev/null +++ b/plugins/inputs/README.md @@ -0,0 +1,37 @@ +# Telegraf Input Plugin: Fleet + +The plugin will gather names of running units from [fleet](https://github.com/coreos/fleet) and the sum total of each running unit. It uses the fleet v1 API to gather data. + +### Configuration: + +```toml +# Description +[[inputs.fleet]] +## Works with Fleet HTTP API +## Multiple Hosts from which to read Fleet stats: + hosts = ["http://localhost:49153/fleet/v1/state"] +``` + +### Measurements & Fields: + +The fields are dynamically generated from the output of the fleet API. Using the ```name``` value.. The values of those fields are the number of containers with the ```systemdSubState``` value of "running". + + +The unit names will have their instanced id and the @ symbol stripped off. +For example if you had a unit named ```nginx-1.10.1@35``` the field name would be ```nginx-1.10.1```. + +- fleet + - `````` (int) + +### Tags: + +- All measurements have the following tags: + - server (name of the host/container telegraf is running on) + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter example -test +* Plugin: fleet, Collection 1 +> fleet,host=localhost.local,server=http://fleet.testserver.com:49153/fleet/v1/state some-api=2i,test-application=1i,webapp=1i,nginx=2i,redis=1i 1470615664000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index ddb7d4039..2c25a3bca 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -20,6 +20,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/filestat" + _ "github.com/influxdata/telegraf/plugins/inputs/fleet" _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/hddtemp" diff --git a/plugins/inputs/fleet.go b/plugins/inputs/fleet.go new file mode 100644 index 000000000..0f885c477 --- /dev/null +++ b/plugins/inputs/fleet.go @@ -0,0 +1,161 @@ +/* +* @Author: Jim Weber +* @Date: 2016-05-18 22:07:31 +* @Last Modified by: Jim Weber +* @Last Modified time: 2016-08-07 20:20:26 + */ + +package fleet + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// FleetStates struct to hold all the data for a machine state +type FleetStates struct { + States []struct { + SystemdActiveState string `json:"systemdActiveState"` + MachineID string `json:"machineID"` + Hash string `json:"hash"` + SystemdSubState string `json:"systemdSubState"` + Name string `json:"name"` + SystemdLoadState string `json:"systemdLoadState"` + } +} + +// Fleet struct to hold fleet hosts +type Fleet struct { + Hosts []string `toml:"hosts"` +} + +// Description - Method to provide description of plugin +func (f *Fleet) Description() string { + return "Fleetd Plugin to glather information about container states in fleet cluster" +} + +// SampleConfig output sample config for this plugin +func (*Fleet) SampleConfig() string { + return ` +# Description +[[inputs.fleet]] +## Works with Fleet HTTP API +## Multiple Hosts from which to read Fleet stats: + host = ["http://localhost:49153/fleet/v1/state"] +` +} + +// Gather method to gather stats for telegraf input +func (f *Fleet) Gather(accumulator telegraf.Accumulator) error { + errorChannel := make(chan error, len(f.Hosts)) + var wg sync.WaitGroup + for _, u := range f.Hosts { + wg.Add(1) + go func(host string) { + defer wg.Done() + if err := f.fetchAndReturnData(accumulator, host); err != nil { + errorChannel <- fmt.Errorf("[host=%s]: %s", host, err) + } + }(u) + } + + wg.Wait() + close(errorChannel) + + // If there weren't any errors, we can return nil now. + if len(errorChannel) == 0 { + return nil + } + + // There were errors, so join them all together as one big error. + errorStrings := make([]string, 0, len(errorChannel)) + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + return errors.New(strings.Join(errorStrings, "\n")) +} + +var tr = &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), +} + +var client = &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), +} + +func (f *Fleet) fetchAndReturnData(accumulator telegraf.Accumulator, host string) error { + _, error := client.Get(host) + if error != nil { + return error + } + + fleetStates := getInstanceStates(host, nil) + containerCounts := getContainerCount(fleetStates) + fields := make(map[string]interface{}) + tags := make(map[string]string) + + for k, v := range containerCounts { + fields[k] = v + } + + // create tags for each host if needed + tags["server"] = host + + accumulator.AddFields("fleet", fields, tags) + return nil +} + +func getInstanceStates(host string, params map[string]string) FleetStates { + + response, err := http.Get(host) + fleetStates := FleetStates{} + + if err != nil { + fmt.Printf("%s", err) + } else { + defer response.Body.Close() + contents, err := ioutil.ReadAll(response.Body) + if err != nil { + fmt.Printf("%s", err) + os.Exit(1) + } + + if err := json.Unmarshal(contents, &fleetStates); err != nil { + panic(err) + } + + } + + return fleetStates +} + +func getContainerCount(fleetUnits FleetStates) map[string]int { + containerCount := make(map[string]int) + for _, fleetUnit := range fleetUnits.States { + shortNameParts := strings.Split(fleetUnit.Name, "@") + shortName := shortNameParts[0] + if fleetUnit.SystemdSubState == "running" { + containerCount[shortName]++ + } + } + + return containerCount +} + +func init() { + inputs.Add("fleet", func() telegraf.Input { + return &Fleet{} + }) +}