Allow for TLS connections to ElasticSearch (#1398)

* Allow for TLS connections to ElasticSearch

Extremely similar implementation to the HTTP JSON module's
implementation of the same code.

* Changelog update
This commit is contained in:
Mike Glazer 2016-06-22 17:23:49 +02:00 committed by Cameron Sparr
parent 25848c545a
commit e3448153e1
5 changed files with 61 additions and 14 deletions

View File

@ -15,6 +15,7 @@
- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection. - [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection.
- [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine - [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine
- [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns. - [#1320](https://github.com/influxdata/telegraf/pull/1320): Logparser input plugin for parsing grok-style log patterns.
- [#1397](https://github.com/influxdata/telegraf/issues/1397): ElasticSearch: now supports connecting to ElasticSearch via SSL
### Bugfixes ### Bugfixes

View File

@ -133,8 +133,8 @@ func GetTLSConfig(
cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey) cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey)
if err != nil { if err != nil {
return nil, errors.New(fmt.Sprintf( return nil, errors.New(fmt.Sprintf(
"Could not load TLS client key/certificate: %s", "Could not load TLS client key/certificate from %s:%s: %s",
err)) SSLKey, SSLCert, err))
} }
t.Certificates = []tls.Certificate{cert} t.Certificates = []tls.Certificate{cert}

View File

@ -11,6 +11,13 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference
servers = ["http://localhost:9200"] servers = ["http://localhost:9200"]
local = true local = true
cluster_health = true cluster_health = true
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
``` ```
### Measurements & Fields: ### Measurements & Fields:

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
@ -67,25 +68,31 @@ const sampleConfig = `
## set cluster_health to true when you want to also obtain cluster level stats ## set cluster_health to true when you want to also obtain cluster level stats
cluster_health = false cluster_health = false
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
` `
// Elasticsearch is a plugin to read stats from one or many Elasticsearch // Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers. // servers.
type Elasticsearch struct { type Elasticsearch struct {
Local bool Local bool
Servers []string Servers []string
ClusterHealth bool ClusterHealth bool
client *http.Client SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file
SSLKey string `toml:"ssl_key"` // Path to cert key file
InsecureSkipVerify bool // Use SSL but skip chain & host verification
client *http.Client
} }
// NewElasticsearch return a new instance of Elasticsearch // NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch { func NewElasticsearch() *Elasticsearch {
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)} return &Elasticsearch{}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Elasticsearch{client: client}
} }
// SampleConfig returns sample configuration for this plugin. // SampleConfig returns sample configuration for this plugin.
@ -101,6 +108,15 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the // Gather reads the stats from Elasticsearch and writes it to the
// Accumulator. // Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
if e.client == nil {
client, err := e.createHttpClient()
if err != nil {
return err
}
e.client = client
}
errChan := errchan.New(len(e.Servers)) errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(e.Servers)) wg.Add(len(e.Servers))
@ -128,6 +144,23 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
return errChan.Error() return errChan.Error()
} }
func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(e.SSLCert, e.SSLKey, e.SSLCA, e.InsecureSkipVerify)
if err != nil {
return nil, err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return client, nil
}
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct { nodeStats := &struct {
ClusterName string `json:"cluster_name"` ClusterName string `json:"cluster_name"`

View File

@ -38,7 +38,7 @@ func (t *transportMock) CancelRequest(_ *http.Request) {
} }
func TestElasticsearch(t *testing.T) { func TestElasticsearch(t *testing.T) {
es := NewElasticsearch() es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"} es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, statsResponse) es.client.Transport = newTransportMock(http.StatusOK, statsResponse)
@ -67,7 +67,7 @@ func TestElasticsearch(t *testing.T) {
} }
func TestGatherClusterStats(t *testing.T) { func TestGatherClusterStats(t *testing.T) {
es := NewElasticsearch() es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"} es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true es.ClusterHealth = true
es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) es.client.Transport = newTransportMock(http.StatusOK, clusterResponse)
@ -87,3 +87,9 @@ func TestGatherClusterStats(t *testing.T) {
v2IndexExpected, v2IndexExpected,
map[string]string{"index": "v2"}) map[string]string{"index": "v2"})
} }
func newElasticsearchWithClient() *Elasticsearch {
es := NewElasticsearch()
es.client = &http.Client{}
return es
}