2015-11-13 17:39:36 +00:00
package influxdb
import (
"fmt"
2017-05-31 00:38:32 +00:00
"io"
2015-11-13 17:39:36 +00:00
"net/http"
"net/http/httptest"
"testing"
2017-05-31 00:38:32 +00:00
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
2016-01-20 18:57:35 +00:00
"github.com/influxdata/telegraf/testutil"
2015-11-13 17:39:36 +00:00
2017-05-25 20:25:52 +00:00
"github.com/stretchr/testify/assert"
2015-11-13 17:39:36 +00:00
"github.com/stretchr/testify/require"
)
2017-05-25 20:25:52 +00:00
func TestIdentQuoting ( t * testing . T ) {
var testCases = [ ] struct {
database string
expected string
} {
{ "x-y" , ` CREATE DATABASE "x-y" ` } ,
{ ` x"y ` , ` CREATE DATABASE "x\"y" ` } ,
{ "x\ny" , ` CREATE DATABASE "x\ny" ` } ,
{ ` x\y ` , ` CREATE DATABASE "x\\y" ` } ,
}
for _ , tc := range testCases {
ts := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
r . ParseForm ( )
q := r . Form . Get ( "q" )
assert . Equal ( t , tc . expected , q )
w . WriteHeader ( http . StatusOK )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }]} ` )
} ) )
defer ts . Close ( )
i := InfluxDB {
URLs : [ ] string { ts . URL } ,
Database : tc . database ,
}
err := i . Connect ( )
require . NoError ( t , err )
require . NoError ( t , i . Close ( ) )
}
}
2015-11-13 17:39:36 +00:00
func TestUDPInflux ( t * testing . T ) {
i := InfluxDB {
URLs : [ ] string { "udp://localhost:8089" } ,
}
err := i . Connect ( )
require . NoError ( t , err )
2016-01-27 23:15:14 +00:00
err = i . Write ( testutil . MockMetrics ( ) )
2015-11-13 17:39:36 +00:00
require . NoError ( t , err )
2016-12-04 20:18:13 +00:00
require . NoError ( t , i . Close ( ) )
2015-11-13 17:39:36 +00:00
}
func TestHTTPInflux ( t * testing . T ) {
ts := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
2016-12-04 20:18:13 +00:00
switch r . URL . Path {
case "/write" :
// test that database is set properly
if r . FormValue ( "db" ) != "test" {
w . WriteHeader ( http . StatusTeapot )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
}
// test that user agent is set properly
if r . UserAgent ( ) != "telegraf" {
w . WriteHeader ( http . StatusTeapot )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
}
w . WriteHeader ( http . StatusNoContent )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
case "/query" :
w . WriteHeader ( http . StatusOK )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }]} ` )
}
2015-11-13 17:39:36 +00:00
} ) )
defer ts . Close ( )
2016-12-04 20:18:13 +00:00
i := newInflux ( )
i . URLs = [ ] string { ts . URL }
i . Database = "test"
i . UserAgent = "telegraf"
err := i . Connect ( )
require . NoError ( t , err )
err = i . Write ( testutil . MockMetrics ( ) )
require . NoError ( t , err )
require . NoError ( t , i . Close ( ) )
}
func TestUDPConnectError ( t * testing . T ) {
2015-11-13 17:39:36 +00:00
i := InfluxDB {
2016-12-04 20:18:13 +00:00
URLs : [ ] string { "udp://foobar:8089" } ,
2015-11-13 17:39:36 +00:00
}
2016-12-04 20:18:13 +00:00
err := i . Connect ( )
require . Error ( t , err )
i = InfluxDB {
URLs : [ ] string { "udp://localhost:9999999" } ,
}
err = i . Connect ( )
require . Error ( t , err )
}
func TestHTTPConnectError_InvalidURL ( t * testing . T ) {
i := InfluxDB {
URLs : [ ] string { "http://foobar:8089" } ,
}
err := i . Connect ( )
require . Error ( t , err )
i = InfluxDB {
URLs : [ ] string { "http://localhost:9999999" } ,
}
err = i . Connect ( )
require . Error ( t , err )
}
func TestHTTPConnectError_DatabaseCreateFail ( t * testing . T ) {
ts := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
switch r . URL . Path {
case "/query" :
w . WriteHeader ( http . StatusNotFound )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }],"error":"test error"} ` )
}
} ) )
defer ts . Close ( )
i := InfluxDB {
URLs : [ ] string { ts . URL } ,
Database : "test" ,
}
// database creation errors do not return an error from Connect
// they are only logged.
2015-11-13 17:39:36 +00:00
err := i . Connect ( )
require . NoError ( t , err )
2016-12-04 20:18:13 +00:00
require . NoError ( t , i . Close ( ) )
}
func TestHTTPError_DatabaseNotFound ( t * testing . T ) {
ts := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
switch r . URL . Path {
case "/write" :
w . WriteHeader ( http . StatusNotFound )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }],"error":"database not found"} ` )
case "/query" :
w . WriteHeader ( http . StatusNotFound )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }],"error":"database not found"} ` )
}
} ) )
defer ts . Close ( )
i := InfluxDB {
URLs : [ ] string { ts . URL } ,
Database : "test" ,
}
err := i . Connect ( )
2015-11-13 17:39:36 +00:00
require . NoError ( t , err )
2016-12-04 20:18:13 +00:00
err = i . Write ( testutil . MockMetrics ( ) )
require . Error ( t , err )
require . NoError ( t , i . Close ( ) )
2015-11-13 17:39:36 +00:00
}
2017-01-11 16:01:32 +00:00
// field type conflict does not return an error, instead we
func TestHTTPError_FieldTypeConflict ( t * testing . T ) {
ts := httptest . NewServer ( http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
switch r . URL . Path {
case "/write" :
w . WriteHeader ( http . StatusNotFound )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintln ( w , ` { "results":[ { }],"error":"field type conflict: input field \"value\" on measurement \"test\" is type integer, already exists as type float dropped=1"} ` )
}
} ) )
defer ts . Close ( )
i := InfluxDB {
URLs : [ ] string { ts . URL } ,
Database : "test" ,
}
err := i . Connect ( )
require . NoError ( t , err )
err = i . Write ( testutil . MockMetrics ( ) )
require . NoError ( t , err )
require . NoError ( t , i . Close ( ) )
}
2017-05-31 00:38:32 +00:00
type MockClient struct {
writeStreamCalled int
contentLength int
}
func ( m * MockClient ) Query ( command string ) error {
panic ( "not implemented" )
}
func ( m * MockClient ) Write ( b [ ] byte ) ( int , error ) {
panic ( "not implemented" )
}
func ( m * MockClient ) WriteWithParams ( b [ ] byte , params client . WriteParams ) ( int , error ) {
panic ( "not implemented" )
}
func ( m * MockClient ) WriteStream ( b io . Reader , contentLength int ) ( int , error ) {
m . writeStreamCalled ++
m . contentLength = contentLength
return 0 , nil
}
func ( m * MockClient ) WriteStreamWithParams ( b io . Reader , contentLength int , params client . WriteParams ) ( int , error ) {
panic ( "not implemented" )
}
func ( m * MockClient ) Close ( ) error {
panic ( "not implemented" )
}