diff --git a/CHANGELOG.md b/CHANGELOG.md index 91cc09f81..b942ec953 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine - [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns. +- [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL ### Bugfixes diff --git a/internal/internal.go b/internal/internal.go index 4c90d11b9..58a1200e0 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -133,8 +133,8 @@ func GetTLSConfig( cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey) if err != nil { return nil, errors.New(fmt.Sprintf( - "Could not load TLS client key/certificate: %s", - err)) + "Could not load TLS client key/certificate from %s:%s: %s", + SSLKey, SSLCert, err)) } t.Certificates = []tls.Certificate{cert} diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md index 88f08bd93..526bc3f39 100644 --- a/plugins/inputs/elasticsearch/README.md +++ b/plugins/inputs/elasticsearch/README.md @@ -11,6 +11,13 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference servers = ["http://localhost:9200"] local = true cluster_health = true + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ``` ### Measurements & Fields: diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 3839f6df6..ef0a4c199 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -8,6 +8,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" @@ -67,25 +68,31 @@ const sampleConfig = ` ## set cluster_health to true when you want to also obtain cluster level stats cluster_health = false + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ` // Elasticsearch is a plugin to read stats from one or many Elasticsearch // servers. type Elasticsearch struct { - Local bool - Servers []string - ClusterHealth bool - client *http.Client + Local bool + Servers []string + ClusterHealth bool + SSLCA string `toml:"ssl_ca"` // Path to CA file + SSLCert string `toml:"ssl_cert"` // Path to host cert file + SSLKey string `toml:"ssl_key"` // Path to cert key file + InsecureSkipVerify bool // Use SSL but skip chain & host verification + client *http.Client } // NewElasticsearch return a new instance of Elasticsearch func NewElasticsearch() *Elasticsearch { - tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} - client := &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), - } - return &Elasticsearch{client: client} + return &Elasticsearch{} } // SampleConfig returns sample configuration for this plugin. @@ -101,6 +108,15 @@ func (e *Elasticsearch) Description() string { // Gather reads the stats from Elasticsearch and writes it to the // Accumulator. func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { + if e.client == nil { + client, err := e.createHttpClient() + + if err != nil { + return err + } + e.client = client + } + errChan := errchan.New(len(e.Servers)) var wg sync.WaitGroup wg.Add(len(e.Servers)) @@ -128,6 +144,23 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { return errChan.Error() } +func (e *Elasticsearch) createHttpClient() (*http.Client, error) { + tlsCfg, err := internal.GetTLSConfig(e.SSLCert, e.SSLKey, e.SSLCA, e.InsecureSkipVerify) + if err != nil { + return nil, err + } + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + TLSClientConfig: tlsCfg, + } + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + + return client, nil +} + func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { nodeStats := &struct { ClusterName string `json:"cluster_name"` diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index f29857507..760ac921b 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -38,7 +38,7 @@ func (t *transportMock) CancelRequest(_ *http.Request) { } func TestElasticsearch(t *testing.T) { - es := NewElasticsearch() + es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} es.client.Transport = newTransportMock(http.StatusOK, statsResponse) @@ -67,7 +67,7 @@ func TestElasticsearch(t *testing.T) { } func TestGatherClusterStats(t *testing.T) { - es := NewElasticsearch() + es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} es.ClusterHealth = true es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) @@ -87,3 +87,9 @@ func TestGatherClusterStats(t *testing.T) { v2IndexExpected, map[string]string{"index": "v2"}) } + +func newElasticsearchWithClient() *Elasticsearch { + es := NewElasticsearch() + es.client = &http.Client{} + return es +}