From 8b938f3bd43c83747db9cd3ab33ffde117c19c3c Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 21 Aug 2019 18:04:30 -0700 Subject: [PATCH] Make review changes to logstash input (#6299) --- .gitignore | 5 - internal/choice/choice.go | 36 ++++ plugins/inputs/logstash/README.md | 167 +++++++--------- plugins/inputs/logstash/logstash.go | 233 ++++++++++------------- plugins/inputs/logstash/logstash_test.go | 81 +++----- 5 files changed, 226 insertions(+), 296 deletions(-) create mode 100644 internal/choice/choice.go diff --git a/.gitignore b/.gitignore index 9e012aabd..4176a0413 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,5 @@ -# Build and binaries /build /telegraf /telegraf.exe /telegraf.gz /vendor - -# Editor files -*~ -.idea diff --git a/internal/choice/choice.go b/internal/choice/choice.go new file mode 100644 index 000000000..33c26096d --- /dev/null +++ b/internal/choice/choice.go @@ -0,0 +1,36 @@ +// Package choice provides basic functions for working with +// plugin options that must be one of several values. +package choice + +import "fmt" + +// Contains return true if the choice in the list of choices. +func Contains(choice string, choices []string) bool { + for _, item := range choices { + if item == choice { + return true + } + } + return false +} + +// CheckSContains returns an error if a choice is not one of +// the available choices. +func Check(choice string, available []string) error { + if !Contains(choice, available) { + return fmt.Errorf("unknown choice %s", choice) + } + return nil +} + +// CheckSliceContains returns an error if the choices is not a subset of +// available. +func CheckSlice(choices, available []string) error { + for _, choice := range choices { + err := Check(choice, available) + if err != nil { + return err + } + } + return nil +} diff --git a/plugins/inputs/logstash/README.md b/plugins/inputs/logstash/README.md index f54697c39..9571de5fd 100644 --- a/plugins/inputs/logstash/README.md +++ b/plugins/inputs/logstash/README.md @@ -3,62 +3,52 @@ This plugin reads metrics exposed by [Logstash Monitoring API](https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html). -### Configuration: +Logstash 5 and later is supported. + +### Configuration ```toml - ## This plugin reads metrics exposed by Logstash Monitoring API. - ## https://www.elastic.co/guide/en/logstash/current/monitoring.html - - ## The URL of the exposed Logstash API endpoint +[[inputs.logstash]] + ## The URL of the exposed Logstash API endpoint. url = "http://127.0.0.1:9600" - ## Enable Logstash 6+ multi-pipeline statistics support - multi_pipeline = true + ## Use Logstash 5 single pipeline API, set to true when monitoring + ## Logstash 5. + # single_pipeline = false - ## Should the general process statistics be gathered - collect_process_stats = true + ## Enable optional collection components. Can contain + ## "pipelines", "process", and "jvm". + # collect = ["pipelines", "process", "jvm"] - ## Should the JVM specific statistics be gathered - collect_jvm_stats = true + ## Timeout for HTTP requests. + # timeout = "5s" - ## Should the event pipelines statistics be gathered - collect_pipelines_stats = true - - ## Should the plugin statistics be gathered - collect_plugins_stats = true - - ## Should the queue statistics be gathered - collect_queue_stats = true - - ## HTTP method - # method = "GET" - - ## Optional HTTP headers - # headers = {"X-Special-Header" = "Special-Value"} - - ## Override HTTP "Host" header - # host_header = "logstash.example.com" - - ## Timeout for HTTP requests - timeout = "5s" - - ## Optional HTTP Basic Auth credentials + ## Optional HTTP Basic Auth credentials. # username = "username" # password = "pa$$word" - ## Optional TLS Config + ## Optional TLS Config. # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" - ## Use TLS but skip chain & host verification + ## Use TLS but skip chain & host verification. # insecure_skip_verify = false + + ## Optional HTTP headers. + # [inputs.logstash.headers] + # "X-Special-Header" = "Special-Value" ``` -### Measurements & Fields: +### Metrics -- **logstash_jvm** - * Fields: +- logstash_jvm + - tags: + - node_id + - node_name + - node_host + - node_version + - fields: - threads_peak_count - mem_pools_survivor_peak_max_in_bytes - mem_pools_survivor_max_in_bytes @@ -87,14 +77,14 @@ This plugin reads metrics exposed by - mem_heap_used_in_bytes - gc_collectors_young_collection_count - uptime_in_millis - * Tags: + ++ logstash_process + - tags: - node_id - node_name - - node_host - - node_version - -- **logstash_process** - * Fields: + - source + - node_version + - fields: - open_file_descriptors - cpu_load_average_1m - cpu_load_average_5m @@ -105,85 +95,60 @@ This plugin reads metrics exposed by - max_file_descriptors - mem_total_virtual_in_bytes - mem_total_virtual_in_bytes - * Tags: + +- logstash_events + - tags: - node_id - node_name - - node_host - - node_version - -- **logstash_events** - * Fields: + - source + - node_version + - pipeline (for Logstash 6+) + - fields: - queue_push_duration_in_millis - duration_in_millis - in - filtered - out - * Tags: + ++ logstash_plugins + - tags: - node_id - node_name - - node_host - - node_version - - pipeline (for Logstash 6 only) - -- **logstash_plugins** - * Fields: + - source + - node_version + - pipeline (for Logstash 6+) + - plugin_id + - plugin_name + - plugin_type + - fields: - queue_push_duration_in_millis (for input plugins only) - duration_in_millis - in - out - * Tags: + +- logstash_queue + - tags: - node_id - node_name - - node_host - - node_version - - pipeline (for Logstash 6 only) - - plugin_id - - plugin_name - - plugin_type - -- **logstash_queue** - * Fields: + - source + - node_version + - pipeline (for Logstash 6+) + - queue_type + - fields: - events - free_space_in_bytes - max_queue_size_in_bytes - max_unread_events - page_capacity_in_bytes - queue_size_in_bytes - * Tags: - - node_id - - node_name - - node_host - - node_version - - pipeline (for Logstash 6 only) - - queue_type -### Tags description - -- node_id - The uuid of the logstash node. Randomly generated. -- node_name - The name of the logstash node. Can be defined in the *logstash.yml* or defaults to the hostname. - Can be used to break apart metrics from different logstash instances of the same host. -- node_host - The hostname of the logstash node. - Can be different from the telegraf's host if a remote connection to logstash instance is used. -- node_version - The version of logstash service running on this node. -- pipeline (for Logstash 6 only) - The name of a pipeline if multi-pipeline is configured. - Will defaults to "main" if there is only one pipeline and will be missing for logstash 5. -- plugin_id - The unique id of this plugin. - It will be a randomly generated string unless it's defined in the logstash pipeline config file. -- plugin_name - The name of this plugin. i.e. file, elasticsearch, date, mangle. -- plugin_type - The type of this plugin i.e. input/filter/output. -- queue_type - The type of the event queue (memory/persisted). - -### Example Output: +### Example Output ``` -$ ./telegraf -config telegraf.conf -input-filter logstash -test - -> logstash_jvm,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 - gc_collectors_old_collection_count=5,gc_collectors_old_collection_time_in_millis=702,gc_collectors_young_collection_count=95,gc_collectors_young_collection_time_in_millis=4772,mem_heap_committed_in_bytes=360804352,mem_heap_max_in_bytes=8389328896,mem_heap_used_in_bytes=252629768,mem_heap_used_percent=3,mem_non_heap_committed_in_bytes=212144128,mem_non_heap_used_in_bytes=188726024,mem_pools_old_committed_in_bytes=280260608,mem_pools_old_max_in_bytes=6583418880,mem_pools_old_peak_max_in_bytes=6583418880,mem_pools_old_peak_used_in_bytes=235352976,mem_pools_old_used_in_bytes=194754608,mem_pools_survivor_committed_in_bytes=8912896,mem_pools_survivor_max_in_bytes=200605696,mem_pools_survivor_peak_max_in_bytes=200605696,mem_pools_survivor_peak_used_in_bytes=8912896,mem_pools_survivor_used_in_bytes=4476680,mem_pools_young_committed_in_bytes=71630848,mem_pools_young_max_in_bytes=1605304320,mem_pools_young_peak_max_in_bytes=1605304320,mem_pools_young_peak_used_in_bytes=71630848,mem_pools_young_used_in_bytes=53398480,threads_count=60,threads_peak_count=62,uptime_in_millis=10469014 1540289864000000000 -> logstash_process,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 - cpu_load_average_15m=39.84,cpu_load_average_1m=32.87,cpu_load_average_5m=39.23,cpu_percent=0,cpu_total_in_millis=389920,max_file_descriptors=262144,mem_total_virtual_in_bytes=17921355776,open_file_descriptors=132,peak_open_file_descriptors=140 1540289864000000000 -> logstash_events,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main - duration_in_millis=175144,filtered=4543,in=4543,out=4543,queue_push_duration_in_millis=19 1540289864000000000 -> logstash_plugins,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main,plugin_id=input-kafka,plugin_name=kafka,plugin_type=input - duration_in_millis=0,in=0,out=4543,queue_push_duration_in_millis=19 1540289864000000000 +logstash_jvm,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,source=debian-stretch-logstash6.virt gc_collectors_old_collection_count=2,gc_collectors_old_collection_time_in_millis=100,gc_collectors_young_collection_count=26,gc_collectors_young_collection_time_in_millis=1028,mem_heap_committed_in_bytes=1056309248,mem_heap_max_in_bytes=1056309248,mem_heap_used_in_bytes=207216328,mem_heap_used_percent=19,mem_non_heap_committed_in_bytes=160878592,mem_non_heap_used_in_bytes=140838184,mem_pools_old_committed_in_bytes=899284992,mem_pools_old_max_in_bytes=899284992,mem_pools_old_peak_max_in_bytes=899284992,mem_pools_old_peak_used_in_bytes=189468088,mem_pools_old_used_in_bytes=189468088,mem_pools_survivor_committed_in_bytes=17432576,mem_pools_survivor_max_in_bytes=17432576,mem_pools_survivor_peak_max_in_bytes=17432576,mem_pools_survivor_peak_used_in_bytes=17432576,mem_pools_survivor_used_in_bytes=12572640,mem_pools_young_committed_in_bytes=139591680,mem_pools_young_max_in_bytes=139591680,mem_pools_young_peak_max_in_bytes=139591680,mem_pools_young_peak_used_in_bytes=139591680,mem_pools_young_used_in_bytes=5175600,threads_count=20,threads_peak_count=24,uptime_in_millis=739089 1566425244000000000 +logstash_process,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,source=debian-stretch-logstash6.virt cpu_load_average_15m=0.03,cpu_load_average_1m=0.01,cpu_load_average_5m=0.04,cpu_percent=0,cpu_total_in_millis=83230,max_file_descriptors=16384,mem_total_virtual_in_bytes=3689132032,open_file_descriptors=118,peak_open_file_descriptors=118 1566425244000000000 +logstash_events,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,source=debian-stretch-logstash6.virt duration_in_millis=0,filtered=0,in=0,out=0,queue_push_duration_in_millis=0 1566425244000000000 +logstash_plugins,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,plugin_id=2807cb8610ba7854efa9159814fcf44c3dda762b43bd088403b30d42c88e69ab,plugin_name=beats,plugin_type=input,source=debian-stretch-logstash6.virt out=0,queue_push_duration_in_millis=0 1566425244000000000 +logstash_plugins,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,plugin_id=7a6c973366186a695727c73935634a00bccd52fceedf30d0746983fce572d50c,plugin_name=file,plugin_type=output,source=debian-stretch-logstash6.virt duration_in_millis=0,in=0,out=0 1566425244000000000 +logstash_queue,node_id=3da53ed0-a946-4a33-9cdb-33013f2273f6,node_name=debian-stretch-logstash6.virt,node_version=6.8.1,pipeline=main,queue_type=memory,source=debian-stretch-logstash6.virt events=0 1566425244000000000 ``` diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index ba25fafd5..b97600700 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -2,114 +2,78 @@ package logstash import ( "encoding/json" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/tls" - "github.com/influxdata/telegraf/plugins/inputs" + "fmt" "net/http" "net/url" + "strings" "time" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" jsonParser "github.com/influxdata/telegraf/plugins/parsers/json" ) -// ##### Interface ##### - const sampleConfig = ` - ## This plugin reads metrics exposed by Logstash Monitoring API. - ## https://www.elastic.co/guide/en/logstash/current/monitoring.html - - ## The URL of the exposed Logstash API endpoint + ## The URL of the exposed Logstash API endpoint. url = "http://127.0.0.1:9600" - ## Enable Logstash 6+ multi-pipeline statistics support - multi_pipeline = true + ## Use Logstash 5 single pipeline API, set to true when monitoring + ## Logstash 5. + # single_pipeline = false - ## Should the general process statistics be gathered - collect_process_stats = true + ## Enable optional collection components. Can contain + ## "pipelines", "process", and "jvm". + # collect = ["pipelines", "process", "jvm"] - ## Should the JVM specific statistics be gathered - collect_jvm_stats = true + ## Timeout for HTTP requests. + # timeout = "5s" - ## Should the event pipelines statistics be gathered - collect_pipelines_stats = true - - ## Should the plugin statistics be gathered - collect_plugins_stats = true - - ## Should the queue statistics be gathered - collect_queue_stats = true - - ## HTTP method - # method = "GET" - - ## Optional HTTP headers - # headers = {"X-Special-Header" = "Special-Value"} - - ## Override HTTP "Host" header - # host_header = "logstash.example.com" - - ## Timeout for HTTP requests - timeout = "5s" - - ## Optional HTTP Basic Auth credentials + ## Optional HTTP Basic Auth credentials. # username = "username" # password = "pa$$word" - ## Optional TLS Config + ## Optional TLS Config. # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" - ## Use TLS but skip chain & host verification + ## Use TLS but skip chain & host verification. # insecure_skip_verify = false + + ## Optional HTTP headers. + # [inputs.logstash.headers] + # "X-Special-Header" = "Special-Value" ` type Logstash struct { URL string `toml:"url"` - MultiPipeline bool `toml:"multi_pipeline"` - CollectProcessStats bool `toml:"collect_process_stats"` - CollectJVMStats bool `toml:"collect_jvm_stats"` - CollectPipelinesStats bool `toml:"collect_pipelines_stats"` - CollectPluginsStats bool `toml:"collect_plugins_stats"` - CollectQueueStats bool `toml:"collect_queue_stats"` - - Username string `toml:"username"` - Password string `toml:"password"` - Method string `toml:"method"` - Headers map[string]string `toml:"headers"` - HostHeader string `toml:"host_header"` - Timeout internal.Duration `toml:"timeout"` + SinglePipeline bool `toml:"single_pipeline"` + Collect []string `toml:"collect"` + Username string `toml:"username"` + Password string `toml:"password"` + Headers map[string]string `toml:"headers"` + Timeout internal.Duration `toml:"timeout"` tls.ClientConfig + client *http.Client } // NewLogstash create an instance of the plugin with default settings func NewLogstash() *Logstash { return &Logstash{ - URL: "http://127.0.0.1:9600", - MultiPipeline: true, - CollectProcessStats: true, - CollectJVMStats: true, - CollectPipelinesStats: true, - CollectPluginsStats: true, - CollectQueueStats: true, - Method: "GET", - Headers: make(map[string]string), - HostHeader: "", - Timeout: internal.Duration{Duration: time.Second * 5}, + URL: "http://127.0.0.1:9600", + SinglePipeline: false, + Collect: []string{"pipelines", "process", "jvm"}, + Headers: make(map[string]string), + Timeout: internal.Duration{Duration: time.Second * 5}, } } -// init initialise this plugin instance -func init() { - inputs.Add("logstash", func() telegraf.Input { - return NewLogstash() - }) -} - // Description returns short info about plugin func (logstash *Logstash) Description() string { return "Read metrics exposed by Logstash" @@ -183,6 +147,14 @@ const processStats = "/_node/stats/process" const pipelinesStats = "/_node/stats/pipelines" const pipelineStats = "/_node/stats/pipeline" +func (i *Logstash) Init() error { + err := choice.CheckSlice(i.Collect, []string{"pipelines", "process", "jvm"}) + if err != nil { + return fmt.Errorf(`cannot verify "collect" setting: %v`, err) + } + return nil +} + // createHttpClient create a clients to access API func (logstash *Logstash) createHttpClient() (*http.Client, error) { tlsConfig, err := logstash.ClientConfig.TLSConfig() @@ -202,15 +174,7 @@ func (logstash *Logstash) createHttpClient() (*http.Client, error) { // gatherJsonData query the data source and parse the response JSON func (logstash *Logstash) gatherJsonData(url string, value interface{}) error { - - var method string - if logstash.Method != "" { - method = logstash.Method - } else { - method = "GET" - } - - request, err := http.NewRequest(method, url, nil) + request, err := http.NewRequest("GET", url, nil) if err != nil { return err } @@ -218,11 +182,13 @@ func (logstash *Logstash) gatherJsonData(url string, value interface{}) error { if (logstash.Username != "") || (logstash.Password != "") { request.SetBasicAuth(logstash.Username, logstash.Password) } + for header, value := range logstash.Headers { - request.Header.Add(header, value) - } - if logstash.HostHeader != "" { - request.Host = logstash.HostHeader + if strings.ToLower(header) == "host" { + request.Host = value + } else { + request.Header.Add(header, value) + } } response, err := logstash.client.Do(request) @@ -252,8 +218,8 @@ func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumu tags := map[string]string{ "node_id": jvmStats.ID, "node_name": jvmStats.Name, - "node_host": jvmStats.Host, "node_version": jvmStats.Version, + "source": jvmStats.Host, } flattener := jsonParser.JSONFlattener{} @@ -278,8 +244,8 @@ func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Ac tags := map[string]string{ "node_id": processStats.ID, "node_name": processStats.Name, - "node_host": processStats.Host, "node_version": processStats.Version, + "source": processStats.Host, } flattener := jsonParser.JSONFlattener{} @@ -368,8 +334,8 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A tags := map[string]string{ "node_id": pipelineStats.ID, "node_name": pipelineStats.Name, - "node_host": pipelineStats.Host, "node_version": pipelineStats.Version, + "source": pipelineStats.Host, } flattener := jsonParser.JSONFlattener{} @@ -379,23 +345,22 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A } accumulator.AddFields("logstash_events", flattener.Fields, tags) - if logstash.CollectPluginsStats { - err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator) - if err != nil { - return err - } - err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Filters, "filter", tags, accumulator) - if err != nil { - return err - } - err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Outputs, "output", tags, accumulator) - if err != nil { - return err - } + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Filters, "filter", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Outputs, "output", tags, accumulator) + if err != nil { + return err } - if logstash.CollectQueueStats { - err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator) + err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator) + if err != nil { + return err } return nil @@ -414,9 +379,9 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf. tags := map[string]string{ "node_id": pipelinesStats.ID, "node_name": pipelinesStats.Name, - "node_host": pipelinesStats.Host, "node_version": pipelinesStats.Version, "pipeline": pipelineName, + "source": pipelinesStats.Host, } flattener := jsonParser.JSONFlattener{} @@ -426,25 +391,23 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf. } accumulator.AddFields("logstash_events", flattener.Fields, tags) - if logstash.CollectPluginsStats { - err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator) - if err != nil { - return err - } - err = logstash.gatherPluginsStats(pipeline.Plugins.Filters, "filter", tags, accumulator) - if err != nil { - return err - } - err = logstash.gatherPluginsStats(pipeline.Plugins.Outputs, "output", tags, accumulator) - if err != nil { - return err - } + err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipeline.Plugins.Filters, "filter", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipeline.Plugins.Outputs, "output", tags, accumulator) + if err != nil { + return err } - if logstash.CollectQueueStats { - err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator) + err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator) + if err != nil { + return err } - } return nil @@ -452,7 +415,6 @@ func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf. // Gather ask this plugin to start gathering metrics func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { - if logstash.client == nil { client, err := logstash.createHttpClient() @@ -462,7 +424,7 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { logstash.client = client } - if logstash.CollectJVMStats { + if choice.Contains("jvm", logstash.Collect) { jvmUrl, err := url.Parse(logstash.URL + jvmStats) if err != nil { return err @@ -472,7 +434,7 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { } } - if logstash.CollectProcessStats { + if choice.Contains("process", logstash.Collect) { processUrl, err := url.Parse(logstash.URL + processStats) if err != nil { return err @@ -482,16 +444,8 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { } } - if logstash.CollectPipelinesStats { - if logstash.MultiPipeline { - pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats) - if err != nil { - return err - } - if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil { - return err - } - } else { + if choice.Contains("pipelines", logstash.Collect) { + if logstash.SinglePipeline { pipelineUrl, err := url.Parse(logstash.URL + pipelineStats) if err != nil { return err @@ -499,8 +453,23 @@ func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { if err := logstash.gatherPipelineStats(pipelineUrl.String(), accumulator); err != nil { return err } + } else { + pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats) + if err != nil { + return err + } + if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil { + return err + } } } return nil } + +// init registers this plugin instance +func init() { + inputs.Add("logstash", func() telegraf.Input { + return NewLogstash() + }) +} diff --git a/plugins/inputs/logstash/logstash_test.go b/plugins/inputs/logstash/logstash_test.go index c091be83c..aeb4e46f8 100644 --- a/plugins/inputs/logstash/logstash_test.go +++ b/plugins/inputs/logstash/logstash_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) var logstashTest = NewLogstash() @@ -66,7 +65,7 @@ func Test_Logstash5GatherProcessStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), }, ) @@ -115,7 +114,7 @@ func Test_Logstash6GatherProcessStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), }, ) @@ -160,7 +159,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), }, ) @@ -176,7 +175,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), "plugin_name": string("beats"), "plugin_id": string("a35197a509596954e905e38521bae12b1498b17d-1"), @@ -195,7 +194,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), "plugin_name": string("stdout"), "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2"), @@ -214,7 +213,7 @@ func Test_Logstash5GatherPipelineStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), "plugin_name": string("s3"), "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3"), @@ -264,7 +263,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), }, @@ -281,7 +280,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("kafka"), @@ -301,7 +300,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -321,7 +320,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -341,7 +340,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("date"), @@ -361,7 +360,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -381,7 +380,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -401,7 +400,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("drop"), @@ -421,7 +420,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -441,7 +440,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("csv"), @@ -461,7 +460,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("mutate"), @@ -481,7 +480,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("elasticsearch"), @@ -501,7 +500,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("kafka"), @@ -521,7 +520,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "plugin_name": string("kafka"), @@ -544,7 +543,7 @@ func Test_Logstash6GatherPipelinesStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), "pipeline": string("main"), "queue_type": string("persisted"), @@ -615,7 +614,7 @@ func Test_Logstash5GatherJVMStats(test *testing.T) { map[string]string{ "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), "node_name": string("node-5-test"), - "node_host": string("node-5"), + "source": string("node-5"), "node_version": string("5.3.0"), }, ) @@ -684,43 +683,9 @@ func Test_Logstash6GatherJVMStats(test *testing.T) { map[string]string{ "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), "node_name": string("node-6-test"), - "node_host": string("node-6"), + "source": string("node-6"), "node_version": string("6.4.2"), }, ) } - -func Test_LogstashRequests(test *testing.T) { - fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - writer.Header().Set("Content-Type", "application/json") - fmt.Fprintf(writer, "%s", string(logstash6JvmJSON)) - assert.Equal(test, request.Host, "logstash.test.local") - assert.Equal(test, request.Method, "POST") - assert.Equal(test, request.Header.Get("X-Test"), "test-header") - })) - requestURL, err := url.Parse(logstashTest.URL) - if err != nil { - test.Logf("Can't connect to: %s", logstashTest.URL) - } - fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) - fakeServer.Start() - defer fakeServer.Close() - - if logstashTest.client == nil { - client, err := logstashTest.createHttpClient() - - if err != nil { - test.Logf("Can't createHttpClient") - } - logstashTest.client = client - } - - logstashTest.Method = "POST" - logstashTest.Headers["X-Test"] = "test-header" - logstashTest.HostHeader = "logstash.test.local" - - if err := logstashTest.gatherJsonData(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil { - test.Logf("Can't gather JVM stats") - } -}