Add logstash input plugin (#4910)
This commit is contained in:
parent
8c2b3addd3
commit
02174031c8
|
@ -1,5 +1,10 @@
|
|||
# Build and binaries
|
||||
/build
|
||||
/telegraf
|
||||
/telegraf.exe
|
||||
/telegraf.gz
|
||||
/vendor
|
||||
|
||||
# Editor files
|
||||
*~
|
||||
.idea
|
||||
|
|
|
@ -78,6 +78,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/logstash"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/mcrouter"
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
# Logstash Input Plugin
|
||||
|
||||
This plugin reads metrics exposed by
|
||||
[Logstash Monitoring API](https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html).
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
## This plugin reads metrics exposed by Logstash Monitoring API.
|
||||
## https://www.elastic.co/guide/en/logstash/current/monitoring.html
|
||||
|
||||
## The URL of the exposed Logstash API endpoint
|
||||
url = "http://127.0.0.1:9600"
|
||||
|
||||
## Enable Logstash 6+ multi-pipeline statistics support
|
||||
multi_pipeline = true
|
||||
|
||||
## Should the general process statistics be gathered
|
||||
collect_process_stats = true
|
||||
|
||||
## Should the JVM specific statistics be gathered
|
||||
collect_jvm_stats = true
|
||||
|
||||
## Should the event pipelines statistics be gathered
|
||||
collect_pipelines_stats = true
|
||||
|
||||
## Should the plugin statistics be gathered
|
||||
collect_plugins_stats = true
|
||||
|
||||
## Should the queue statistics be gathered
|
||||
collect_queue_stats = true
|
||||
|
||||
## HTTP method
|
||||
# method = "GET"
|
||||
|
||||
## Optional HTTP headers
|
||||
# headers = {"X-Special-Header" = "Special-Value"}
|
||||
|
||||
## Override HTTP "Host" header
|
||||
# host_header = "logstash.example.com"
|
||||
|
||||
## Timeout for HTTP requests
|
||||
timeout = "5s"
|
||||
|
||||
## Optional HTTP Basic Auth credentials
|
||||
# username = "username"
|
||||
# password = "pa$$word"
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
||||
## Use TLS but skip chain & host verification
|
||||
# insecure_skip_verify = false
|
||||
```
|
||||
|
||||
### Measurements & Fields:
|
||||
|
||||
- **logstash_jvm**
|
||||
* Fields:
|
||||
- threads_peak_count
|
||||
- mem_pools_survivor_peak_max_in_bytes
|
||||
- mem_pools_survivor_max_in_bytes
|
||||
- mem_pools_old_peak_used_in_bytes
|
||||
- mem_pools_young_used_in_bytes
|
||||
- mem_non_heap_committed_in_bytes
|
||||
- threads_count
|
||||
- mem_pools_old_committed_in_bytes
|
||||
- mem_pools_young_peak_max_in_bytes
|
||||
- mem_heap_used_percent
|
||||
- gc_collectors_young_collection_time_in_millis
|
||||
- mem_pools_survivor_peak_used_in_bytes
|
||||
- mem_pools_young_committed_in_bytes
|
||||
- gc_collectors_old_collection_time_in_millis
|
||||
- gc_collectors_old_collection_count
|
||||
- mem_pools_survivor_used_in_bytes
|
||||
- mem_pools_old_used_in_bytes
|
||||
- mem_pools_young_max_in_bytes
|
||||
- mem_heap_max_in_bytes
|
||||
- mem_non_heap_used_in_bytes
|
||||
- mem_pools_survivor_committed_in_bytes
|
||||
- mem_pools_old_max_in_bytes
|
||||
- mem_heap_committed_in_bytes
|
||||
- mem_pools_old_peak_max_in_bytes
|
||||
- mem_pools_young_peak_used_in_bytes
|
||||
- mem_heap_used_in_bytes
|
||||
- gc_collectors_young_collection_count
|
||||
- uptime_in_millis
|
||||
* Tags:
|
||||
- node_id
|
||||
- node_name
|
||||
- node_host
|
||||
- node_version
|
||||
|
||||
- **logstash_process**
|
||||
* Fields:
|
||||
- open_file_descriptors
|
||||
- cpu_load_average_1m
|
||||
- cpu_load_average_5m
|
||||
- cpu_load_average_15m
|
||||
- cpu_total_in_millis
|
||||
- cpu_percent
|
||||
- peak_open_file_descriptors
|
||||
- max_file_descriptors
|
||||
- mem_total_virtual_in_bytes
|
||||
* Tags:
|
||||
- node_id
|
||||
- node_name
|
||||
- node_host
|
||||
- node_version
|
||||
|
||||
- **logstash_events**
|
||||
* Fields:
|
||||
- queue_push_duration_in_millis
|
||||
- duration_in_millis
|
||||
- in
|
||||
- filtered
|
||||
- out
|
||||
* Tags:
|
||||
- node_id
|
||||
- node_name
|
||||
- node_host
|
||||
- node_version
|
||||
- pipeline (for Logstash 6 only)
|
||||
|
||||
- **logstash_plugins**
|
||||
* Fields:
|
||||
- queue_push_duration_in_millis (for input plugins only)
|
||||
- duration_in_millis
|
||||
- in
|
||||
- out
|
||||
* Tags:
|
||||
- node_id
|
||||
- node_name
|
||||
- node_host
|
||||
- node_version
|
||||
- pipeline (for Logstash 6 only)
|
||||
- plugin_id
|
||||
- plugin_name
|
||||
- plugin_type
|
||||
|
||||
- **logstash_queue**
|
||||
* Fields:
|
||||
- events
|
||||
- free_space_in_bytes
|
||||
- max_queue_size_in_bytes
|
||||
- max_unread_events
|
||||
- page_capacity_in_bytes
|
||||
- queue_size_in_bytes
|
||||
* Tags:
|
||||
- node_id
|
||||
- node_name
|
||||
- node_host
|
||||
- node_version
|
||||
- pipeline (for Logstash 6 only)
|
||||
- queue_type
|
||||
|
||||
### Tags description
|
||||
|
||||
- node_id - The uuid of the logstash node. Randomly generated.
|
||||
- node_name - The name of the logstash node. Can be defined in the *logstash.yml* or defaults to the hostname.
|
||||
Can be used to break apart metrics from different logstash instances of the same host.
|
||||
- node_host - The hostname of the logstash node.
|
||||
Can be different from the telegraf host if a remote connection to the logstash instance is used.
|
||||
- node_version - The version of logstash service running on this node.
|
||||
- pipeline (for Logstash 6 only) - The name of a pipeline if multi-pipeline is configured.
|
||||
Defaults to "main" if there is only one pipeline and will be missing for logstash 5.
|
||||
- plugin_id - The unique id of this plugin.
|
||||
It will be a randomly generated string unless it's defined in the logstash pipeline config file.
|
||||
- plugin_name - The name of this plugin. e.g. file, elasticsearch, date, mutate.
|
||||
- plugin_type - The type of this plugin i.e. input/filter/output.
|
||||
- queue_type - The type of the event queue (memory/persisted).
|
||||
|
||||
### Example Output:
|
||||
|
||||
```
|
||||
$ ./telegraf -config telegraf.conf -input-filter logstash -test
|
||||
|
||||
> logstash_jvm,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2
|
||||
gc_collectors_old_collection_count=5,gc_collectors_old_collection_time_in_millis=702,gc_collectors_young_collection_count=95,gc_collectors_young_collection_time_in_millis=4772,mem_heap_committed_in_bytes=360804352,mem_heap_max_in_bytes=8389328896,mem_heap_used_in_bytes=252629768,mem_heap_used_percent=3,mem_non_heap_committed_in_bytes=212144128,mem_non_heap_used_in_bytes=188726024,mem_pools_old_committed_in_bytes=280260608,mem_pools_old_max_in_bytes=6583418880,mem_pools_old_peak_max_in_bytes=6583418880,mem_pools_old_peak_used_in_bytes=235352976,mem_pools_old_used_in_bytes=194754608,mem_pools_survivor_committed_in_bytes=8912896,mem_pools_survivor_max_in_bytes=200605696,mem_pools_survivor_peak_max_in_bytes=200605696,mem_pools_survivor_peak_used_in_bytes=8912896,mem_pools_survivor_used_in_bytes=4476680,mem_pools_young_committed_in_bytes=71630848,mem_pools_young_max_in_bytes=1605304320,mem_pools_young_peak_max_in_bytes=1605304320,mem_pools_young_peak_used_in_bytes=71630848,mem_pools_young_used_in_bytes=53398480,threads_count=60,threads_peak_count=62,uptime_in_millis=10469014 1540289864000000000
|
||||
> logstash_process,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2
|
||||
cpu_load_average_15m=39.84,cpu_load_average_1m=32.87,cpu_load_average_5m=39.23,cpu_percent=0,cpu_total_in_millis=389920,max_file_descriptors=262144,mem_total_virtual_in_bytes=17921355776,open_file_descriptors=132,peak_open_file_descriptors=140 1540289864000000000
|
||||
> logstash_events,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main
|
||||
duration_in_millis=175144,filtered=4543,in=4543,out=4543,queue_push_duration_in_millis=19 1540289864000000000
|
||||
> logstash_plugins,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main,plugin_id=input-kafka,plugin_name=kafka,plugin_type=input
|
||||
duration_in_millis=0,in=0,out=4543,queue_push_duration_in_millis=19 1540289864000000000
|
||||
```
|
|
@ -0,0 +1,506 @@
|
|||
package logstash
|
||||
|
||||
import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/internal/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
	jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
)
|
||||
|
||||
// ##### Interface #####
|
||||
|
||||
const sampleConfig = `
|
||||
## This plugin reads metrics exposed by Logstash Monitoring API.
|
||||
## https://www.elastic.co/guide/en/logstash/current/monitoring.html
|
||||
|
||||
## The URL of the exposed Logstash API endpoint
|
||||
url = "http://127.0.0.1:9600"
|
||||
|
||||
## Enable Logstash 6+ multi-pipeline statistics support
|
||||
multi_pipeline = true
|
||||
|
||||
## Should the general process statistics be gathered
|
||||
collect_process_stats = true
|
||||
|
||||
## Should the JVM specific statistics be gathered
|
||||
collect_jvm_stats = true
|
||||
|
||||
## Should the event pipelines statistics be gathered
|
||||
collect_pipelines_stats = true
|
||||
|
||||
## Should the plugin statistics be gathered
|
||||
collect_plugins_stats = true
|
||||
|
||||
## Should the queue statistics be gathered
|
||||
collect_queue_stats = true
|
||||
|
||||
## HTTP method
|
||||
# method = "GET"
|
||||
|
||||
## Optional HTTP headers
|
||||
# headers = {"X-Special-Header" = "Special-Value"}
|
||||
|
||||
## Override HTTP "Host" header
|
||||
# host_header = "logstash.example.com"
|
||||
|
||||
## Timeout for HTTP requests
|
||||
timeout = "5s"
|
||||
|
||||
## Optional HTTP Basic Auth credentials
|
||||
# username = "username"
|
||||
# password = "pa$$word"
|
||||
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
|
||||
## Use TLS but skip chain & host verification
|
||||
# insecure_skip_verify = false
|
||||
`
|
||||
|
||||
// Logstash is the plugin instance. Exported fields mirror the TOML
// configuration keys shown in sampleConfig.
type Logstash struct {
	// URL is the base address of the Logstash monitoring API endpoint.
	URL string `toml:"url"`

	// MultiPipeline selects the Logstash 6+ multi-pipeline stats endpoint
	// instead of the single-pipeline endpoint used by Logstash 5.
	MultiPipeline         bool `toml:"multi_pipeline"`
	CollectProcessStats   bool `toml:"collect_process_stats"`
	CollectJVMStats       bool `toml:"collect_jvm_stats"`
	CollectPipelinesStats bool `toml:"collect_pipelines_stats"`
	CollectPluginsStats   bool `toml:"collect_plugins_stats"`
	CollectQueueStats     bool `toml:"collect_queue_stats"`

	// HTTP request settings: optional basic auth, method override,
	// extra headers, Host header override, and request timeout.
	Username   string            `toml:"username"`
	Password   string            `toml:"password"`
	Method     string            `toml:"method"`
	Headers    map[string]string `toml:"headers"`
	HostHeader string            `toml:"host_header"`
	Timeout    internal.Duration `toml:"timeout"`

	tls.ClientConfig
	// client is created lazily on the first Gather call.
	client *http.Client
}
|
||||
|
||||
// NewLogstash create an instance of the plugin with default settings
|
||||
func NewLogstash() *Logstash {
|
||||
return &Logstash{
|
||||
URL: "http://127.0.0.1:9600",
|
||||
MultiPipeline: true,
|
||||
CollectProcessStats: true,
|
||||
CollectJVMStats: true,
|
||||
CollectPipelinesStats: true,
|
||||
CollectPluginsStats: true,
|
||||
CollectQueueStats: true,
|
||||
Method: "GET",
|
||||
Headers: make(map[string]string),
|
||||
HostHeader: "",
|
||||
Timeout: internal.Duration{Duration: time.Second * 5},
|
||||
}
|
||||
}
|
||||
|
||||
// init registers the plugin constructor with the Telegraf input registry at
// import time, so "logstash" can be referenced from a telegraf.conf file.
func init() {
	inputs.Add("logstash", func() telegraf.Input {
		return NewLogstash()
	})
}
|
||||
|
||||
// Description returns short info about plugin
|
||||
func (logstash *Logstash) Description() string {
|
||||
return "Read metrics exposed by Logstash"
|
||||
}
|
||||
|
||||
// SampleConfig returns details how to configure plugin
|
||||
func (logstash *Logstash) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// ProcessStats is the response shape of the /_node/stats/process document.
// Process is kept untyped and flattened generically into fields.
type ProcessStats struct {
	ID      string      `json:"id"`
	Process interface{} `json:"process"`
	Name    string      `json:"name"`
	Host    string      `json:"host"`
	Version string      `json:"version"`
}
|
||||
|
||||
// JVMStats is the response shape of the /_node/stats/jvm document.
// JVM is kept untyped and flattened generically into fields.
type JVMStats struct {
	ID      string      `json:"id"`
	JVM     interface{} `json:"jvm"`
	Name    string      `json:"name"`
	Host    string      `json:"host"`
	Version string      `json:"version"`
}
|
||||
|
||||
// PipelinesStats is the response shape of /_node/stats/pipelines
// (Logstash 6+): a map of pipeline name to its statistics.
type PipelinesStats struct {
	ID        string              `json:"id"`
	Pipelines map[string]Pipeline `json:"pipelines"`
	Name      string              `json:"name"`
	Host      string              `json:"host"`
	Version   string              `json:"version"`
}
|
||||
|
||||
// PipelineStats is the response shape of /_node/stats/pipeline
// (Logstash 5, single pipeline).
type PipelineStats struct {
	ID       string   `json:"id"`
	Pipeline Pipeline `json:"pipeline"`
	Name     string   `json:"name"`
	Host     string   `json:"host"`
	Version  string   `json:"version"`
}
|
||||
|
||||
// Pipeline holds the statistics of a single event pipeline. Events and
// Reloads are kept untyped; Events is flattened generically into fields.
type Pipeline struct {
	Events  interface{}     `json:"events"`
	Plugins PipelinePlugins `json:"plugins"`
	Reloads interface{}     `json:"reloads"`
	Queue   PipelineQueue   `json:"queue"`
}
|
||||
|
||||
// Plugin holds the per-plugin statistics of a pipeline stage; Events is
// kept untyped and flattened generically into fields.
type Plugin struct {
	ID     string      `json:"id"`
	Events interface{} `json:"events"`
	Name   string      `json:"name"`
}
|
||||
|
||||
// PipelinePlugins groups a pipeline's plugins by stage (input/filter/output).
type PipelinePlugins struct {
	Inputs  []Plugin `json:"inputs"`
	Filters []Plugin `json:"filters"`
	Outputs []Plugin `json:"outputs"`
}
|
||||
|
||||
// PipelineQueue holds a pipeline's event-queue statistics. Capacity and
// Data are kept untyped; they are only flattened for non-memory queues.
type PipelineQueue struct {
	Events   float64     `json:"events"`
	Type     string      `json:"type"`
	Capacity interface{} `json:"capacity"`
	Data     interface{} `json:"data"`
}
|
||||
|
||||
// Relative API paths of the individual stats documents, appended to URL.
const jvmStats = "/_node/stats/jvm"
const processStats = "/_node/stats/process"
const pipelinesStats = "/_node/stats/pipelines"
const pipelineStats = "/_node/stats/pipeline"
|
||||
|
||||
// createHttpClient create a clients to access API
|
||||
func (logstash *Logstash) createHttpClient() (*http.Client, error) {
|
||||
tlsConfig, err := logstash.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
},
|
||||
Timeout: logstash.Timeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// gatherJsonData query the data source and parse the response JSON
|
||||
func (logstash *Logstash) gatherJsonData(url string, value interface{}) error {
|
||||
|
||||
var method string
|
||||
if logstash.Method != "" {
|
||||
method = logstash.Method
|
||||
} else {
|
||||
method = "GET"
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(method, url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if (logstash.Username != "") || (logstash.Password != "") {
|
||||
request.SetBasicAuth(logstash.Username, logstash.Password)
|
||||
}
|
||||
for header, value := range logstash.Headers {
|
||||
request.Header.Add(header, value)
|
||||
}
|
||||
if logstash.HostHeader != "" {
|
||||
request.Host = logstash.HostHeader
|
||||
}
|
||||
|
||||
response, err := logstash.client.Do(request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer response.Body.Close()
|
||||
|
||||
err = json.NewDecoder(response.Body).Decode(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gatherJVMStats gather the JVM metrics and add results to the accumulator
|
||||
func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumulator) error {
|
||||
jvmStats := &JVMStats{}
|
||||
|
||||
err := logstash.gatherJsonData(url, jvmStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"node_id": jvmStats.ID,
|
||||
"node_name": jvmStats.Name,
|
||||
"node_host": jvmStats.Host,
|
||||
"node_version": jvmStats.Version,
|
||||
}
|
||||
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err = flattener.FlattenJSON("", jvmStats.JVM)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accumulator.AddFields("logstash_jvm", flattener.Fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gatherJVMStats gather the Process metrics and add results to the accumulator
|
||||
func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Accumulator) error {
|
||||
processStats := &ProcessStats{}
|
||||
|
||||
err := logstash.gatherJsonData(url, processStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"node_id": processStats.ID,
|
||||
"node_name": processStats.Name,
|
||||
"node_host": processStats.Host,
|
||||
"node_version": processStats.Version,
|
||||
}
|
||||
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err = flattener.FlattenJSON("", processStats.Process)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accumulator.AddFields("logstash_process", flattener.Fields, tags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gatherPluginsStats go through a list of plugins and add their metrics to the accumulator
|
||||
func (logstash *Logstash) gatherPluginsStats(
|
||||
plugins []Plugin,
|
||||
pluginType string,
|
||||
tags map[string]string,
|
||||
accumulator telegraf.Accumulator) error {
|
||||
|
||||
for _, plugin := range plugins {
|
||||
pluginTags := map[string]string{
|
||||
"plugin_name": plugin.Name,
|
||||
"plugin_id": plugin.ID,
|
||||
"plugin_type": pluginType,
|
||||
}
|
||||
for tag, value := range tags {
|
||||
pluginTags[tag] = value
|
||||
}
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err := flattener.FlattenJSON("", plugin.Events)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accumulator.AddFields("logstash_plugins", flattener.Fields, pluginTags)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (logstash *Logstash) gatherQueueStats(
|
||||
queue *PipelineQueue,
|
||||
tags map[string]string,
|
||||
accumulator telegraf.Accumulator) error {
|
||||
|
||||
var err error
|
||||
queueTags := map[string]string{
|
||||
"queue_type": queue.Type,
|
||||
}
|
||||
for tag, value := range tags {
|
||||
queueTags[tag] = value
|
||||
}
|
||||
|
||||
queueFields := map[string]interface{}{
|
||||
"events": queue.Events,
|
||||
}
|
||||
|
||||
if queue.Type != "memory" {
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err = flattener.FlattenJSON("", queue.Capacity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = flattener.FlattenJSON("", queue.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for field, value := range flattener.Fields {
|
||||
queueFields[field] = value
|
||||
}
|
||||
}
|
||||
|
||||
accumulator.AddFields("logstash_queue", queueFields, queueTags)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gatherJVMStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6)
|
||||
func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.Accumulator) error {
|
||||
pipelineStats := &PipelineStats{}
|
||||
|
||||
err := logstash.gatherJsonData(url, pipelineStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"node_id": pipelineStats.ID,
|
||||
"node_name": pipelineStats.Name,
|
||||
"node_host": pipelineStats.Host,
|
||||
"node_version": pipelineStats.Version,
|
||||
}
|
||||
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err = flattener.FlattenJSON("", pipelineStats.Pipeline.Events)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accumulator.AddFields("logstash_events", flattener.Fields, tags)
|
||||
|
||||
if logstash.CollectPluginsStats {
|
||||
err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Filters, "filter", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Outputs, "output", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if logstash.CollectQueueStats {
|
||||
err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gatherJVMStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6)
|
||||
func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.Accumulator) error {
|
||||
pipelinesStats := &PipelinesStats{}
|
||||
|
||||
err := logstash.gatherJsonData(url, pipelinesStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for pipelineName, pipeline := range pipelinesStats.Pipelines {
|
||||
tags := map[string]string{
|
||||
"node_id": pipelinesStats.ID,
|
||||
"node_name": pipelinesStats.Name,
|
||||
"node_host": pipelinesStats.Host,
|
||||
"node_version": pipelinesStats.Version,
|
||||
"pipeline": pipelineName,
|
||||
}
|
||||
|
||||
flattener := jsonParser.JSONFlattener{}
|
||||
err := flattener.FlattenJSON("", pipeline.Events)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accumulator.AddFields("logstash_events", flattener.Fields, tags)
|
||||
|
||||
if logstash.CollectPluginsStats {
|
||||
err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = logstash.gatherPluginsStats(pipeline.Plugins.Filters, "filter", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = logstash.gatherPluginsStats(pipeline.Plugins.Outputs, "output", tags, accumulator)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if logstash.CollectQueueStats {
|
||||
err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Gather ask this plugin to start gathering metrics
|
||||
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
|
||||
|
||||
if logstash.client == nil {
|
||||
client, err := logstash.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logstash.client = client
|
||||
}
|
||||
|
||||
if logstash.CollectJVMStats {
|
||||
jvmUrl, err := url.Parse(logstash.URL + jvmStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := logstash.gatherJVMStats(jvmUrl.String(), accumulator); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if logstash.CollectProcessStats {
|
||||
processUrl, err := url.Parse(logstash.URL + processStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := logstash.gatherProcessStats(processUrl.String(), accumulator); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if logstash.CollectPipelinesStats {
|
||||
if logstash.MultiPipeline {
|
||||
pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
pipelineUrl, err := url.Parse(logstash.URL + pipelineStats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := logstash.gatherPipelineStats(pipelineUrl.String(), accumulator); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,726 @@
|
|||
package logstash
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// logstashTest is a shared plugin instance with default settings; the tests
// below bind fake HTTP servers to its default URL.
var logstashTest = NewLogstash()
|
||||
|
||||
// One accumulator per test case so assertions never see metrics gathered
// by a different test.
var (
	logstash5accPipelineStats  testutil.Accumulator
	logstash6accPipelinesStats testutil.Accumulator
	logstash5accProcessStats   testutil.Accumulator
	logstash6accProcessStats   testutil.Accumulator
	logstash5accJVMStats       testutil.Accumulator
	logstash6accJVMStats       testutil.Accumulator
)
|
||||
|
||||
func Test_Logstash5GatherProcessStats(test *testing.T) {
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash5ProcessJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherProcessStats(logstashTest.URL+processStats, &logstash5accProcessStats); err != nil {
|
||||
test.Logf("Can't gather Process stats")
|
||||
}
|
||||
|
||||
logstash5accProcessStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_process",
|
||||
map[string]interface{}{
|
||||
"open_file_descriptors": float64(89.0),
|
||||
"max_file_descriptors": float64(1.048576e+06),
|
||||
"cpu_percent": float64(3.0),
|
||||
"cpu_load_average_5m": float64(0.61),
|
||||
"cpu_load_average_15m": float64(0.54),
|
||||
"mem_total_virtual_in_bytes": float64(4.809506816e+09),
|
||||
"cpu_total_in_millis": float64(1.5526e+11),
|
||||
"cpu_load_average_1m": float64(0.49),
|
||||
"peak_open_file_descriptors": float64(100.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func Test_Logstash6GatherProcessStats(test *testing.T) {
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash6ProcessJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherProcessStats(logstashTest.URL+processStats, &logstash6accProcessStats); err != nil {
|
||||
test.Logf("Can't gather Process stats")
|
||||
}
|
||||
|
||||
logstash6accProcessStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_process",
|
||||
map[string]interface{}{
|
||||
"open_file_descriptors": float64(133.0),
|
||||
"max_file_descriptors": float64(262144.0),
|
||||
"cpu_percent": float64(0.0),
|
||||
"cpu_load_average_5m": float64(42.4),
|
||||
"cpu_load_average_15m": float64(38.95),
|
||||
"mem_total_virtual_in_bytes": float64(17923452928.0),
|
||||
"cpu_total_in_millis": float64(5841460),
|
||||
"cpu_load_average_1m": float64(48.2),
|
||||
"peak_open_file_descriptors": float64(145.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func Test_Logstash5GatherPipelineStats(test *testing.T) {
|
||||
//logstash5accPipelineStats.SetDebug(true)
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash5PipelineJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherPipelineStats(logstashTest.URL+pipelineStats, &logstash5accPipelineStats); err != nil {
|
||||
test.Logf("Can't gather Pipeline stats")
|
||||
}
|
||||
|
||||
logstash5accPipelineStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_events",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(1151.0),
|
||||
"in": float64(1269.0),
|
||||
"filtered": float64(1269.0),
|
||||
"out": float64(1269.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
},
|
||||
)
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
fields["queue_push_duration_in_millis"] = float64(32.0)
|
||||
fields["out"] = float64(2.0)
|
||||
|
||||
logstash5accPipelineStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
fields,
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
"plugin_name": string("beats"),
|
||||
"plugin_id": string("a35197a509596954e905e38521bae12b1498b17d-1"),
|
||||
"plugin_type": string("input"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash5accPipelineStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(360.0),
|
||||
"in": float64(1269.0),
|
||||
"out": float64(1269.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
"plugin_name": string("stdout"),
|
||||
"plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2"),
|
||||
"plugin_type": string("output"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash5accPipelineStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(228.0),
|
||||
"in": float64(1269.0),
|
||||
"out": float64(1269.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
"plugin_name": string("s3"),
|
||||
"plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3"),
|
||||
"plugin_type": string("output"),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func Test_Logstash6GatherPipelinesStats(test *testing.T) {
|
||||
//logstash6accPipelinesStats.SetDebug(true)
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash6PipelinesJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherPipelinesStats(logstashTest.URL+pipelineStats, &logstash6accPipelinesStats); err != nil {
|
||||
test.Logf("Can't gather Pipeline stats")
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
fields["duration_in_millis"] = float64(8540751.0)
|
||||
fields["queue_push_duration_in_millis"] = float64(366.0)
|
||||
fields["in"] = float64(180659.0)
|
||||
fields["filtered"] = float64(180659.0)
|
||||
fields["out"] = float64(180659.0)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_events",
|
||||
fields,
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
},
|
||||
)
|
||||
|
||||
fields = make(map[string]interface{})
|
||||
fields["queue_push_duration_in_millis"] = float64(366.0)
|
||||
fields["out"] = float64(180659.0)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
fields,
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("kafka"),
|
||||
"plugin_id": string("input-kafka"),
|
||||
"plugin_type": string("input"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(2117.0),
|
||||
"in": float64(27641.0),
|
||||
"out": float64(27641.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(2117.0),
|
||||
"in": float64(27641.0),
|
||||
"out": float64(27641.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(13149.0),
|
||||
"in": float64(180659.0),
|
||||
"out": float64(177549.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("date"),
|
||||
"plugin_id": string("d079424bb6b7b8c7c61d9c5e0ddae445e92fa9ffa2e8690b0a669f7c690542f0"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(2814.0),
|
||||
"in": float64(76602.0),
|
||||
"out": float64(76602.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("25afa60ab6dc30512fe80efa3493e4928b5b1b109765b7dc46a3e4bbf293d2d4"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(9.0),
|
||||
"in": float64(934.0),
|
||||
"out": float64(934.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("2d9fa8f74eeb137bfa703b8050bad7d76636fface729e4585b789b5fc9bed668"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(173.0),
|
||||
"in": float64(3110.0),
|
||||
"out": float64(0.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("drop"),
|
||||
"plugin_id": string("4ed14c9ef0198afe16c31200041e98d321cb5c2e6027e30b077636b8c4842110"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(5605.0),
|
||||
"in": float64(75482.0),
|
||||
"out": float64(75482.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("358ce1eb387de7cd5711c2fb4de64cd3b12e5ca9a4c45f529516bcb053a31df4"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(313992.0),
|
||||
"in": float64(180659.0),
|
||||
"out": float64(180659.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("csv"),
|
||||
"plugin_id": string("82a9bbb02fff37a63c257c1f146b0a36273c7cbbebe83c0a51f086e5280bf7bb"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(0.0),
|
||||
"in": float64(0.0),
|
||||
"out": float64(0.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("mutate"),
|
||||
"plugin_id": string("8fb13a8cdd4257b52724d326aa1549603ffdd4e4fde6d20720c96b16238c18c3"),
|
||||
"plugin_type": string("filter"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(651386.0),
|
||||
"in": float64(177549.0),
|
||||
"out": float64(177549.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("elasticsearch"),
|
||||
"plugin_id": string("output-elk"),
|
||||
"plugin_type": string("output"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(186751.0),
|
||||
"in": float64(177549.0),
|
||||
"out": float64(177549.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("kafka"),
|
||||
"plugin_id": string("output-kafka1"),
|
||||
"plugin_type": string("output"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_plugins",
|
||||
map[string]interface{}{
|
||||
"duration_in_millis": float64(7335196.0),
|
||||
"in": float64(177549.0),
|
||||
"out": float64(177549.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"plugin_name": string("kafka"),
|
||||
"plugin_id": string("output-kafka2"),
|
||||
"plugin_type": string("output"),
|
||||
},
|
||||
)
|
||||
|
||||
logstash6accPipelinesStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_queue",
|
||||
map[string]interface{}{
|
||||
"events": float64(103),
|
||||
"free_space_in_bytes": float64(36307369984),
|
||||
"max_queue_size_in_bytes": float64(1073741824),
|
||||
"max_unread_events": float64(0),
|
||||
"page_capacity_in_bytes": float64(67108864),
|
||||
"queue_size_in_bytes": float64(1872391),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
"pipeline": string("main"),
|
||||
"queue_type": string("persisted"),
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func Test_Logstash5GatherJVMStats(test *testing.T) {
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash5JvmJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherJVMStats(logstashTest.URL+jvmStats, &logstash5accJVMStats); err != nil {
|
||||
test.Logf("Can't gather JVM stats")
|
||||
}
|
||||
|
||||
logstash5accJVMStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_jvm",
|
||||
map[string]interface{}{
|
||||
"mem_pools_young_max_in_bytes": float64(5.5836672e+08),
|
||||
"mem_pools_young_committed_in_bytes": float64(1.43261696e+08),
|
||||
"mem_heap_committed_in_bytes": float64(5.1904512e+08),
|
||||
"threads_count": float64(29.0),
|
||||
"mem_pools_old_peak_used_in_bytes": float64(1.27900864e+08),
|
||||
"mem_pools_old_peak_max_in_bytes": float64(7.2482816e+08),
|
||||
"mem_heap_used_percent": float64(16.0),
|
||||
"gc_collectors_young_collection_time_in_millis": float64(3235.0),
|
||||
"mem_pools_survivor_committed_in_bytes": float64(1.7825792e+07),
|
||||
"mem_pools_young_used_in_bytes": float64(7.6049384e+07),
|
||||
"mem_non_heap_committed_in_bytes": float64(2.91487744e+08),
|
||||
"mem_pools_survivor_peak_max_in_bytes": float64(3.4865152e+07),
|
||||
"mem_pools_young_peak_max_in_bytes": float64(2.7918336e+08),
|
||||
"uptime_in_millis": float64(4.803461e+06),
|
||||
"mem_pools_survivor_peak_used_in_bytes": float64(8.912896e+06),
|
||||
"mem_pools_survivor_max_in_bytes": float64(6.9730304e+07),
|
||||
"gc_collectors_old_collection_count": float64(2.0),
|
||||
"mem_pools_survivor_used_in_bytes": float64(9.419672e+06),
|
||||
"mem_pools_old_used_in_bytes": float64(2.55801728e+08),
|
||||
"mem_pools_old_max_in_bytes": float64(1.44965632e+09),
|
||||
"mem_pools_young_peak_used_in_bytes": float64(7.1630848e+07),
|
||||
"mem_heap_used_in_bytes": float64(3.41270784e+08),
|
||||
"mem_heap_max_in_bytes": float64(2.077753344e+09),
|
||||
"gc_collectors_young_collection_count": float64(616.0),
|
||||
"threads_peak_count": float64(31.0),
|
||||
"mem_pools_old_committed_in_bytes": float64(3.57957632e+08),
|
||||
"gc_collectors_old_collection_time_in_millis": float64(114.0),
|
||||
"mem_non_heap_used_in_bytes": float64(2.68905936e+08),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("a360d8cf-6289-429d-8419-6145e324b574"),
|
||||
"node_name": string("node-5-test"),
|
||||
"node_host": string("node-5"),
|
||||
"node_version": string("5.3.0"),
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func Test_Logstash6GatherJVMStats(test *testing.T) {
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash6JvmJSON))
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
if err := logstashTest.gatherJVMStats(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil {
|
||||
test.Logf("Can't gather JVM stats")
|
||||
}
|
||||
|
||||
logstash6accJVMStats.AssertContainsTaggedFields(
|
||||
test,
|
||||
"logstash_jvm",
|
||||
map[string]interface{}{
|
||||
"mem_pools_young_max_in_bytes": float64(1605304320.0),
|
||||
"mem_pools_young_committed_in_bytes": float64(71630848.0),
|
||||
"mem_heap_committed_in_bytes": float64(824963072.0),
|
||||
"threads_count": float64(60.0),
|
||||
"mem_pools_old_peak_used_in_bytes": float64(696572600.0),
|
||||
"mem_pools_old_peak_max_in_bytes": float64(6583418880.0),
|
||||
"mem_heap_used_percent": float64(2.0),
|
||||
"gc_collectors_young_collection_time_in_millis": float64(107321.0),
|
||||
"mem_pools_survivor_committed_in_bytes": float64(8912896.0),
|
||||
"mem_pools_young_used_in_bytes": float64(11775120.0),
|
||||
"mem_non_heap_committed_in_bytes": float64(222986240.0),
|
||||
"mem_pools_survivor_peak_max_in_bytes": float64(200605696),
|
||||
"mem_pools_young_peak_max_in_bytes": float64(1605304320.0),
|
||||
"uptime_in_millis": float64(281850926.0),
|
||||
"mem_pools_survivor_peak_used_in_bytes": float64(8912896.0),
|
||||
"mem_pools_survivor_max_in_bytes": float64(200605696.0),
|
||||
"gc_collectors_old_collection_count": float64(37.0),
|
||||
"mem_pools_survivor_used_in_bytes": float64(835008.0),
|
||||
"mem_pools_old_used_in_bytes": float64(189750576.0),
|
||||
"mem_pools_old_max_in_bytes": float64(6583418880.0),
|
||||
"mem_pools_young_peak_used_in_bytes": float64(71630848.0),
|
||||
"mem_heap_used_in_bytes": float64(202360704.0),
|
||||
"mem_heap_max_in_bytes": float64(8389328896.0),
|
||||
"gc_collectors_young_collection_count": float64(2094.0),
|
||||
"threads_peak_count": float64(62.0),
|
||||
"mem_pools_old_committed_in_bytes": float64(744419328.0),
|
||||
"gc_collectors_old_collection_time_in_millis": float64(7492.0),
|
||||
"mem_non_heap_used_in_bytes": float64(197878896.0),
|
||||
},
|
||||
map[string]string{
|
||||
"node_id": string("3044f675-21ce-4335-898a-8408aa678245"),
|
||||
"node_name": string("node-6-test"),
|
||||
"node_host": string("node-6"),
|
||||
"node_version": string("6.4.2"),
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func Test_LogstashRequests(test *testing.T) {
|
||||
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(writer, "%s", string(logstash6JvmJSON))
|
||||
assert.Equal(test, request.Host, "logstash.test.local")
|
||||
assert.Equal(test, request.Method, "POST")
|
||||
assert.Equal(test, request.Header.Get("X-Test"), "test-header")
|
||||
}))
|
||||
requestURL, err := url.Parse(logstashTest.URL)
|
||||
if err != nil {
|
||||
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||
}
|
||||
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||
fakeServer.Start()
|
||||
defer fakeServer.Close()
|
||||
|
||||
if logstashTest.client == nil {
|
||||
client, err := logstashTest.createHttpClient()
|
||||
|
||||
if err != nil {
|
||||
test.Logf("Can't createHttpClient")
|
||||
}
|
||||
logstashTest.client = client
|
||||
}
|
||||
|
||||
logstashTest.Method = "POST"
|
||||
logstashTest.Headers["X-Test"] = "test-header"
|
||||
logstashTest.HostHeader = "logstash.test.local"
|
||||
|
||||
if err := logstashTest.gatherJsonData(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil {
|
||||
test.Logf("Can't gather JVM stats")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
package logstash
|
||||
|
||||
// logstash5ProcessJSON is a canned Logstash 5.3 monitoring-API response for
// the process stats endpoint, used as a fixture by the process-stats tests.
const logstash5ProcessJSON = `
{
  "host" : "node-5",
  "version" : "5.3.0",
  "http_address" : "0.0.0.0:9600",
  "id" : "a360d8cf-6289-429d-8419-6145e324b574",
  "name" : "node-5-test",
  "process" : {
    "open_file_descriptors" : 89,
    "peak_open_file_descriptors" : 100,
    "max_file_descriptors" : 1048576,
    "mem" : {
      "total_virtual_in_bytes" : 4809506816
    },
    "cpu" : {
      "total_in_millis" : 155260000000,
      "percent" : 3,
      "load_average" : {
        "1m" : 0.49,
        "5m" : 0.61,
        "15m" : 0.54
      }
    }
  }
}
`
|
||||
|
||||
// logstash5JvmJSON is a canned Logstash 5.3 monitoring-API response for the
// JVM stats endpoint, used as a fixture by Test_Logstash5GatherJVMStats.
const logstash5JvmJSON = `
{
  "host" : "node-5",
  "version" : "5.3.0",
  "http_address" : "0.0.0.0:9600",
  "id" : "a360d8cf-6289-429d-8419-6145e324b574",
  "name" : "node-5-test",
  "jvm" : {
    "threads" : {
      "count" : 29,
      "peak_count" : 31
    },
    "mem" : {
      "heap_used_in_bytes" : 341270784,
      "heap_used_percent" : 16,
      "heap_committed_in_bytes" : 519045120,
      "heap_max_in_bytes" : 2077753344,
      "non_heap_used_in_bytes" : 268905936,
      "non_heap_committed_in_bytes" : 291487744,
      "pools" : {
        "survivor" : {
          "peak_used_in_bytes" : 8912896,
          "used_in_bytes" : 9419672,
          "peak_max_in_bytes" : 34865152,
          "max_in_bytes" : 69730304,
          "committed_in_bytes" : 17825792
        },
        "old" : {
          "peak_used_in_bytes" : 127900864,
          "used_in_bytes" : 255801728,
          "peak_max_in_bytes" : 724828160,
          "max_in_bytes" : 1449656320,
          "committed_in_bytes" : 357957632
        },
        "young" : {
          "peak_used_in_bytes" : 71630848,
          "used_in_bytes" : 76049384,
          "peak_max_in_bytes" : 279183360,
          "max_in_bytes" : 558366720,
          "committed_in_bytes" : 143261696
        }
      }
    },
    "gc" : {
      "collectors" : {
        "old" : {
          "collection_time_in_millis" : 114,
          "collection_count" : 2
        },
        "young" : {
          "collection_time_in_millis" : 3235,
          "collection_count" : 616
        }
      }
    },
    "uptime_in_millis" : 4803461
  }
}
`
|
||||
|
||||
// logstash5PipelineJSON is a canned Logstash 5.3 monitoring-API response for
// the single-pipeline stats endpoint (events, plugins, reloads, queue), used
// as a fixture by Test_Logstash5GatherPipelineStats.
const logstash5PipelineJSON = `
{
  "host" : "node-5",
  "version" : "5.3.0",
  "http_address" : "0.0.0.0:9600",
  "id" : "a360d8cf-6289-429d-8419-6145e324b574",
  "name" : "node-5-test",
  "pipeline" : {
    "events" : {
      "duration_in_millis" : 1151,
      "in" : 1269,
      "filtered" : 1269,
      "out" : 1269
    },
    "plugins" : {
      "inputs" : [ {
        "id" : "a35197a509596954e905e38521bae12b1498b17d-1",
        "events" : {
          "out" : 2,
          "queue_push_duration_in_millis" : 32
        },
        "name" : "beats"
      } ],
      "filters" : [ ],
      "outputs" : [ {
        "id" : "582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3",
        "events" : {
          "duration_in_millis" : 228,
          "in" : 1269,
          "out" : 1269
        },
        "name" : "s3"
      }, {
        "id" : "582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2",
        "events" : {
          "duration_in_millis" : 360,
          "in" : 1269,
          "out" : 1269
        },
        "name" : "stdout"
      } ]
    },
    "reloads" : {
      "last_error" : null,
      "successes" : 0,
      "last_success_timestamp" : null,
      "last_failure_timestamp" : null,
      "failures" : 0
    },
    "queue" : {
      "events" : 208,
      "type" : "persisted",
      "capacity" : {
        "page_capacity_in_bytes" : 262144000,
        "max_queue_size_in_bytes" : 8589934592,
        "max_unread_events" : 0
      },
      "data" : {
        "path" : "/path/to/data/queue",
        "free_space_in_bytes" : 89280552960,
        "storage_type" : "hfs"
      }
    },
    "id" : "main"
  }
}
`
|
|
@ -0,0 +1,256 @@
|
|||
package logstash
|
||||
|
||||
// logstash6ProcessJSON is a canned Logstash 6.4 monitoring-API response for
// the process stats endpoint, used as a fixture by the process-stats tests.
const logstash6ProcessJSON = `
{
  "host" : "node-6",
  "version" : "6.4.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "3044f675-21ce-4335-898a-8408aa678245",
  "name" : "node-6-test",
  "process" : {
    "open_file_descriptors" : 133,
    "peak_open_file_descriptors" : 145,
    "max_file_descriptors" : 262144,
    "mem" : {
      "total_virtual_in_bytes" : 17923452928
    },
    "cpu" : {
      "total_in_millis" : 5841460,
      "percent" : 0,
      "load_average" : {
        "1m" : 48.2,
        "5m" : 42.4,
        "15m" : 38.95
      }
    }
  }
}
`
|
||||
// logstash6JvmJSON is a canned Logstash 6.4 monitoring-API response for the
// JVM stats endpoint, used as a fixture by Test_Logstash6GatherJVMStats and
// Test_LogstashRequests.
const logstash6JvmJSON = `
{
  "host" : "node-6",
  "version" : "6.4.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "3044f675-21ce-4335-898a-8408aa678245",
  "name" : "node-6-test",
  "jvm" : {
    "threads" : {
      "count" : 60,
      "peak_count" : 62
    },
    "mem" : {
      "heap_used_percent" : 2,
      "heap_committed_in_bytes" : 824963072,
      "heap_max_in_bytes" : 8389328896,
      "heap_used_in_bytes" : 202360704,
      "non_heap_used_in_bytes" : 197878896,
      "non_heap_committed_in_bytes" : 222986240,
      "pools" : {
        "survivor" : {
          "peak_used_in_bytes" : 8912896,
          "used_in_bytes" : 835008,
          "peak_max_in_bytes" : 200605696,
          "max_in_bytes" : 200605696,
          "committed_in_bytes" : 8912896
        },
        "old" : {
          "peak_used_in_bytes" : 696572600,
          "used_in_bytes" : 189750576,
          "peak_max_in_bytes" : 6583418880,
          "max_in_bytes" : 6583418880,
          "committed_in_bytes" : 744419328
        },
        "young" : {
          "peak_used_in_bytes" : 71630848,
          "used_in_bytes" : 11775120,
          "peak_max_in_bytes" : 1605304320,
          "max_in_bytes" : 1605304320,
          "committed_in_bytes" : 71630848
        }
      }
    },
    "gc" : {
      "collectors" : {
        "old" : {
          "collection_time_in_millis" : 7492,
          "collection_count" : 37
        },
        "young" : {
          "collection_time_in_millis" : 107321,
          "collection_count" : 2094
        }
      }
    },
    "uptime_in_millis" : 281850926
  }
}
`
|
||||
|
||||
// logstash6PipelinesJSON is a canned Logstash 6.4 monitoring-API response for
// the multi-pipeline stats endpoint ("pipelines" map keyed by pipeline name),
// used as a fixture by Test_Logstash6GatherPipelinesStats.
const logstash6PipelinesJSON = `
{
  "host" : "node-6",
  "version" : "6.4.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "3044f675-21ce-4335-898a-8408aa678245",
  "name" : "node-6-test",
  "pipelines" : {
    "main" : {
      "events" : {
        "duration_in_millis" : 8540751,
        "in" : 180659,
        "out" : 180659,
        "filtered" : 180659,
        "queue_push_duration_in_millis" : 366
      },
      "plugins" : {
        "inputs" : [
          {
            "id" : "input-kafka",
            "events" : {
              "out" : 180659,
              "queue_push_duration_in_millis" : 366
            },
            "name" : "kafka"
          }
        ],
        "filters" : [
          {
            "id" : "155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54",
            "events" : {
              "duration_in_millis" : 2117,
              "in" : 27641,
              "out" : 27641
            },
            "name" : "mutate"
          },
          {
            "id" : "d079424bb6b7b8c7c61d9c5e0ddae445e92fa9ffa2e8690b0a669f7c690542f0",
            "events" : {
              "duration_in_millis" : 13149,
              "in" : 180659,
              "out" : 177549
            },
            "matches" : 177546,
            "failures" : 2,
            "name" : "date"
          },
          {
            "id" : "25afa60ab6dc30512fe80efa3493e4928b5b1b109765b7dc46a3e4bbf293d2d4",
            "events" : {
              "duration_in_millis" : 2814,
              "in" : 76602,
              "out" : 76602
            },
            "name" : "mutate"
          },
          {
            "id" : "2d9fa8f74eeb137bfa703b8050bad7d76636fface729e4585b789b5fc9bed668",
            "events" : {
              "duration_in_millis" : 9,
              "in" : 934,
              "out" : 934
            },
            "name" : "mutate"
          },
          {
            "id" : "4ed14c9ef0198afe16c31200041e98d321cb5c2e6027e30b077636b8c4842110",
            "events" : {
              "duration_in_millis" : 173,
              "in" : 3110,
              "out" : 0
            },
            "name" : "drop"
          },
          {
            "id" : "358ce1eb387de7cd5711c2fb4de64cd3b12e5ca9a4c45f529516bcb053a31df4",
            "events" : {
              "duration_in_millis" : 5605,
              "in" : 75482,
              "out" : 75482
            },
            "name" : "mutate"
          },
          {
            "id" : "82a9bbb02fff37a63c257c1f146b0a36273c7cbbebe83c0a51f086e5280bf7bb",
            "events" : {
              "duration_in_millis" : 313992,
              "in" : 180659,
              "out" : 180659
            },
            "name" : "csv"
          },
          {
            "id" : "8fb13a8cdd4257b52724d326aa1549603ffdd4e4fde6d20720c96b16238c18c3",
            "events" : {
              "duration_in_millis" : 0,
              "in" : 0,
              "out" : 0
            },
            "name" : "mutate"
          }
        ],
        "outputs" : [
          {
            "id" : "output-elk",
            "documents" : {
              "successes" : 221
            },
            "events" : {
              "duration_in_millis" : 651386,
              "in" : 177549,
              "out" : 177549
            },
            "bulk_requests" : {
              "successes" : 1,
              "responses" : {
                "200" : 748
              }
            },
            "name" : "elasticsearch"
          },
          {
            "id" : "output-kafka1",
            "events" : {
              "duration_in_millis" : 186751,
              "in" : 177549,
              "out" : 177549
            },
            "name" : "kafka"
          },
          {
            "id" : "output-kafka2",
            "events" : {
              "duration_in_millis" : 7335196,
              "in" : 177549,
              "out" : 177549
            },
            "name" : "kafka"
          }
        ]
      },
      "reloads" : {
        "last_error" : null,
        "successes" : 0,
        "last_success_timestamp" : null,
        "last_failure_timestamp" : null,
        "failures" : 0
      },
      "queue": {
        "events": 103,
        "type": "persisted",
        "capacity": {
          "queue_size_in_bytes": 1872391,
          "page_capacity_in_bytes": 67108864,
          "max_queue_size_in_bytes": 1073741824,
          "max_unread_events": 0
        },
        "data": {
          "path": "/var/lib/logstash/queue/main",
          "free_space_in_bytes": 36307369984,
          "storage_type": "ext4"
        }
      }
    }
  }
}
`
|
Loading…
Reference in New Issue