Add OAuth2 support to HTTP output plugin (#4536)
This commit is contained in:
		
							parent
							
								
									54f28eefa9
								
							
						
					
					
						commit
						091af7e645
					
				|  | @ -1032,6 +1032,18 @@ | ||||||
|   pruneopts = "" |   pruneopts = "" | ||||||
|   revision = "a680a1efc54dd51c040b3b5ce4939ea3cf2ea0d1" |   revision = "a680a1efc54dd51c040b3b5ce4939ea3cf2ea0d1" | ||||||
| 
 | 
 | ||||||
|  | [[projects]] | ||||||
|  |   branch = "master" | ||||||
|  |   digest = "1:b697592485cb412be4188c08ca0beed9aab87f36b86418e21acc4a3998f63734" | ||||||
|  |   name = "golang.org/x/oauth2" | ||||||
|  |   packages = [ | ||||||
|  |     ".", | ||||||
|  |     "clientcredentials", | ||||||
|  |     "internal", | ||||||
|  |   ] | ||||||
|  |   pruneopts = "" | ||||||
|  |   revision = "d2e6202438beef2727060aa7cabdd924d92ebfd9" | ||||||
|  | 
 | ||||||
| [[projects]] | [[projects]] | ||||||
|   branch = "master" |   branch = "master" | ||||||
|   digest = "1:677e38cad6833ad266ec843739d167755eda1e6f2d8af1c63102b0426ad820db" |   digest = "1:677e38cad6833ad266ec843739d167755eda1e6f2d8af1c63102b0426ad820db" | ||||||
|  | @ -1086,7 +1098,16 @@ | ||||||
| [[projects]] | [[projects]] | ||||||
|   digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" |   digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472" | ||||||
|   name = "google.golang.org/appengine" |   name = "google.golang.org/appengine" | ||||||
|   packages = ["cloudsql"] |   packages = [ | ||||||
|  |     "cloudsql", | ||||||
|  |     "internal", | ||||||
|  |     "internal/base", | ||||||
|  |     "internal/datastore", | ||||||
|  |     "internal/log", | ||||||
|  |     "internal/remote_api", | ||||||
|  |     "internal/urlfetch", | ||||||
|  |     "urlfetch", | ||||||
|  |   ] | ||||||
|   pruneopts = "" |   pruneopts = "" | ||||||
|   revision = "b1f26356af11148e710935ed1ac8a7f5702c7612" |   revision = "b1f26356af11148e710935ed1ac8a7f5702c7612" | ||||||
|   version = "v1.1.0" |   version = "v1.1.0" | ||||||
|  | @ -1312,6 +1333,8 @@ | ||||||
|     "github.com/zensqlmonitor/go-mssqldb", |     "github.com/zensqlmonitor/go-mssqldb", | ||||||
|     "golang.org/x/net/context", |     "golang.org/x/net/context", | ||||||
|     "golang.org/x/net/html/charset", |     "golang.org/x/net/html/charset", | ||||||
|  |     "golang.org/x/oauth2", | ||||||
|  |     "golang.org/x/oauth2/clientcredentials", | ||||||
|     "golang.org/x/sys/unix", |     "golang.org/x/sys/unix", | ||||||
|     "golang.org/x/sys/windows", |     "golang.org/x/sys/windows", | ||||||
|     "golang.org/x/sys/windows/svc", |     "golang.org/x/sys/windows/svc", | ||||||
|  |  | ||||||
|  | @ -225,3 +225,7 @@ | ||||||
| [[constraint]] | [[constraint]] | ||||||
|   name = "github.com/Azure/go-autorest" |   name = "github.com/Azure/go-autorest" | ||||||
|   version = "10.12.0" |   version = "10.12.0" | ||||||
|  | 
 | ||||||
|  | [[constraint]] | ||||||
|  |   branch = "master" | ||||||
|  |   name = "golang.org/x/oauth2" | ||||||
|  |  | ||||||
|  | @ -100,6 +100,7 @@ following works: | ||||||
| - github.com/zensqlmonitor/go-mssqldb [BSD](https://github.com/zensqlmonitor/go-mssqldb/blob/master/LICENSE.txt) | - github.com/zensqlmonitor/go-mssqldb [BSD](https://github.com/zensqlmonitor/go-mssqldb/blob/master/LICENSE.txt) | ||||||
| - golang.org/x/crypto [BSD](https://github.com/golang/crypto/blob/master/LICENSE) | - golang.org/x/crypto [BSD](https://github.com/golang/crypto/blob/master/LICENSE) | ||||||
| - golang.org/x/net [BSD](https://go.googlesource.com/net/+/master/LICENSE) | - golang.org/x/net [BSD](https://go.googlesource.com/net/+/master/LICENSE) | ||||||
|  | - golang.org/x/oauth2 [BSD](https://go.googlesource.com/oauth2/+/master/LICENSE) | ||||||
| - golang.org/x/text [BSD](https://go.googlesource.com/text/+/master/LICENSE) | - golang.org/x/text [BSD](https://go.googlesource.com/text/+/master/LICENSE) | ||||||
| - golang.org/x/sys [BSD](https://go.googlesource.com/sys/+/master/LICENSE) | - golang.org/x/sys [BSD](https://go.googlesource.com/sys/+/master/LICENSE) | ||||||
| - google.golang.org/grpc [APACHE](https://github.com/google/grpc-go/blob/master/LICENSE) | - google.golang.org/grpc [APACHE](https://github.com/google/grpc-go/blob/master/LICENSE) | ||||||
|  |  | ||||||
|  | @ -21,6 +21,12 @@ data formats.  For data_formats that support batching, metrics are sent in batch | ||||||
|   # username = "username" |   # username = "username" | ||||||
|   # password = "pa$$word" |   # password = "pa$$word" | ||||||
| 
 | 
 | ||||||
|  |   ## OAuth2 Client Credentials Grant | ||||||
|  |   # client_id = "clientid" | ||||||
|  |   # client_secret = "secret" | ||||||
|  |   # token_url = "https://indentityprovider/oauth2/v1/token" | ||||||
|  |   # scopes = ["urn:opc:idm:__myscopes__"] | ||||||
|  | 
 | ||||||
|   ## Optional TLS Config |   ## Optional TLS Config | ||||||
|   # tls_ca = "/etc/telegraf/ca.pem" |   # tls_ca = "/etc/telegraf/ca.pem" | ||||||
|   # tls_cert = "/etc/telegraf/cert.pem" |   # tls_cert = "/etc/telegraf/cert.pem" | ||||||
|  | @ -33,7 +39,7 @@ data formats.  For data_formats that support batching, metrics are sent in batch | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md |   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||||||
|   # data_format = "influx" |   # data_format = "influx" | ||||||
|    | 
 | ||||||
|   ## Additional HTTP headers |   ## Additional HTTP headers | ||||||
|   # [outputs.http.headers] |   # [outputs.http.headers] | ||||||
|   #   # Should be set manually to "application/json" for json data_format |   #   # Should be set manually to "application/json" for json data_format | ||||||
|  |  | ||||||
|  | @ -2,6 +2,7 @@ package http | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | @ -13,6 +14,8 @@ import ( | ||||||
| 	"github.com/influxdata/telegraf/internal/tls" | 	"github.com/influxdata/telegraf/internal/tls" | ||||||
| 	"github.com/influxdata/telegraf/plugins/outputs" | 	"github.com/influxdata/telegraf/plugins/outputs" | ||||||
| 	"github.com/influxdata/telegraf/plugins/serializers" | 	"github.com/influxdata/telegraf/plugins/serializers" | ||||||
|  | 	"golang.org/x/oauth2" | ||||||
|  | 	"golang.org/x/oauth2/clientcredentials" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var sampleConfig = ` | var sampleConfig = ` | ||||||
|  | @ -29,6 +32,12 @@ var sampleConfig = ` | ||||||
|   # username = "username" |   # username = "username" | ||||||
|   # password = "pa$$word" |   # password = "pa$$word" | ||||||
| 
 | 
 | ||||||
|  |   ## OAuth2 Client Credentials Grant | ||||||
|  |   # client_id = "clientid" | ||||||
|  |   # client_secret = "secret" | ||||||
|  |   # token_url = "https://indentityprovider/oauth2/v1/token" | ||||||
|  |   # scopes = ["urn:opc:idm:__myscopes__"] | ||||||
|  | 
 | ||||||
|   ## Optional TLS Config |   ## Optional TLS Config | ||||||
|   # tls_ca = "/etc/telegraf/ca.pem" |   # tls_ca = "/etc/telegraf/ca.pem" | ||||||
|   # tls_cert = "/etc/telegraf/cert.pem" |   # tls_cert = "/etc/telegraf/cert.pem" | ||||||
|  | @ -41,7 +50,7 @@ var sampleConfig = ` | ||||||
|   ## more about them here: |   ## more about them here: | ||||||
|   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
 |   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
 | ||||||
|   # data_format = "influx" |   # data_format = "influx" | ||||||
|    | 
 | ||||||
|   ## Additional HTTP headers |   ## Additional HTTP headers | ||||||
|   # [outputs.http.headers] |   # [outputs.http.headers] | ||||||
|   #   # Should be set manually to "application/json" for json data_format |   #   # Should be set manually to "application/json" for json data_format | ||||||
|  | @ -55,12 +64,16 @@ const ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type HTTP struct { | type HTTP struct { | ||||||
| 	URL      string            `toml:"url"` | 	URL          string            `toml:"url"` | ||||||
| 	Timeout  internal.Duration `toml:"timeout"` | 	Timeout      internal.Duration `toml:"timeout"` | ||||||
| 	Method   string            `toml:"method"` | 	Method       string            `toml:"method"` | ||||||
| 	Username string            `toml:"username"` | 	Username     string            `toml:"username"` | ||||||
| 	Password string            `toml:"password"` | 	Password     string            `toml:"password"` | ||||||
| 	Headers  map[string]string `toml:"headers"` | 	Headers      map[string]string `toml:"headers"` | ||||||
|  | 	ClientID     string            `toml:"client_id"` | ||||||
|  | 	ClientSecret string            `toml:"client_secret"` | ||||||
|  | 	TokenURL     string            `toml:"token_url"` | ||||||
|  | 	Scopes       []string          `toml:"scopes"` | ||||||
| 	tls.ClientConfig | 	tls.ClientConfig | ||||||
| 
 | 
 | ||||||
| 	client     *http.Client | 	client     *http.Client | ||||||
|  | @ -71,6 +84,34 @@ func (h *HTTP) SetSerializer(serializer serializers.Serializer) { | ||||||
| 	h.serializer = serializer | 	h.serializer = serializer | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (h *HTTP) createClient(ctx context.Context) (*http.Client, error) { | ||||||
|  | 	tlsCfg, err := h.ClientConfig.TLSConfig() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	client := &http.Client{ | ||||||
|  | 		Transport: &http.Transport{ | ||||||
|  | 			TLSClientConfig: tlsCfg, | ||||||
|  | 			Proxy:           http.ProxyFromEnvironment, | ||||||
|  | 		}, | ||||||
|  | 		Timeout: h.Timeout.Duration, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if h.ClientID != "" && h.ClientSecret != "" && h.TokenURL != "" { | ||||||
|  | 		oauthConfig := clientcredentials.Config{ | ||||||
|  | 			ClientID:     h.ClientID, | ||||||
|  | 			ClientSecret: h.ClientSecret, | ||||||
|  | 			TokenURL:     h.TokenURL, | ||||||
|  | 			Scopes:       h.Scopes, | ||||||
|  | 		} | ||||||
|  | 		ctx = context.WithValue(ctx, oauth2.HTTPClient, client) | ||||||
|  | 		client = oauthConfig.Client(ctx) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return client, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (h *HTTP) Connect() error { | func (h *HTTP) Connect() error { | ||||||
| 	if h.Method == "" { | 	if h.Method == "" { | ||||||
| 		h.Method = http.MethodPost | 		h.Method = http.MethodPost | ||||||
|  | @ -84,18 +125,13 @@ func (h *HTTP) Connect() error { | ||||||
| 		h.Timeout.Duration = defaultClientTimeout | 		h.Timeout.Duration = defaultClientTimeout | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	tlsCfg, err := h.ClientConfig.TLSConfig() | 	ctx := context.Background() | ||||||
|  | 	client, err := h.createClient(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	h.client = &http.Client{ | 	h.client = client | ||||||
| 		Transport: &http.Transport{ |  | ||||||
| 			TLSClientConfig: tlsCfg, |  | ||||||
| 			Proxy:           http.ProxyFromEnvironment, |  | ||||||
| 		}, |  | ||||||
| 		Timeout: h.Timeout.Duration, |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -287,3 +287,76 @@ func TestBasicAuth(t *testing.T) { | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) | ||||||
|  | 
 | ||||||
|  | func TestOAuthClientCredentialsGrant(t *testing.T) { | ||||||
|  | 	ts := httptest.NewServer(http.NotFoundHandler()) | ||||||
|  | 	defer ts.Close() | ||||||
|  | 
 | ||||||
|  | 	var token = "2YotnFZFEjr1zCsicMWpAA" | ||||||
|  | 
 | ||||||
|  | 	u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name         string | ||||||
|  | 		plugin       *HTTP | ||||||
|  | 		tokenHandler TestHandlerFunc | ||||||
|  | 		handler      TestHandlerFunc | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name: "no credentials", | ||||||
|  | 			plugin: &HTTP{ | ||||||
|  | 				URL: u.String(), | ||||||
|  | 			}, | ||||||
|  | 			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { | ||||||
|  | 				require.Len(t, r.Header["Authorization"], 0) | ||||||
|  | 				w.WriteHeader(http.StatusOK) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "success", | ||||||
|  | 			plugin: &HTTP{ | ||||||
|  | 				URL:          u.String() + "/write", | ||||||
|  | 				ClientID:     "howdy", | ||||||
|  | 				ClientSecret: "secret", | ||||||
|  | 				TokenURL:     u.String() + "/token", | ||||||
|  | 				Scopes:       []string{"urn:opc:idm:__myscopes__"}, | ||||||
|  | 			}, | ||||||
|  | 			tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { | ||||||
|  | 				w.WriteHeader(http.StatusOK) | ||||||
|  | 				values := url.Values{} | ||||||
|  | 				values.Add("access_token", token) | ||||||
|  | 				values.Add("token_type", "bearer") | ||||||
|  | 				values.Add("expires_in", "3600") | ||||||
|  | 				w.Write([]byte(values.Encode())) | ||||||
|  | 			}, | ||||||
|  | 			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { | ||||||
|  | 				require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"]) | ||||||
|  | 				w.WriteHeader(http.StatusOK) | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, tt := range tests { | ||||||
|  | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|  | 			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 				switch r.URL.Path { | ||||||
|  | 				case "/write": | ||||||
|  | 					tt.handler(t, w, r) | ||||||
|  | 				case "/token": | ||||||
|  | 					tt.tokenHandler(t, w, r) | ||||||
|  | 				} | ||||||
|  | 			}) | ||||||
|  | 
 | ||||||
|  | 			serializer := influx.NewSerializer() | ||||||
|  | 			tt.plugin.SetSerializer(serializer) | ||||||
|  | 			err = tt.plugin.Connect() | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 			err = tt.plugin.Write([]telegraf.Metric{getMetric()}) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue