Make review changes to logstash input (#6299)

This commit is contained in:
Daniel Nelson 2019-08-21 18:04:30 -07:00 committed by GitHub
parent 5c8d0e3ac9
commit 8b938f3bd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 226 additions and 296 deletions

5
.gitignore vendored
View File

@ -1,10 +1,5 @@
# Build and binaries
/build /build
/telegraf /telegraf
/telegraf.exe /telegraf.exe
/telegraf.gz /telegraf.gz
/vendor /vendor
# Editor files
*~
.idea

36
internal/choice/choice.go Normal file
View File

@ -0,0 +1,36 @@
// Package choice provides basic functions for working with
// plugin options that must be one of several values.
package choice
import "fmt"
// Contains return true if the choice in the list of choices.
func Contains(choice string, choices []string) bool {
for _, item := range choices {
if item == choice {
return true
}
}
return false
}
// CheckSContains returns an error if a choice is not one of
// the available choices.
func Check(choice string, available []string) error {
if !Contains(choice, available) {
return fmt.Errorf("unknown choice %s", choice)
}
return nil
}
// CheckSliceContains returns an error if the choices is not a subset of
// available.
func CheckSlice(choices, available []string) error {
for _, choice := range choices {
err := Check(choice, available)
if err != nil {
return err
}
}
return nil
}

View File

@ -3,62 +3,52 @@
This plugin reads metrics exposed by This plugin reads metrics exposed by
[Logstash Monitoring API](https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html). [Logstash Monitoring API](https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html).
### Configuration: Logstash 5 and later is supported.
### Configuration
```toml ```toml
## This plugin reads metrics exposed by Logstash Monitoring API. [[inputs.logstash]]
## https://www.elastic.co/guide/en/logstash/current/monitoring.html ## The URL of the exposed Logstash API endpoint.
## The URL of the exposed Logstash API endpoint
url = "http://127.0.0.1:9600" url = "http://127.0.0.1:9600"
## Enable Logstash 6+ multi-pipeline statistics support ## Use Logstash 5 single pipeline API, set to true when monitoring
multi_pipeline = true ## Logstash 5.
# single_pipeline = false
## Should the general process statistics be gathered ## Enable optional collection components. Can contain
collect_process_stats = true ## "pipelines", "process", and "jvm".
# collect = ["pipelines", "process", "jvm"]
## Should the JVM specific statistics be gathered ## Timeout for HTTP requests.
collect_jvm_stats = true # timeout = "5s"
## Should the event pipelines statistics be gathered ## Optional HTTP Basic Auth credentials.
collect_pipelines_stats = true
## Should the plugin statistics be gathered
collect_plugins_stats = true
## Should the queue statistics be gathered
collect_queue_stats = true
## HTTP method
# method = "GET"
## Optional HTTP headers
# headers = {"X-Special-Header" = "Special-Value"}
## Override HTTP "Host" header
# host_header = "logstash.example.com"
## Timeout for HTTP requests
timeout = "5s"
## Optional HTTP Basic Auth credentials
# username = "username" # username = "username"
# password = "pa$$word" # password = "pa$$word"
## Optional TLS Config ## Optional TLS Config.
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification.
# insecure_skip_verify = false # insecure_skip_verify = false
## Optional HTTP headers.
# [inputs.logstash.headers]
# "X-Special-Header" = "Special-Value"
``` ```
### Measurements & Fields: ### Metrics
- **logstash_jvm** - logstash_jvm
* Fields: - tags:
- node_id
- node_name
- node_host
- node_version
- fields:
- threads_peak_count - threads_peak_count
- mem_pools_survivor_peak_max_in_bytes - mem_pools_survivor_peak_max_in_bytes
- mem_pools_survivor_max_in_bytes - mem_pools_survivor_max_in_bytes
@ -87,14 +77,14 @@ This plugin reads metrics exposed by
- mem_heap_used_in_bytes - mem_heap_used_in_bytes
- gc_collectors_young_collection_count - gc_collectors_young_collection_count
- uptime_in_millis - uptime_in_millis
* Tags:
+ logstash_process
- tags:
- node_id - node_id
- node_name - node_name
- node_host - source
- node_version - node_version
- fields:
- **logstash_process**
* Fields:
- open_file_descriptors - open_file_descriptors
- cpu_load_average_1m - cpu_load_average_1m
- cpu_load_average_5m - cpu_load_average_5m
@ -105,85 +95,60 @@ This plugin reads metrics exposed by
- max_file_descriptors - max_file_descriptors
- mem_total_virtual_in_bytes - mem_total_virtual_in_bytes
- mem_total_virtual_in_bytes - mem_total_virtual_in_bytes
* Tags:
- logstash_events
- tags:
- node_id - node_id
- node_name - node_name
- node_host - source
- node_version - node_version
- pipeline (for Logstash 6+)
- **logstash_events** - fields:
* Fields:
- queue_push_duration_in_millis - queue_push_duration_in_millis
- duration_in_millis - duration_in_millis
- in - in
- filtered - filtered
- out - out
* Tags:
+ logstash_plugins
- tags:
- node_id - node_id
- node_name - node_name
- node_host - source
- node_version - node_version
- pipeline (for Logstash 6 only) - pipeline (for Logstash 6+)
- plugin_id
- **logstash_plugins** - plugin_name
* Fields: - plugin_type
- fields:
- queue_push_duration_in_millis (for input plugins only) - queue_push_duration_in_millis (for input plugins only)
- duration_in_millis - duration_in_millis
- in - in
- out - out
* Tags:
- logstash_queue
- tags:
- node_id - node_id
- node_name - node_name
- node_host - source
- node_version - node_version
- pipeline (for Logstash 6 only) - pipeline (for Logstash 6+)
- plugin_id - queue_type
- plugin_name - fields:
- plugin_type
- **logstash_queue**
* Fields:
- events - events
- free_space_in_bytes - free_space_in_bytes
- max_queue_size_in_bytes - max_queue_size_in_bytes
- max_unread_events - max_unread_events
- page_capacity_in_bytes - page_capacity_in_bytes
- queue_size_in_bytes - queue_size_in_bytes
* Tags:
- node_id
- node_name
- node_host
- node_version
- pipeline (for Logstash 6 only)
- queue_type
### Tags description ### Example Output
- node_id - The uuid of the logstash node. Randomly generated.
- node_name - The name of the logstash node. Can be defined in the *logstash.yml* or defaults to the hostname.
Can be used to break apart metrics from different logstash instances of the same host.
- node_host - The hostname of the logstash node.
Can be different from the telegraf's host if a remote connection to logstash instance is used.
- node_version - The version of logstash service running on this node.
- pipeline (for Logstash 6 only) - The name of a pipeline if multi-pipeline is configured.
Will defaults to "main" if there is only one pipeline and will be missing for logstash 5.
- plugin_id - The unique id of this plugin.
It will be a randomly generated string unless it's defined in the logstash pipeline config file.
- plugin_name - The name of this plugin. i.e. file, elasticsearch, date, mangle.
- plugin_type - The type of this plugin i.e. input/filter/output.
- queue_type - The type of the event queue (memory/persisted).
### Example Output:
``` ```
$ ./telegraf -config telegraf.conf -input-filter logstash -test logstash_jvm,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,source=debian-stretch-logstash6.virt gc_collectors_old_collection_count=2,gc_collectors_old_collection_time_in_millis=100,gc_collectors_young_collection_count=26,gc_collectors_young_collection_time_in_millis=1028,mem_heap_committed_in_bytes=1056309248,mem_heap_max_in_bytes=1056309248,mem_heap_used_in_bytes=207216328,mem_heap_used_percent=19,mem_non_heap_committed_in_bytes=160878592,mem_non_heap_used_in_bytes=140838184,mem_pools_old_committed_in_bytes=899284992,mem_pools_old_max_in_bytes=899284992,mem_pools_old_peak_max_in_bytes=899284992,mem_pools_old_peak_used_in_bytes=189468088,mem_pools_old_used_in_bytes=189468088,mem_pools_survivor_committed_in_bytes=17432576,mem_pools_survivor_max_in_bytes=17432576,mem_pools_survivor_peak_max_in_bytes=17432576,mem_pools_survivor_peak_used_in_bytes=17432576,mem_pools_survivor_used_in_bytes=12572640,mem_pools_young_committed_in_bytes=139591680,mem_pools_young_max_in_bytes=139591680,mem_pools_young_peak_max_in_bytes=139591680,mem_pools_young_peak_used_in_bytes=139591680,mem_pools_young_used_in_bytes=5175600,threads_count=20,threads_peak_count=24,uptime_in_millis=739089 1566425244000000000
logstash_process,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,source=debian-stretch-logstash6.virt cpu_load_average_15m=0.03,cpu_load_average_1m=0.01,cpu_load_average_5m=0.04,cpu_percent=0,cpu_total_in_millis=83230,max_file_descriptors=16384,mem_total_virtual_in_bytes=3689132032,open_file_descriptors=118,peak_open_file_descriptors=118 1566425244000000000
> logstash_jvm,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 logstash_events,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,source=debian-stretch-logstash6.virt duration_in_millis=0,filtered=0,in=0,out=0,queue_push_duration_in_millis=0 1566425244000000000
gc_collectors_old_collection_count=5,gc_collectors_old_collection_time_in_millis=702,gc_collectors_young_collection_count=95,gc_collectors_young_collection_time_in_millis=4772,mem_heap_committed_in_bytes=360804352,mem_heap_max_in_bytes=8389328896,mem_heap_used_in_bytes=252629768,mem_heap_used_percent=3,mem_non_heap_committed_in_bytes=212144128,mem_non_heap_used_in_bytes=188726024,mem_pools_old_committed_in_bytes=280260608,mem_pools_old_max_in_bytes=6583418880,mem_pools_old_peak_max_in_bytes=6583418880,mem_pools_old_peak_used_in_bytes=235352976,mem_pools_old_used_in_bytes=194754608,mem_pools_survivor_committed_in_bytes=8912896,mem_pools_survivor_max_in_bytes=200605696,mem_pools_survivor_peak_max_in_bytes=200605696,mem_pools_survivor_peak_used_in_bytes=8912896,mem_pools_survivor_used_in_bytes=4476680,mem_pools_young_committed_in_bytes=71630848,mem_pools_young_max_in_bytes=1605304320,mem_pools_young_peak_max_in_bytes=1605304320,mem_pools_young_peak_used_in_bytes=71630848,mem_pools_young_used_in_bytes=53398480,threads_count=60,threads_peak_count=62,uptime_in_millis=10469014 1540289864000000000 logstash_plugins,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,plugin_id=2807cb8610ba7854efa9159814fcf44c3dda762b43bd088403b30d42c88e69ab,plugin_name=beats,plugin_type=input,source=debian-stretch-logstash6.virt out=0,queue_push_duration_in_millis=0 1566425244000000000
> logstash_process,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 logstash_plugins,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,plugin_id=7a6c973366186a695727c73935634a00bccd52fceedf30d0746983fce572d50c,plugin_name=file,plugin_type=output,source=debian-stretch-logstash6.virt duration_in_millis=0,in=0,out=0 1566425244000000000
cpu_load_average_15m=39.84,cpu_load_average_1m=32.87,cpu_load_average_5m=39.23,cpu_percent=0,cpu_total_in_millis=389920,max_file_descriptors=262144,mem_total_virtual_in_bytes=17921355776,open_file_descriptors=132,peak_open_file_descriptors=140 1540289864000000000 logstash_queue,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,queue_type=memory,source=debian-stretch-logstash6.virt events=0 1566425244000000000
> logstash_events,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main
duration_in_millis=175144,filtered=4543,in=4543,out=4543,queue_push_duration_in_millis=19 1540289864000000000
> logstash_plugins,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main,plugin_id=input-kafka,plugin_name=kafka,plugin_type=input
duration_in_millis=0,in=0,out=4543,queue_push_duration_in_millis=19 1540289864000000000
``` ```

View File

@ -2,87 +2,64 @@ package logstash
import ( import (
"encoding/json" "encoding/json"
"github.com/influxdata/telegraf" "fmt"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
jsonParser "github.com/influxdata/telegraf/plugins/parsers/json" jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
// ##### Interface #####
const sampleConfig = ` const sampleConfig = `
## This plugin reads metrics exposed by Logstash Monitoring API. ## The URL of the exposed Logstash API endpoint.
## https://www.elastic.co/guide/en/logstash/current/monitoring.html
## The URL of the exposed Logstash API endpoint
url = "http://127.0.0.1:9600" url = "http://127.0.0.1:9600"
## Enable Logstash 6+ multi-pipeline statistics support ## Use Logstash 5 single pipeline API, set to true when monitoring
multi_pipeline = true ## Logstash 5.
# single_pipeline = false
## Should the general process statistics be gathered ## Enable optional collection components. Can contain
collect_process_stats = true ## "pipelines", "process", and "jvm".
# collect = ["pipelines", "process", "jvm"]
## Should the JVM specific statistics be gathered ## Timeout for HTTP requests.
collect_jvm_stats = true # timeout = "5s"
## Should the event pipelines statistics be gathered ## Optional HTTP Basic Auth credentials.
collect_pipelines_stats = true
## Should the plugin statistics be gathered
collect_plugins_stats = true
## Should the queue statistics be gathered
collect_queue_stats = true
## HTTP method
# method = "GET"
## Optional HTTP headers
# headers = {"X-Special-Header" = "Special-Value"}
## Override HTTP "Host" header
# host_header = "logstash.example.com"
## Timeout for HTTP requests
timeout = "5s"
## Optional HTTP Basic Auth credentials
# username = "username" # username = "username"
# password = "pa$$word" # password = "pa$$word"
## Optional TLS Config ## Optional TLS Config.
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification.
# insecure_skip_verify = false # insecure_skip_verify = false
## Optional HTTP headers.
# [inputs.logstash.headers]
# "X-Special-Header" = "Special-Value"
` `
type Logstash struct { type Logstash struct {
URL string `toml:"url"` URL string `toml:"url"`
MultiPipeline bool `toml:"multi_pipeline"` SinglePipeline bool `toml:"single_pipeline"`
CollectProcessStats bool `toml:"collect_process_stats"` Collect []string `toml:"collect"`
CollectJVMStats bool `toml:"collect_jvm_stats"`
CollectPipelinesStats bool `toml:"collect_pipelines_stats"`
CollectPluginsStats bool `toml:"collect_plugins_stats"`
CollectQueueStats bool `toml:"collect_queue_stats"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Method string `toml:"method"`
Headers map[string]string `toml:"headers"` Headers map[string]string `toml:"headers"`
HostHeader string `toml:"host_header"`
Timeout internal.Duration `toml:"timeout"` Timeout internal.Duration `toml:"timeout"`
tls.ClientConfig tls.ClientConfig
client *http.Client client *http.Client
} }
@ -90,26 +67,13 @@ type Logstash struct {
func NewLogstash() *Logstash { func NewLogstash() *Logstash {
return &Logstash{ return &Logstash{
URL: "http://127.0.0.1:9600", URL: "http://127.0.0.1:9600",
MultiPipeline: true, SinglePipeline: false,
CollectProcessStats: true, Collect: []string{"pipelines", "process", "jvm"},
CollectJVMStats: true,
CollectPipelinesStats: true,
CollectPluginsStats: true,
CollectQueueStats: true,
Method: "GET",
Headers: make(map[string]string), Headers: make(map[string]string),
HostHeader: "",
Timeout: internal.Duration{Duration: time.Second * 5}, Timeout: internal.Duration{Duration: time.Second * 5},
} }
} }
// init initialise this plugin instance
func init() {
inputs.Add("logstash", func() telegraf.Input {
return NewLogstash()
})
}
// Description returns short info about plugin // Description returns short info about plugin
func (logstash *Logstash) Description() string { func (logstash *Logstash) Description() string {
return "Read metrics exposed by Logstash" return "Read metrics exposed by Logstash"
@ -183,6 +147,14 @@ const processStats = "/_node/stats/process"
const pipelinesStats = "/_node/stats/pipelines" const pipelinesStats = "/_node/stats/pipelines"
const pipelineStats = "/_node/stats/pipeline" const pipelineStats = "/_node/stats/pipeline"
func (i *Logstash) Init() error {
err := choice.CheckSlice(i.Collect, []string{"pipelines", "process", "jvm"})
if err != nil {
return fmt.Errorf(`cannot verify "collect" setting: %v`, err)
}
return nil
}
// createHttpClient create a clients to access API // createHttpClient create a clients to access API
func (logstash *Logstash) createHttpClient() (*http.Client, error) { func (logstash *Logstash) createHttpClient() (*http.Client, error) {
tlsConfig, err := logstash.ClientConfig.TLSConfig() tlsConfig, err := logstash.ClientConfig.TLSConfig()
@ -202,15 +174,7 @@ func (logstash *Logstash) createHttpClient() (*http.Client, error) {
// gatherJsonData query the data source and parse the response JSON // gatherJsonData query the data source and parse the response JSON
func (logstash *Logstash) gatherJsonData(url string, value interface{}) error { func (logstash *Logstash) gatherJsonData(url string, value interface{}) error {
request, err := http.NewRequest("GET", url, nil)
var method string
if logstash.Method != "" {
method = logstash.Method
} else {
method = "GET"
}
request, err := http.NewRequest(method, url, nil)
if err != nil { if err != nil {
return err return err
} }
@ -218,11 +182,13 @@ func (logstash *Logstash) gatherJsonData(url string, value interface{}) error {
if (logstash.Username != "") || (logstash.Password != "") { if (logstash.Username != "") || (logstash.Password != "") {
request.SetBasicAuth(logstash.Username, logstash.Password) request.SetBasicAuth(logstash.Username, logstash.Password)
} }
for header, value := range logstash.Headers { for header, value := range logstash.Headers {
if strings.ToLower(header) == "host" {
request.Host = value
} else {
request.Header.Add(header, value) request.Header.Add(header, value)
} }
if logstash.HostHeader != "" {
request.Host = logstash.HostHeader
} }
response, err := logstash.client.Do(request) response, err := logstash.client.Do(request)
@ -252,8 +218,8 @@ func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumu
tags := map[string]string{ tags := map[string]string{
"node_id": jvmStats.ID, "node_id": jvmStats.ID,
"node_name": jvmStats.Name, "node_name": jvmStats.Name,
"node_host": jvmStats.Host,
"node_version": jvmStats.Version, "node_version": jvmStats.Version,
"source": jvmStats.Host,
} }
flattener := jsonParser.JSONFlattener{} flattener := jsonParser.JSONFlattener{}
@ -278,8 +244,8 @@ func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Ac
tags := map[string]string{ tags := map[string]string{
"node_id": processStats.ID, "node_id": processStats.ID,
"node_name": processStats.Name, "node_name": processStats.Name,
"node_host": processStats.Host,
"node_version": processStats.Version, "node_version": processStats.Version,
"source": processStats.Host,
} }
flattener := jsonParser.JSONFlattener{} flattener := jsonParser.JSONFlattener{}
@ -368,8 +334,8 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A
tags := map[string]string{ tags := map[string]string{
"node_id": pipelineStats.ID, "node_id": pipelineStats.ID,
"node_name": pipelineStats.Name, "node_name": pipelineStats.Name,
"node_host": pipelineStats.Host,
"node_version": pipelineStats.Version, "node_version": pipelineStats.Version,
"source": pipelineStats.Host,
} }
flattener := jsonParser.JSONFlattener{} flattener := jsonParser.JSONFlattener{}
@ -379,7 +345,6 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A
} }
accumulator.AddFields("logstash_events", flattener.Fields, tags) accumulator.AddFields("logstash_events", flattener.Fields, tags)
if logstash.CollectPluginsStats {
err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator) err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator)
if err != nil { if err != nil {
return err return err
@ -392,10 +357,10 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A
if err != nil { if err != nil {
return err return err
} }
}
if logstash.CollectQueueStats {
err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator) err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator)
if err != nil {
return err
} }
return nil return nil
@ -414,9 +379,9 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.
tags := map[string]string{ tags := map[string]string{
"node_id": pipelinesStats.ID, "node_id": pipelinesStats.ID,
"node_name": pipelinesStats.Name, "node_name": pipelinesStats.Name,
"node_host": pipelinesStats.Host,
"node_version": pipelinesStats.Version, "node_version": pipelinesStats.Version,
"pipeline": pipelineName, "pipeline": pipelineName,
"source": pipelinesStats.Host,
} }
flattener := jsonParser.JSONFlattener{} flattener := jsonParser.JSONFlattener{}
@ -426,7 +391,6 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.
} }
accumulator.AddFields("logstash_events", flattener.Fields, tags) accumulator.AddFields("logstash_events", flattener.Fields, tags)
if logstash.CollectPluginsStats {
err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator) err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator)
if err != nil { if err != nil {
return err return err
@ -439,12 +403,11 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.
if err != nil { if err != nil {
return err return err
} }
}
if logstash.CollectQueueStats {
err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator) err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator)
if err != nil {
return err
} }
} }
return nil return nil
@ -452,7 +415,6 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.
// Gather ask this plugin to start gathering metrics // Gather ask this plugin to start gathering metrics
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
if logstash.client == nil { if logstash.client == nil {
client, err := logstash.createHttpClient() client, err := logstash.createHttpClient()
@ -462,7 +424,7 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
logstash.client = client logstash.client = client
} }
if logstash.CollectJVMStats { if choice.Contains("jvm", logstash.Collect) {
jvmUrl, err := url.Parse(logstash.URL + jvmStats) jvmUrl, err := url.Parse(logstash.URL + jvmStats)
if err != nil { if err != nil {
return err return err
@ -472,7 +434,7 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
} }
} }
if logstash.CollectProcessStats { if choice.Contains("process", logstash.Collect) {
processUrl, err := url.Parse(logstash.URL + processStats) processUrl, err := url.Parse(logstash.URL + processStats)
if err != nil { if err != nil {
return err return err
@ -482,16 +444,8 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
} }
} }
if logstash.CollectPipelinesStats { if choice.Contains("pipelines", logstash.Collect) {
if logstash.MultiPipeline { if logstash.SinglePipeline {
pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats)
if err != nil {
return err
}
if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil {
return err
}
} else {
pipelineUrl, err := url.Parse(logstash.URL + pipelineStats) pipelineUrl, err := url.Parse(logstash.URL + pipelineStats)
if err != nil { if err != nil {
return err return err
@ -499,8 +453,23 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
if err := logstash.gatherPipelineStats(pipelineUrl.String(), accumulator); err != nil { if err := logstash.gatherPipelineStats(pipelineUrl.String(), accumulator); err != nil {
return err return err
} }
} else {
pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats)
if err != nil {
return err
}
if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil {
return err
}
} }
} }
return nil return nil
} }
// init registers this plugin instance
func init() {
inputs.Add("logstash", func() telegraf.Input {
return NewLogstash()
})
}

View File

@ -9,7 +9,6 @@ import (
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
) )
var logstashTest = NewLogstash() var logstashTest = NewLogstash()
@ -66,7 +65,7 @@ func Test_Logstash5GatherProcessStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
}, },
) )
@ -115,7 +114,7 @@ func Test_Logstash6GatherProcessStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
}, },
) )
@ -160,7 +159,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
}, },
) )
@ -176,7 +175,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
"plugin_name": string("beats"), "plugin_name": string("beats"),
"plugin_id": string("a35197a509596954e905e38521bae12b1498b17d-1"), "plugin_id": string("a35197a509596954e905e38521bae12b1498b17d-1"),
@ -195,7 +194,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
"plugin_name": string("stdout"), "plugin_name": string("stdout"),
"plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2"), "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2"),
@ -214,7 +213,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
"plugin_name": string("s3"), "plugin_name": string("s3"),
"plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3"), "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3"),
@ -264,7 +263,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
}, },
@ -281,7 +280,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("kafka"), "plugin_name": string("kafka"),
@ -301,7 +300,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -321,7 +320,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -341,7 +340,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("date"), "plugin_name": string("date"),
@ -361,7 +360,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -381,7 +380,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -401,7 +400,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("drop"), "plugin_name": string("drop"),
@ -421,7 +420,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -441,7 +440,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("csv"), "plugin_name": string("csv"),
@ -461,7 +460,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("mutate"), "plugin_name": string("mutate"),
@ -481,7 +480,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("elasticsearch"), "plugin_name": string("elasticsearch"),
@ -501,7 +500,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("kafka"), "plugin_name": string("kafka"),
@ -521,7 +520,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"plugin_name": string("kafka"), "plugin_name": string("kafka"),
@ -544,7 +543,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
"pipeline": string("main"), "pipeline": string("main"),
"queue_type": string("persisted"), "queue_type": string("persisted"),
@ -615,7 +614,7 @@ func Test_Logstash5GatherJVMStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
"node_name": string("node-5-test"), "node_name": string("node-5-test"),
"node_host": string("node-5"), "source": string("node-5"),
"node_version": string("5.3.0"), "node_version": string("5.3.0"),
}, },
) )
@ -684,43 +683,9 @@ func Test_Logstash6GatherJVMStats(test *testing.T) {
map[string]string{ map[string]string{
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
"node_name": string("node-6-test"), "node_name": string("node-6-test"),
"node_host": string("node-6"), "source": string("node-6"),
"node_version": string("6.4.2"), "node_version": string("6.4.2"),
}, },
) )
} }
func Test_LogstashRequests(test *testing.T) {
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.Header().Set("Content-Type", "application/json")
fmt.Fprintf(writer, "%s", string(logstash6JvmJSON))
assert.Equal(test, request.Host, "logstash.test.local")
assert.Equal(test, request.Method, "POST")
assert.Equal(test, request.Header.Get("X-Test"), "test-header")
}))
requestURL, err := url.Parse(logstashTest.URL)
if err != nil {
test.Logf("Can't connect to: %s", logstashTest.URL)
}
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
fakeServer.Start()
defer fakeServer.Close()
if logstashTest.client == nil {
client, err := logstashTest.createHttpClient()
if err != nil {
test.Logf("Can't createHttpClient")
}
logstashTest.client = client
}
logstashTest.Method = "POST"
logstashTest.Headers["X-Test"] = "test-header"
logstashTest.HostHeader = "logstash.test.local"
if err := logstashTest.gatherJsonData(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil {
test.Logf("Can't gather JVM stats")
}
}