Fix influxdb output database quoting (#2851)
(cherry picked from commit f47924ffc5
)
This commit is contained in:
parent
99edca80ef
commit
4f8341670e
|
@ -16,7 +16,6 @@ var (
|
||||||
defaultRequestTimeout = time.Second * 5
|
defaultRequestTimeout = time.Second * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
//
|
|
||||||
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
|
func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
|
||||||
// validate required parameters:
|
// validate required parameters:
|
||||||
if len(config.URL) == 0 {
|
if len(config.URL) == 0 {
|
||||||
|
|
|
@ -16,6 +16,11 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Quote Ident replacer.
|
||||||
|
qiReplacer = strings.NewReplacer("\n", `\n`, `\`, `\\`, `"`, `\"`)
|
||||||
|
)
|
||||||
|
|
||||||
// InfluxDB struct is the primary data structure for the plugin
|
// InfluxDB struct is the primary data structure for the plugin
|
||||||
type InfluxDB struct {
|
type InfluxDB struct {
|
||||||
// URL is only for backwards compatability
|
// URL is only for backwards compatability
|
||||||
|
@ -133,7 +138,7 @@ func (i *InfluxDB) Connect() error {
|
||||||
}
|
}
|
||||||
i.clients = append(i.clients, c)
|
i.clients = append(i.clients, c)
|
||||||
|
|
||||||
err = c.Query("CREATE DATABASE " + i.Database)
|
err = c.Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "Status Code [403]") {
|
if !strings.Contains(err.Error(), "Status Code [403]") {
|
||||||
log.Println("I! Database creation failed: " + err.Error())
|
log.Println("I! Database creation failed: " + err.Error())
|
||||||
|
@ -191,7 +196,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
if _, e := i.clients[n].WriteStream(r, bufsize); e != nil {
|
||||||
// If the database was not found, try to recreate it:
|
// If the database was not found, try to recreate it:
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
if errc := i.clients[n].Query("CREATE DATABASE " + i.Database); errc != nil {
|
errc := i.clients[n].Query(fmt.Sprintf(`CREATE DATABASE "%s"`, qiReplacer.Replace(i.Database)))
|
||||||
|
if errc != nil {
|
||||||
log.Printf("E! Error: Database %s not found and failed to recreate\n",
|
log.Printf("E! Error: Database %s not found and failed to recreate\n",
|
||||||
i.Database)
|
i.Database)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,9 +8,44 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestIdentQuoting(t *testing.T) {
|
||||||
|
var testCases = []struct {
|
||||||
|
database string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{"x-y", `CREATE DATABASE "x-y"`},
|
||||||
|
{`x"y`, `CREATE DATABASE "x\"y"`},
|
||||||
|
{"x\ny", `CREATE DATABASE "x\ny"`},
|
||||||
|
{`x\y`, `CREATE DATABASE "x\\y"`},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
|
q := r.Form.Get("q")
|
||||||
|
assert.Equal(t, tc.expected, q)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
fmt.Fprintln(w, `{"results":[{}]}`)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
i := InfluxDB{
|
||||||
|
URLs: []string{ts.URL},
|
||||||
|
Database: tc.database,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := i.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, i.Close())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestUDPInflux(t *testing.T) {
|
func TestUDPInflux(t *testing.T) {
|
||||||
i := InfluxDB{
|
i := InfluxDB{
|
||||||
URLs: []string{"udp://localhost:8089"},
|
URLs: []string{"udp://localhost:8089"},
|
||||||
|
|
Loading…
Reference in New Issue