package fluentd

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"
)

const (
	measurement  = "fluentd"
	description  = "Read metrics exposed by fluentd in_monitor plugin"
	sampleConfig = `
  ## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
  ##
  ## Endpoint:
  ## - only one URI is allowed
  ## - https is not supported
  endpoint = "http://localhost:24220/api/plugins.json"

  ## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
  exclude = [
    "monitor_agent",
    "dummy",
  ]
`
)

// Fluentd is the plugin's main structure. Endpoint is the URL queried on
// every Gather call and Exclude lists the plugin types (the "type" field of
// the payload) that must not be reported. The HTTP client is created lazily
// on the first Gather call and reused afterwards.
type Fluentd struct {
	Endpoint string
	Exclude  []string
	client   *http.Client
}

// endpointInfo mirrors the top-level JSON document returned by the endpoint:
// an object whose "plugins" key holds the list of plugin entries.
type endpointInfo struct {
	Payload []pluginData `json:"plugins"`
}

// pluginData describes a single entry of the "plugins" list. The numeric
// values are pointers so that a key that is absent from the payload stays
// nil and can be told apart from a reported value of zero.
type pluginData struct {
	PluginID              string   `json:"plugin_id"`
	PluginType            string   `json:"type"`
	PluginCategory        string   `json:"plugin_category"`
	RetryCount            *float64 `json:"retry_count"`
	BufferQueueLength     *float64 `json:"buffer_queue_length"`
	BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
}

// parse decodes the JSON document received from the fluentd endpoint.
//
// Parameters:
//
//	data: unprocessed JSON received from the endpoint
//
// Returns:
//
//	datapointArray: the decoded plugin entries, in payload order; nil when
//	                the payload holds no entries or decoding failed
//	err: non-nil when data is not valid JSON for the expected structure;
//	     the message includes the decoder's own error
func parse(data []byte) (datapointArray []pluginData, err error) {
	var endpointData endpointInfo

	if err := json.Unmarshal(data, &endpointData); err != nil {
		// Keep the decoder's message so the caller can see what was wrong.
		return nil, fmt.Errorf("Processing JSON structure: %s", err)
	}

	// Appending to a nil slice keeps the result nil for an empty payload.
	return append(datapointArray, endpointData.Payload...), nil
}

// Description returns a one-line description of the plugin.
func (h *Fluentd) Description() string { return description }

// SampleConfig returns the sample configuration of the plugin.
func (h *Fluentd) SampleConfig() string { return sampleConfig }

// isExcluded reports whether pluginType is listed in h.Exclude.
func (h *Fluentd) isExcluded(pluginType string) bool {
	for _, excluded := range h.Exclude {
		if excluded == pluginType {
			return true
		}
	}
	return false
}

// Gather fetches the endpoint, decodes the payload and adds one metric per
// reported plugin to acc. Plugins whose type is listed in Exclude, and
// plugins that report none of the three known values, are skipped.
//
// It returns an error when the endpoint URL cannot be parsed, the request
// fails, the body cannot be read, the response status is not 200 OK, or the
// body cannot be decoded.
func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
	if _, err := url.Parse(h.Endpoint); err != nil {
		return fmt.Errorf("Invalid URL \"%s\": %s", h.Endpoint, err)
	}

	// Build the client once; later calls reuse it.
	if h.client == nil {
		h.client = &http.Client{
			Transport: &http.Transport{
				ResponseHeaderTimeout: 3 * time.Second,
			},
			Timeout: 4 * time.Second,
		}
	}

	resp, err := h.client.Get(h.Endpoint)
	if err != nil {
		return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err)
	}
	defer resp.Body.Close()

	// The body is read in full before the status is inspected, so it is
	// drained even when the response is rejected below.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("Unable to read the HTTP body \"%s\": %s", string(body), err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("http status ok not met: \"%s\" returned %d", h.Endpoint, resp.StatusCode)
	}

	dataPoints, err := parse(body)
	if err != nil {
		return fmt.Errorf("Problem with parsing: %s", err)
	}

	// Go through all plugins one by one.
	for _, p := range dataPoints {
		// Skip plugin types that were excluded in the configuration.
		if h.isExcluded(p.PluginType) {
			continue
		}

		fields := make(map[string]interface{}, 3)
		if p.BufferQueueLength != nil {
			fields["buffer_queue_length"] = *p.BufferQueueLength
		}
		if p.RetryCount != nil {
			fields["retry_count"] = *p.RetryCount
		}
		if p.BufferTotalQueuedSize != nil {
			fields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
		}

		// A plugin that reported none of the values produces no metric.
		if len(fields) == 0 {
			continue
		}

		tags := map[string]string{
			"plugin_id":       p.PluginID,
			"plugin_category": p.PluginCategory,
			"plugin_type":     p.PluginType,
		}
		acc.AddFields(measurement, fields, tags)
	}

	return nil
}

// init registers the plugin under the name "fluentd".
func init() {
	inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} })
}