telegraf/plugins/inputs/fluentd/fluentd.go

174 lines
3.9 KiB
Go

package fluentd
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// measurement is the metric name passed to the accumulator for every point
// this plugin emits.
const measurement = "fluentd"

// description is the one-line plugin summary returned by Description.
const description = "Read metrics exposed by fluentd in_monitor plugin"

// sampleConfig is the example configuration returned by SampleConfig.
const sampleConfig = `
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
##
## Endpoint:
## - only one URI is allowed
## - https is not supported
endpoint = "http://localhost:24220/api/plugins.json"
## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
exclude = [
"monitor_agent",
"dummy",
]
`
// Fluentd is the plugin's main structure: it carries the user configuration
// and the HTTP client used to query fluentd.
type Fluentd struct {
	// Endpoint is the URL that Gather requests with an HTTP GET.
	Endpoint string

	// Exclude lists the plugin types (the JSON "type" value) that Gather
	// skips when producing metrics.
	Exclude []string

	// client is created by Gather on its first call and reused afterwards.
	client *http.Client
}
// endpointInfo mirrors the top-level JSON document decoded by parse; only its
// "plugins" array is read.
type endpointInfo struct {
	Payload []pluginData `json:"plugins"`
}

// pluginData holds the attributes decoded from one entry of the "plugins"
// array. The numeric values are pointers so that a value absent from the JSON
// (nil) can be told apart from a reported zero.
type pluginData struct {
	PluginID              string   `json:"plugin_id"`
	PluginType            string   `json:"type"`
	PluginCategory        string   `json:"plugin_category"`
	RetryCount            *float64 `json:"retry_count"`
	BufferQueueLength     *float64 `json:"buffer_queue_length"`
	BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
}

// parse decodes the JSON document received from the fluentd endpoint.
//
// Parameters:
// data: unprocessed JSON received from the endpoint
//
// Returns:
// datapointArray: the decoded plugins in document order (nil when the
// document lists none)
// err: non-nil when data cannot be decoded into the expected structure; the
// message includes the decoder's own error so the cause is not lost
func parse(data []byte) (datapointArray []pluginData, err error) {
	var endpointData endpointInfo
	if err = json.Unmarshal(data, &endpointData); err != nil {
		return nil, fmt.Errorf("Processing JSON structure: %v", err)
	}

	// Appending onto the nil result copies the entries and leaves the result
	// nil when the payload is empty.
	return append(datapointArray, endpointData.Payload...), nil
}
// Description returns the one-line summary of this plugin.
func (h *Fluentd) Description() string {
	return description
}
// SampleConfig returns the example configuration for this plugin.
func (h *Fluentd) SampleConfig() string {
	return sampleConfig
}
// Gather queries the configured endpoint once and adds one metric for every
// fluentd plugin that is not excluded and reports at least one value.
//
// Steps:
//   - check that h.Endpoint parses as a URL and create the HTTP client on
//     first use
//   - GET the endpoint, read the whole body and require a 200 status
//   - decode the plugin list with parse
//   - for each plugin whose type is not listed in h.Exclude, add the
//     retry_count, buffer_queue_length and buffer_total_queued_size values
//     that are present, tagged with the plugin id, category and type
//
// Parameters:
// acc: accumulator that receives the generated metrics
//
// Returns:
// error: names the step that failed (URL, request, body read, status, JSON
// decoding) together with its cause; nil on success
func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
	if _, err := url.Parse(h.Endpoint); err != nil {
		return fmt.Errorf("Invalid URL \"%s\": %s", h.Endpoint, err)
	}

	// Create the client lazily so that later Gather calls reuse it.
	if h.client == nil {
		h.client = &http.Client{
			Transport: &http.Transport{
				ResponseHeaderTimeout: 3 * time.Second,
			},
			Timeout: 4 * time.Second,
		}
	}

	resp, err := h.client.Get(h.Endpoint)
	if err != nil {
		return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Report where the read failed; the partially read body is not useful
		// in an error message.
		return fmt.Errorf("Unable to read the HTTP body from \"%s\": %s", h.Endpoint, err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("http status ok not met: \"%s\" returned %s", h.Endpoint, resp.Status)
	}

	dataPoints, err := parse(body)
	if err != nil {
		return fmt.Errorf("Problem with parsing: %s", err)
	}

plugins:
	for _, p := range dataPoints {
		// Skip plugin types excluded in the configuration.
		for _, excluded := range h.Exclude {
			if excluded == p.PluginType {
				continue plugins
			}
		}

		// Store the numbers themselves rather than the pointers used while
		// decoding, so the accumulator receives plain float64 field values.
		fields := make(map[string]interface{}, 3)
		if p.BufferQueueLength != nil {
			fields["buffer_queue_length"] = *p.BufferQueueLength
		}
		if p.RetryCount != nil {
			fields["retry_count"] = *p.RetryCount
		}
		if p.BufferTotalQueuedSize != nil {
			fields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
		}

		// A plugin that exposes none of the values produces no metric.
		if len(fields) == 0 {
			continue
		}

		tags := map[string]string{
			"plugin_id":       p.PluginID,
			"plugin_category": p.PluginCategory,
			"plugin_type":     p.PluginType,
		}
		acc.AddFields(measurement, fields, tags)
	}

	return nil
}
func init() {
inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} })
}