Godep update: influxdb

Cameron Sparr 2015-10-20 10:17:34 -06:00
parent f7504fb5eb
commit 6263bc2d1b
48 changed files with 843 additions and 804 deletions

Godeps/Godeps.json generated
View File

@ -102,8 +102,8 @@
},
{
"ImportPath": "github.com/influxdb/influxdb",
"Comment": "v0.9.4-rc1-652-gd9f0413",
"Rev": "d9f04132ef567bb9671690e4db226ff3dab9feb5"
"Comment": "v0.9.4-rc1-703-g956efae",
"Rev": "956efaeb94ee57ecd8dc23e2f654b5231204e28f"
},
{
"ImportPath": "github.com/lib/pq",

View File

@ -16,11 +16,11 @@
- [#4310](https://github.com/influxdb/influxdb/pull/4310): Support dropping non-Raft nodes. Work mostly by @corylanou
- [#4348](https://github.com/influxdb/influxdb/pull/4348): Public ApplyTemplate function for graphite parser.
- [#4178](https://github.com/influxdb/influxdb/pull/4178): Support fields in graphite parser. Thanks @roobert!
- [#4291](https://github.com/influxdb/influxdb/pull/4291): Added ALTER DATABASE RENAME. Thanks @linearb
- [#4409](https://github.com/influxdb/influxdb/pull/4409): wire up INTO queries.
- [#4379](https://github.com/influxdb/influxdb/pull/4379): Auto-create database for UDP input.
- [#4375](https://github.com/influxdb/influxdb/pull/4375): Add Subscriptions so data can be 'forked' out of InfluxDB to another third party.
- [#4459](https://github.com/influxdb/influxdb/pull/4459): Register with Enterprise service if token available.
- [#4506](https://github.com/influxdb/influxdb/pull/4506): Register with Enterprise service and upload stats, if token is available.
- [#4501](https://github.com/influxdb/influxdb/pull/4501): Allow filtering SHOW MEASUREMENTS by regex.
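A minimal sketch of the new `SHOW MEASUREMENTS` regex filter from [#4501], using the v2 Go client vendored by this commit; the server address and database name are assumptions for illustration:

```go
package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/influxdb/influxdb/client/v2"
)

func main() {
	// Assumed local server; adjust for a real deployment.
	u, _ := url.Parse("http://localhost:8086")
	clnt := client.NewClient(client.Config{URL: u})

	// The new WITH MEASUREMENT clause accepts = for an exact name or
	// =~ for a regex, as exercised by the parser tests in this diff.
	q := client.Query{
		Command:  `SHOW MEASUREMENTS WITH MEASUREMENT =~ /[cg]pu/`,
		Database: "db0", // illustrative database name
	}
	resp, err := clnt.Query(q)
	if err != nil {
		log.Fatal(err)
	}
	if resp.Error() != nil {
		log.Fatal(resp.Error())
	}
	fmt.Println(resp.Results)
}
```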
### Bugfixes
- [#4389](https://github.com/influxdb/influxdb/pull/4389): Don't add a new segment file on each hinted-handoff purge cycle.
@ -50,6 +50,7 @@
- [#4465](https://github.com/influxdb/influxdb/pull/4465): Actually display a message if the CLI can't connect to the database.
- [#4342](https://github.com/influxdb/influxdb/pull/4342): Fix mixing aggregates and math with non-aggregates. Thanks @kostya-sh.
- [#4349](https://github.com/influxdb/influxdb/issues/4349): If HH can't unmarshal a block, skip that block.
- [#4502](https://github.com/influxdb/influxdb/pull/4502): Don't crash on Graphite close, if Graphite not fully open. Thanks for the report @ranjib
- [#4353](https://github.com/influxdb/influxdb/pull/4353): Fully lock node queues during hinted handoff. Fixes one cause of missing data on clusters.
- [#4357](https://github.com/influxdb/influxdb/issues/4357): Fix similar float values encoding overflow. Thanks @dgryski!
- [#4344](https://github.com/influxdb/influxdb/issues/4344): Make client.Write default to client.precision if none is given.
@ -71,6 +72,9 @@
- [#4415](https://github.com/influxdb/influxdb/issues/4415): Selectors (like max, min, first, etc.) return a string instead of a timestamp
- [#4472](https://github.com/influxdb/influxdb/issues/4472): Fix 'too many points in GROUP BY interval' error
- [#4475](https://github.com/influxdb/influxdb/issues/4475): Fix SHOW TAG VALUES error message.
- [#4486](https://github.com/influxdb/influxdb/pull/4486): Fix missing comments for runner package
- [#4497](https://github.com/influxdb/influxdb/pull/4497): Fix sequence in meta proto
- [#3367](https://github.com/influxdb/influxdb/issues/3367): Negative timestamps are parsed correctly by the line protocol.
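A small sketch of the negative-timestamp fix from [#3367], using the v2 client's `NewPoint` as exercised by the client tests later in this diff; the measurement and field names are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdb/influxdb/client/v2"
)

func main() {
	// A point dated before the Unix epoch serializes to a negative
	// timestamp in the line protocol, which the parser now accepts.
	ts := time.Date(1965, time.June, 1, 0, 0, 0, 0, time.UTC)
	p := client.NewPoint("events", map[string]string{}, map[string]interface{}{"value": 1.0}, ts)
	fmt.Println(p.UnixNano()) // negative nanoseconds relative to the epoch
}
```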
## v0.9.4 [2015-09-14]

View File

@ -66,7 +66,7 @@ To assist in review for the PR, please add the following to your pull request co
Use of third-party packages
------------
A third-party package is defined as one that is not part of the standard Go distribution. Generally speaking we prefer to minimize our use of third-party packages, and avoid them unless absolutely necessarly. We'll often write a little bit of code rather than pull in a third-party package. Of course, we do use some third-party packages -- most importantly we use [BoltDB](https://github.com/boltdb/bolt) as the storage engine. So to maximise the chance your change will be accepted by us, use only the standard libaries, or the third-party packages we have decided to use.
A third-party package is defined as one that is not part of the standard Go distribution. Generally speaking we prefer to minimize our use of third-party packages, and avoid them unless absolutely necessary. We'll often write a little bit of code rather than pull in a third-party package. Of course, we do use some third-party packages -- most importantly we use [BoltDB](https://github.com/boltdb/bolt) as the storage engine. So to maximise the chance your change will be accepted by us, use only the standard libraries, or the third-party packages we have decided to use.
For rationale, check out the post [The Case Against Third Party Libraries](http://blog.gopheracademy.com/advent-2014/case-against-3pl/).
@ -236,7 +236,7 @@ Note that when you pass the binary to `go tool pprof` *you must specify the path
Continuous Integration testing
-----
InfluxDB uses CirceCI for continuous integration testing. To see how the code is built and tested, check out [this file](https://github.com/influxdb/influxdb/blob/master/circle-test.sh). It closely follows the build and test process outlined above. You can see the exact version of Go InfluxDB uses for testing by consulting that file.
InfluxDB uses CircleCI for continuous integration testing. To see how the code is built and tested, check out [this file](https://github.com/influxdb/influxdb/blob/master/circle-test.sh). It closely follows the build and test process outlined above. You can see the exact version of Go InfluxDB uses for testing by consulting that file.
Useful links
------------

View File

@ -168,7 +168,7 @@ func queryDB(clnt client.Client, cmd string) (res []client.Result, err error) {
}
res = response.Results
}
return response, nil
return res, nil
}
```
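For context, a minimal sketch of calling the corrected helper, assuming the README's surrounding declarations (the client setup and query text here are illustrative):

```go
u, _ := url.Parse("http://localhost:8086") // assumed local server
clnt := client.NewClient(client.Config{URL: u})

// queryDB is the helper defined above; it now correctly returns
// res instead of the raw response.
res, err := queryDB(clnt, "SELECT count(value) FROM cpu_usage")
if err != nil {
	log.Fatal(err)
}
for _, result := range res {
	fmt.Println(result.Series)
}
```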

View File

@ -19,7 +19,7 @@ func ExampleNewClient() {
}
// NOTE: this assumes you've setup a user and have setup shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just ommit Username/Password below.
// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
conf := client.Config{
URL: *host,
Username: os.Getenv("INFLUX_USER"),

View File

@ -218,6 +218,31 @@ func (p *Point) PrecisionString(precision string) string {
return p.pt.PrecisionString(precision)
}
// Name returns the measurement name of the point
func (p *Point) Name() string {
return p.pt.Name()
}
// Tags returns the tags associated with the point
func (p *Point) Tags() map[string]string {
return p.pt.Tags()
}
// Time returns the timestamp for the point
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns the unix nano time of the point
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point
func (p *Point) Fields() map[string]interface{} {
return p.pt.Fields()
}
func (c *client) Write(bp BatchPoints) error {
u := c.url
u.Path = "write"

View File

@ -5,6 +5,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
"time"
@ -186,6 +187,54 @@ func TestClient_PointWithoutTimeString(t *testing.T) {
}
}
func TestClient_PointName(t *testing.T) {
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0}
p := NewPoint("cpu_usage", tags, fields)
exp := "cpu_usage"
if p.Name() != exp {
t.Errorf("Error, got %s, expected %s",
p.Name(), exp)
}
}
func TestClient_PointTags(t *testing.T) {
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0}
p := NewPoint("cpu_usage", tags, fields)
if !reflect.DeepEqual(tags, p.Tags()) {
t.Errorf("Error, got %v, expected %v",
p.Tags(), tags)
}
}
func TestClient_PointUnixNano(t *testing.T) {
const shortForm = "2006-Jan-02"
time1, _ := time.Parse(shortForm, "2013-Feb-03")
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0}
p := NewPoint("cpu_usage", tags, fields, time1)
exp := int64(1359849600000000000)
if p.UnixNano() != exp {
t.Errorf("Error, got %d, expected %d",
p.UnixNano(), exp)
}
}
func TestClient_PointFields(t *testing.T) {
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{"idle": 10.1, "system": 50.9, "user": 39.0}
p := NewPoint("cpu_usage", tags, fields)
if !reflect.DeepEqual(fields, p.Fields()) {
t.Errorf("Error, got %v, expected %v",
p.Fields(), fields)
}
}
func TestBatchPoints_PrecisionError(t *testing.T) {
_, err := NewBatchPoints(BatchPointsConfig{Precision: "foobar"})
if err == nil {

View File

@ -15,7 +15,7 @@ func ExampleNewClient() client.Client {
u, _ := url.Parse("http://localhost:8086")
// NOTE: this assumes you've setup a user and have setup shell env variables,
// namely INFLUX_USER/INFLUX_PWD. If not just ommit Username/Password below.
// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
client := client.NewClient(client.Config{
URL: u,
Username: os.Getenv("INFLUX_USER"),

View File

@ -323,7 +323,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
// If the write consistency level is ANY, then a successful hinted handoff can
// be considered a successful write so send nil to the response channel
// otherwise, let the original error propogate to the response channel
// otherwise, let the original error propagate to the response channel
if hherr == nil && consistency == ConsistencyLevelAny {
ch <- &AsyncWriteResult{owner, nil}
return

View File

@ -228,9 +228,9 @@ func TestPointsWriter_WritePoints(t *testing.T) {
expErr: nil,
},
// Write to non-existant database
// Write to non-existent database
{
name: "write to non-existant database",
name: "write to non-existent database",
database: "doesnt_exist",
retentionPolicy: "",
consistency: cluster.ConsistencyLevelAny,

View File

@ -165,11 +165,11 @@ func readIds(path string) (map[string]uint64, error) {
}
return ids, err
}
func readIndex(f *os.File) *tsmIndex {
func readIndex(f *os.File) (*tsmIndex, error) {
// Get the file size
stat, err := f.Stat()
if err != nil {
panic(err.Error())
return nil, err
}
// Seek to the series count
@ -177,8 +177,7 @@ func readIndex(f *os.File) *tsmIndex {
b := make([]byte, 8)
_, err = f.Read(b[:4])
if err != nil {
fmt.Printf("error: %v\n", err.Error())
os.Exit(1)
return nil, err
}
seriesCount := binary.BigEndian.Uint32(b)
@ -206,6 +205,10 @@ func readIndex(f *os.File) *tsmIndex {
series: count,
}
if indexStart < 0 {
return nil, fmt.Errorf("index corrupt: offset=%d", indexStart)
}
// Read the index entries
for i := 0; i < count; i++ {
f.Read(b)
@ -215,7 +218,7 @@ func readIndex(f *os.File) *tsmIndex {
index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)})
}
return index
return index, nil
}
func cmdDumpTsm1(opts *tsdmDumpOpts) {
@ -254,7 +257,19 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
invIds[v] = k
}
index := readIndex(f)
index, err := readIndex(f)
if err != nil {
println("Failed to readIndex:", err.Error())
// Create a stubbed out index so we can still try and read the block data directly
// w/o panicking ourselves.
index = &tsmIndex{
minTime: time.Unix(0, 0),
maxTime: time.Unix(0, 0),
offset: stat.Size(),
}
}
blockStats := &blockStats{}
println("Summary:")

View File

@ -22,21 +22,21 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/registration"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)
const DefaultEnterpriseURL = "https://enterprise.influxdata.com"
// Config represents the configuration format for the influxd binary.
type Config struct {
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Precreator precreator.Config `toml:"shard-precreation"`
Meta *meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`
Registration registration.Config `toml:"registration"`
Precreator precreator.Config `toml:"shard-precreation"`
Admin admin.Config `toml:"admin"`
Monitor monitor.Config `toml:"monitor"`
@ -54,19 +54,15 @@ type Config struct {
// Server reporting
ReportingDisabled bool `toml:"reporting-disabled"`
// Server registration
EnterpriseURL string `toml:"enterprise-url"`
EnterpriseToken string `toml:"enterprise-token"`
}
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
c := &Config{}
c.EnterpriseURL = DefaultEnterpriseURL
c.Meta = meta.NewConfig()
c.Data = tsdb.NewConfig()
c.Cluster = cluster.NewConfig()
c.Registration = registration.NewConfig()
c.Precreator = precreator.NewConfig()
c.Admin = admin.NewConfig()

View File

@ -13,8 +13,6 @@ func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c run.Config
if _, err := toml.Decode(`
enterprise-token = "deadbeef"
[meta]
dir = "/tmp/meta"
@ -57,9 +55,7 @@ enabled = true
}
// Validate configuration.
if c.EnterpriseToken != "deadbeef" {
t.Fatalf("unexpected Enterprise token: %s", c.EnterpriseToken)
} else if c.Meta.Dir != "/tmp/meta" {
if c.Meta.Dir != "/tmp/meta" {
t.Fatalf("unexpected meta dir: %s", c.Meta.Dir)
} else if c.Data.Dir != "/tmp/data" {
t.Fatalf("unexpected data dir: %s", c.Data.Dir)
@ -91,8 +87,6 @@ func TestConfig_Parse_EnvOverride(t *testing.T) {
// Parse configuration.
var c run.Config
if _, err := toml.Decode(`
enterprise-token = "deadbeef"
[meta]
dir = "/tmp/meta"
@ -131,10 +125,6 @@ enabled = true
t.Fatal(err)
}
if err := os.Setenv("INFLUXDB_ENTERPRISE_TOKEN", "wheresthebeef"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}
if err := os.Setenv("INFLUXDB_UDP_BIND_ADDRESS", ":1234"); err != nil {
t.Fatalf("failed to set env var: %v", err)
}
@ -147,10 +137,6 @@ enabled = true
t.Fatalf("failed to apply env overrides: %v", err)
}
if c.EnterpriseToken != "wheresthebeef" {
t.Fatalf("unexpected Enterprise token: %s", c.EnterpriseToken)
}
if c.UDPs[0].BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDPs[0].BindAddress)
}

View File

@ -2,9 +2,7 @@ package run
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
@ -26,6 +24,7 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/registration"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/services/subscriber"
@ -76,8 +75,6 @@ type Server struct {
// Server reporting and registration
reportingDisabled bool
enterpriseURL string
enterpriseToken string
// Profiling
CPUProfile string
@ -104,8 +101,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
Monitor: monitor.New(c.Monitor),
reportingDisabled: c.ReportingDisabled,
enterpriseURL: c.EnterpriseURL,
enterpriseToken: c.EnterpriseToken,
}
// Copy TSDB configuration.
@ -162,6 +157,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Append services.
s.appendClusterService(c.Cluster)
s.appendPrecreatorService(c.Precreator)
s.appendRegistrationService(c.Registration)
s.appendSnapshotterService()
s.appendCopierService()
s.appendAdminService(c.Admin)
@ -299,6 +295,21 @@ func (s *Server) appendPrecreatorService(c precreator.Config) error {
return nil
}
func (s *Server) appendRegistrationService(c registration.Config) error {
if !c.Enabled {
return nil
}
srv, err := registration.NewService(c, s.buildInfo.Version)
if err != nil {
return err
}
srv.MetaStore = s.MetaStore
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
return nil
}
func (s *Server) appendUDPService(c udp.Config) {
if !c.Enabled {
return
@ -403,11 +414,6 @@ func (s *Server) Open() error {
go s.startServerReporting()
}
// Register server
if err := s.registerServer(); err != nil {
log.Printf("failed to register server: %s", err.Error())
}
return nil
}(); err != nil {
@ -519,59 +525,6 @@ func (s *Server) reportServer() {
go client.Post("http://m.influxdb.com:8086/db/reporting/series?u=reporter&p=influxdb", "application/json", data)
}
// registerServer registers the server on start-up.
func (s *Server) registerServer() error {
if s.enterpriseToken == "" {
return nil
}
clusterID, err := s.MetaStore.ClusterID()
if err != nil {
log.Printf("failed to retrieve cluster ID for registration: %s", err.Error())
return err
}
hostname, err := os.Hostname()
if err != nil {
return err
}
j := map[string]interface{}{
"cluster_id": fmt.Sprintf("%d", clusterID),
"server_id": fmt.Sprintf("%d", s.MetaStore.NodeID()),
"host": hostname,
"product": "influxdb",
"version": s.buildInfo.Version,
}
b, err := json.Marshal(j)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/v1/servers?token=%s", s.enterpriseURL, s.enterpriseToken)
go func() {
client := http.Client{Timeout: time.Duration(5 * time.Second)}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
log.Printf("failed to register server with %s: %s", s.enterpriseURL, err.Error())
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusCreated {
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("failed to read response from registration server: %s", err.Error())
return
}
log.Printf("failed to register server with %s: received code %s, body: %s", s.enterpriseURL, resp.Status, string(body))
}()
return nil
}
// monitorErrorChan reads an error channel and resends it through the server.
func (s *Server) monitorErrorChan(ch <-chan error) {
for {

View File

@ -66,43 +66,18 @@ func TestServer_DatabaseCommands(t *testing.T) {
command: `SHOW DATABASES`,
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`,
},
&Query{
name: "rename database should succeed",
command: `ALTER DATABASE db1 RENAME TO db2`,
exp: `{"results":[{}]}`,
},
&Query{
name: "show databases should reflect change of name",
command: `SHOW DATABASES`,
exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db2"]]}]}]}`,
},
&Query{
name: "rename non-existent database should fail",
command: `ALTER DATABASE db4 RENAME TO db5`,
exp: `{"results":[{"error":"database not found"}]}`,
},
&Query{
name: "rename database to illegal name should fail",
command: `ALTER DATABASE db2 RENAME TO 0xdb0`,
exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 30"}`,
},
&Query{
name: "rename database to already existing datbase should fail",
command: `ALTER DATABASE db2 RENAME TO db0`,
exp: `{"results":[{"error":"database already exists"}]}`,
},
&Query{
name: "drop database db0 should succeed",
command: `DROP DATABASE db0`,
exp: `{"results":[{}]}`,
},
&Query{
name: "drop database db2 should succeed",
command: `DROP DATABASE db2`,
name: "drop database db1 should succeed",
command: `DROP DATABASE db1`,
exp: `{"results":[{}]}`,
},
&Query{
name: "show databases should have no results after dropping all databases",
name: "show database should have no results",
command: `SHOW DATABASES`,
exp: `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`,
},
@ -266,96 +241,6 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) {
}
}
func TestServer_Query_RenameDatabase(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
writes := []string{
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")
test.addQueries([]*Query{
&Query{
name: "Query data from db0 database",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Query data from db0 database with GROUP BY *",
command: `SELECT * FROM cpu GROUP BY *`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Create continuous query using db0",
command: `CREATE CONTINUOUS QUERY "cq1" ON db0 BEGIN SELECT count(value) INTO "rp1".:MEASUREMENT FROM cpu GROUP BY time(5s) END`,
exp: `{"results":[{}]}`,
},
&Query{
name: "Rename database should fail because of conflicting CQ",
command: `ALTER DATABASE db0 RENAME TO db1`,
exp: `{"results":[{"error":"database rename conflict with existing continuous query"}]}`,
},
&Query{
name: "Drop conflicting CQ",
command: `DROP CONTINUOUS QUERY "cq1" on db0`,
exp: `{"results":[{}]}`,
},
&Query{
name: "Rename database should succeed now",
command: `ALTER DATABASE db0 RENAME TO db1`,
exp: `{"results":[{}]}`,
},
&Query{
name: "Query data from db0 database and ensure it's gone",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"error":"database not found: db0"}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Query data from now renamed database db1 and ensure that's there",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}},
},
&Query{
name: "Query data from now renamed database db1 and ensure it's still there with GROUP BY *",
command: `SELECT * FROM cpu GROUP BY *`,
exp: `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}},
},
}...)
for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
func TestServer_Query_DropAndRecreateSeries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
@ -4371,6 +4256,24 @@ func TestServer_Query_ShowMeasurements(t *testing.T) {
exp: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"],["gpu"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show measurements using WITH`,
command: "SHOW MEASUREMENTS WITH MEASUREMENT = cpu",
exp: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show measurements using WITH and regex`,
command: "SHOW MEASUREMENTS WITH MEASUREMENT =~ /[cg]pu/",
exp: `{"results":[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"],["gpu"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show measurements using WITH and regex - no matches`,
command: "SHOW MEASUREMENTS WITH MEASUREMENT =~ /.*zzzzz.*/",
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show measurements where tag matches regular expression`,
command: "SHOW MEASUREMENTS WHERE region =~ /ca.*/",
@ -5008,6 +4911,7 @@ func TestServer_Query_IntoTarget(t *testing.T) {
fmt.Sprintf(`foo value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`foo value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
fmt.Sprintf(`foo value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:30Z").UnixNano()),
fmt.Sprintf(`foo value=4,foobar=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:40Z").UnixNano()),
}
test := NewTest("db0", "rp0")
@ -5017,14 +4921,14 @@ func TestServer_Query_IntoTarget(t *testing.T) {
&Query{
name: "into",
params: url.Values{"db": []string{"db0"}},
command: `SELECT value AS something INTO baz FROM foo`,
exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
command: `SELECT * INTO baz FROM foo`,
exp: `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",5]]}]}]}`,
},
&Query{
name: "confirm results",
params: url.Values{"db": []string{"db0"}},
command: `SELECT something FROM baz`,
exp: `{"results":[{"series":[{"name":"baz","columns":["time","something"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2],["2000-01-01T00:00:20Z",3],["2000-01-01T00:00:30Z",4]]}]}]}`,
command: `SELECT * FROM baz`,
exp: `{"results":[{"series":[{"name":"baz","columns":["time","foobar","value"],"values":[["2000-01-01T00:00:00Z",null,1],["2000-01-01T00:00:10Z",null,2],["2000-01-01T00:00:20Z",null,3],["2000-01-01T00:00:30Z",null,4],["2000-01-01T00:00:40Z",3,4]]}]}]}`,
},
}...)

View File

@ -18,7 +18,7 @@ When each test runs it does the following:
## Idempotent - Allows for parallel tests
Each test should be `idempotent`, meaining that its data will not be affected by other tests, or use cases within the table tests themselves.
Each test should be `idempotent`, meaning that its data will not be affected by other tests, or use cases within the table tests themselves.
This allows for parallel testing, keeping the test suite total execution time very low.
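A minimal sketch of the idea (not the README's own sample, whose heading follows): each case carries its own inputs, so cases never share state and the test can opt into parallel execution:

```go
func TestFeature(t *testing.T) {
	t.Parallel() // no case below shares state with any other test

	tests := []struct {
		name string
		in   string
		exp  string
	}{
		{name: "simple", in: "a", exp: "a"},
	}

	for _, tt := range tests {
		// Each case builds its own inputs, so cases cannot affect
		// one another and the suite can run in parallel.
		if got := tt.in; got != tt.exp {
			t.Errorf("%s: got %q, expected %q", tt.name, got, tt.exp)
		}
	}
}
```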
### Basic sample test

View File

@ -8,9 +8,14 @@
# Change this option to true to disable reporting.
reporting-disabled = false
# Enterprise registration control
# enterprise-url = "https://enterprise.influxdata.com" # The Enterprise server URL
# enterprise-token = "" # Registration token for Enterprise server
###
### Enterprise registration control
###
[registration]
# enabled = true
# url = "https://enterprise.influxdata.com" # The Enterprise server URL
# token = "" # Registration token for Enterprise server
###
### [meta]

View File

@ -80,7 +80,6 @@ type Node interface {
func (*Query) node() {}
func (Statements) node() {}
func (*AlterDatabaseRenameStatement) node() {}
func (*AlterRetentionPolicyStatement) node() {}
func (*CreateContinuousQueryStatement) node() {}
func (*CreateDatabaseStatement) node() {}
@ -192,7 +191,6 @@ type ExecutionPrivilege struct {
// ExecutionPrivileges is a list of privileges required to execute a statement.
type ExecutionPrivileges []ExecutionPrivilege
func (*AlterDatabaseRenameStatement) stmt() {}
func (*AlterRetentionPolicyStatement) stmt() {}
func (*CreateContinuousQueryStatement) stmt() {}
func (*CreateDatabaseStatement) stmt() {}
@ -510,30 +508,6 @@ func (s *GrantAdminStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// AlterDatabaseRenameStatement represents a command for renaming a database.
type AlterDatabaseRenameStatement struct {
// Current name of the database
OldName string
// New name of the database
NewName string
}
// String returns a string representation of the rename database statement.
func (s *AlterDatabaseRenameStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("ALTER DATABASE ")
_, _ = buf.WriteString(s.OldName)
_, _ = buf.WriteString(" RENAME ")
_, _ = buf.WriteString(" TO ")
_, _ = buf.WriteString(s.NewName)
return buf.String()
}
// RequiredPrivileges returns the privilege required to execute an AlterDatabaseRenameStatement.
func (s *AlterDatabaseRenameStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// SetPasswordUserStatement represents a command for changing user password.
type SetPasswordUserStatement struct {
// Plain Password
@ -1953,6 +1927,9 @@ func (s *DropContinuousQueryStatement) RequiredPrivileges() ExecutionPrivileges
// ShowMeasurementsStatement represents a command for listing measurements.
type ShowMeasurementsStatement struct {
// Measurement name or regex.
Source Source
// An expression evaluated on data point.
Condition Expr

View File

@ -226,18 +226,14 @@ func (p *Parser) parseDropStatement() (Statement, error) {
// This function assumes the ALTER token has already been consumed.
func (p *Parser) parseAlterStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case RETENTION:
if tok == RETENTION {
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseAlterRetentionPolicyStatement()
case DATABASE:
return p.parseAlterDatabaseRenameStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"RETENTION", "DATABASE"}, pos)
return nil, newParseError(tokstr(tok, lit), []string{"RETENTION"}, pos)
}
// parseSetPasswordUserStatement parses a string and returns a set statement.
@ -1011,6 +1007,29 @@ func (p *Parser) parseShowMeasurementsStatement() (*ShowMeasurementsStatement, e
stmt := &ShowMeasurementsStatement{}
var err error
// Parse optional WITH clause.
if tok, _, _ := p.scanIgnoreWhitespace(); tok == WITH {
// Parse required MEASUREMENT token.
if err := p.parseTokens([]Token{MEASUREMENT}); err != nil {
return nil, err
}
// Parse required operator: = or =~.
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case EQ, EQREGEX:
// Parse required source (measurement name or regex).
if stmt.Source, err = p.parseSource(); err != nil {
return nil, err
}
default:
return nil, newParseError(tokstr(tok, lit), []string{"=", "=~"}, pos)
}
} else {
// Not a WITH clause so put the token back.
p.unscan()
}
// Parse condition: "WHERE EXPR".
if stmt.Condition, err = p.parseCondition(); err != nil {
return nil, err
@ -1449,33 +1468,6 @@ func (p *Parser) parseDropDatabaseStatement() (*DropDatabaseStatement, error) {
return stmt, nil
}
// parseAlterDatabaseRenameStatement parses a string and returns an AlterDatabaseRenameStatement.
// This function assumes the "ALTER DATABASE" tokens have already been consumed.
func (p *Parser) parseAlterDatabaseRenameStatement() (*AlterDatabaseRenameStatement, error) {
stmt := &AlterDatabaseRenameStatement{}
// Parse the name of the database to be renamed.
lit, err := p.parseIdent()
if err != nil {
return nil, err
}
stmt.OldName = lit
// Parse required RENAME TO tokens.
if err := p.parseTokens([]Token{RENAME, TO}); err != nil {
return nil, err
}
// Parse the new name of the database.
lit, err = p.parseIdent()
if err != nil {
return nil, err
}
stmt.NewName = lit
return stmt, nil
}
// parseDropSubscriptionStatement parses a string and returns a DropSubscriptionStatement.
// This function assumes the "DROP SUBSCRIPTION" tokens have already been consumed.
func (p *Parser) parseDropSubscriptionStatement() (*DropSubscriptionStatement, error) {

View File

@ -751,6 +751,24 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW MEASUREMENTS WITH MEASUREMENT = cpu
{
s: `SHOW MEASUREMENTS WITH MEASUREMENT = cpu`,
stmt: &influxql.ShowMeasurementsStatement{
Source: &influxql.Measurement{Name: "cpu"},
},
},
// SHOW MEASUREMENTS WITH MEASUREMENT =~ /regex/
{
s: `SHOW MEASUREMENTS WITH MEASUREMENT =~ /[cg]pu/`,
stmt: &influxql.ShowMeasurementsStatement{
Source: &influxql.Measurement{
Regex: &influxql.RegexLiteral{Val: regexp.MustCompile(`[cg]pu`)},
},
},
},
// SHOW RETENTION POLICIES
{
s: `SHOW RETENTION POLICIES ON mydb`,
@ -1418,12 +1436,6 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: newAlterRetentionPolicyStatement("default", "testdb", -1, 4, false),
},
// ALTER DATABASE RENAME
{
s: `ALTER DATABASE db0 RENAME TO db1`,
stmt: newAlterDatabaseRenameStatement("db0", "db1"),
},
// SHOW STATS
{
s: `SHOW STATS`,
@ -1687,15 +1699,11 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected number at line 1, char 67`},
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`},
{s: `ALTER`, err: `found EOF, expected RETENTION, DATABASE at line 1, char 7`},
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},
{s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`},
{s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, DEFAULT at line 1, char 42`},
{s: `ALTER DATABASE`, err: `found EOF, expected identifier at line 1, char 16`},
{s: `ALTER DATABASE db0`, err: `found EOF, expected RENAME at line 1, char 20`},
{s: `ALTER DATABASE db0 RENAME`, err: `found EOF, expected TO at line 1, char 27`},
{s: `ALTER DATABASE db0 RENAME TO`, err: `found EOF, expected identifier at line 1, char 30`},
{s: `SET`, err: `found EOF, expected PASSWORD at line 1, char 5`},
{s: `SET PASSWORD`, err: `found EOF, expected FOR at line 1, char 14`},
{s: `SET PASSWORD something`, err: `found something, expected FOR at line 1, char 14`},
@ -2129,14 +2137,6 @@ func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, r
return stmt
}
// newAlterDatabaseRenameStatement creates an initialized AlterDatabaseRenameStatement.
func newAlterDatabaseRenameStatement(oldName, newName string) *influxql.AlterDatabaseRenameStatement {
return &influxql.AlterDatabaseRenameStatement{
OldName: oldName,
NewName: newName,
}
}
// mustMarshalJSON encodes a value to JSON.
func mustMarshalJSON(v interface{}) []byte {
b, err := json.Marshal(v)

View File

@ -150,7 +150,6 @@ func TestScanner_Scan(t *testing.T) {
{s: `QUERIES`, tok: influxql.QUERIES},
{s: `QUERY`, tok: influxql.QUERY},
{s: `READ`, tok: influxql.READ},
{s: `RENAME`, tok: influxql.RENAME},
{s: `RETENTION`, tok: influxql.RETENTION},
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},

View File

@ -107,7 +107,6 @@ const (
QUERIES
QUERY
READ
RENAME
REPLICATION
RETENTION
REVOKE
@ -224,7 +223,6 @@ var tokens = [...]string{
QUERIES: "QUERIES",
QUERY: "QUERY",
READ: "READ",
RENAME: "RENAME",
REPLICATION: "REPLICATION",
RETENTION: "RETENTION",
REVOKE: "REVOKE",

View File

@ -1,9 +1,7 @@
package meta
import (
"fmt"
"sort"
"strings"
"time"
"github.com/gogo/protobuf/proto"
@ -179,69 +177,6 @@ func (data *Data) DropDatabase(name string) error {
return ErrDatabaseNotFound
}
// RenameDatabase renames a database.
// Returns an error if oldName or newName is blank
// or if a database with the newName already exists
// or if a database with oldName does not exist
func (data *Data) RenameDatabase(oldName, newName string) error {
if newName == "" || oldName == "" {
return ErrDatabaseNameRequired
}
if data.Database(newName) != nil {
return ErrDatabaseExists
}
if data.Database(oldName) == nil {
return ErrDatabaseNotFound
}
// TODO should rename database in continuous queries also
// for now, just return an error if there is a possible conflict
if data.isDatabaseNameUsedInCQ(oldName) {
return ErrDatabaseRenameCQConflict
}
// find database named oldName and rename it to newName
for i := range data.Databases {
if data.Databases[i].Name == oldName {
data.Databases[i].Name = newName
data.switchDatabaseUserPrivileges(oldName, newName)
return nil
}
}
return ErrDatabaseNotFound
}
// isDatabaseNameUsedInCQ returns true if a database name is used in any continuous query
func (data *Data) isDatabaseNameUsedInCQ(dbName string) bool {
CQOnDb := fmt.Sprintf(" ON %s ", dbName)
CQIntoDb := fmt.Sprintf(" INTO \"%s\".", dbName)
CQFromDb := fmt.Sprintf(" FROM \"%s\".", dbName)
for i := range data.Databases {
for j := range data.Databases[i].ContinuousQueries {
query := data.Databases[i].ContinuousQueries[j].Query
if strings.Contains(query, CQOnDb) {
return true
}
if strings.Contains(query, CQIntoDb) {
return true
}
if strings.Contains(query, CQFromDb) {
return true
}
}
}
return false
}
// switchDatabaseUserPrivileges changes the database associated with user privileges
func (data *Data) switchDatabaseUserPrivileges(oldDatabase, newDatabase string) error {
for i := range data.Users {
if p, ok := data.Users[i].Privileges[oldDatabase]; ok {
data.Users[i].Privileges[newDatabase] = p
delete(data.Users[i].Privileges, oldDatabase)
}
}
return nil
}
// RetentionPolicy returns a retention policy for a database by name.
func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) {
di := data.Database(database)

View File

@ -135,97 +135,6 @@ func TestData_DropDatabase(t *testing.T) {
}
}
// Ensure a database can be renamed.
func TestData_RenameDatabase(t *testing.T) {
var data meta.Data
for i := 0; i < 2; i++ {
if err := data.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
t.Fatal(err)
}
}
if err := data.RenameDatabase("db1", "db2"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Databases, []meta.DatabaseInfo{{Name: "db0"}, {Name: "db2"}}) {
t.Fatalf("unexpected databases: %#v", data.Databases)
}
}
// Ensure that user privileges are updated correctly when database is renamed.
func TestData_RenameDatabaseUpdatesPrivileges(t *testing.T) {
var data meta.Data
for i := 0; i < 2; i++ {
if err := data.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
t.Fatal(err)
}
}
data.Users = []meta.UserInfo{{
Name: "susy",
Hash: "ABC123",
Admin: true,
Privileges: map[string]influxql.Privilege{
"db1": influxql.AllPrivileges, "db0": influxql.ReadPrivilege}}}
if err := data.RenameDatabase("db1", "db2"); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(data.Users,
[]meta.UserInfo{{
Name: "susy",
Hash: "ABC123",
Admin: true,
Privileges: map[string]influxql.Privilege{
"db2": influxql.AllPrivileges, "db0": influxql.ReadPrivilege}}}) {
t.Fatalf("unexpected user privileges: %#v", data.Users)
}
}
// Ensure that renaming a database without both old and new names returns an error.
func TestData_RenameDatabase_ErrNameRequired(t *testing.T) {
var data meta.Data
if err := data.RenameDatabase("", ""); err != meta.ErrDatabaseNameRequired {
t.Fatalf("unexpected error: %s", err)
}
if err := data.RenameDatabase("from_foo", ""); err != meta.ErrDatabaseNameRequired {
t.Fatalf("unexpected error: %s", err)
}
if err := data.RenameDatabase("", "to_foo"); err != meta.ErrDatabaseNameRequired {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure that renaming a database returns an error if there is a possibly conflicting CQ
func TestData_RenameDatabase_ErrDatabaseCQConflict(t *testing.T) {
var data meta.Data
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err := data.CreateDatabase("db1"); err != nil {
t.Fatal(err)
} else if err := data.CreateContinuousQuery("db0", "cq0", `CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count() INTO "foo"."default"."bar" FROM "foo"."foobar" END`); err != nil {
t.Fatal(err)
} else if err := data.CreateContinuousQuery("db1", "cq1", `CREATE CONTINUOUS QUERY cq1 ON db1 BEGIN SELECT count() INTO "db1"."default"."bar" FROM "db0"."foobar" END`); err != nil {
t.Fatal(err)
} else if err := data.CreateContinuousQuery("db1", "cq2", `CREATE CONTINUOUS QUERY cq2 ON db1 BEGIN SELECT count() INTO "db0"."default"."bar" FROM "db1"."foobar" END`); err != nil {
t.Fatal(err)
} else if err := data.CreateContinuousQuery("db1", "noconflict", `CREATE CONTINUOUS QUERY noconflict ON db1 BEGIN SELECT count() INTO "db1"."default"."bar" FROM "db1"."foobar" END`); err != nil {
t.Fatal(err)
} else if err := data.RenameDatabase("db0", "db2"); err == nil {
t.Fatalf("unexpected rename database success despite cq conflict")
} else if err := data.DropContinuousQuery("db0", "cq0"); err != nil {
t.Fatal(err)
} else if err := data.RenameDatabase("db0", "db2"); err == nil {
t.Fatalf("unexpected rename database success despite cq conflict")
} else if err := data.DropContinuousQuery("db1", "cq1"); err != nil {
t.Fatal(err)
} else if err := data.RenameDatabase("db0", "db2"); err == nil {
t.Fatalf("unexpected rename database success despite cq conflict")
} else if err := data.DropContinuousQuery("db1", "cq2"); err != nil {
t.Fatal(err)
} else if err := data.RenameDatabase("db0", "db2"); err != nil {
t.Fatal(err)
}
}
// Ensure a retention policy can be created.
func TestData_CreateRetentionPolicy(t *testing.T) {
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}, {ID: 2}}}

View File

@ -47,9 +47,6 @@ var (
// ErrDatabaseNameRequired is returned when creating a database without a name.
ErrDatabaseNameRequired = newError("database name required")
// ErrDatabaseRenameCQConflict is returned when attempting to rename a database in use by a CQ.
ErrDatabaseRenameCQConflict = newError("database rename conflict with existing continuous query")
)
var (

View File

@ -40,7 +40,6 @@ It has these top-level messages:
SetDataCommand
SetAdminPrivilegeCommand
UpdateNodeCommand
RenameDatabaseCommand
CreateSubscriptionCommand
DropSubscriptionCommand
Response
@ -54,12 +53,10 @@ It has these top-level messages:
package internal
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type RPCType int32
@ -120,9 +117,8 @@ const (
Command_SetDataCommand Command_Type = 17
Command_SetAdminPrivilegeCommand Command_Type = 18
Command_UpdateNodeCommand Command_Type = 19
Command_RenameDatabaseCommand Command_Type = 20
Command_CreateSubscriptionCommand Command_Type = 22
Command_DropSubscriptionCommand Command_Type = 23
Command_CreateSubscriptionCommand Command_Type = 21
Command_DropSubscriptionCommand Command_Type = 22
)
var Command_Type_name = map[int32]string{
@ -145,9 +141,8 @@ var Command_Type_name = map[int32]string{
17: "SetDataCommand",
18: "SetAdminPrivilegeCommand",
19: "UpdateNodeCommand",
20: "RenameDatabaseCommand",
22: "CreateSubscriptionCommand",
23: "DropSubscriptionCommand",
21: "CreateSubscriptionCommand",
22: "DropSubscriptionCommand",
}
var Command_Type_value = map[string]int32{
"CreateNodeCommand": 1,
@ -169,9 +164,8 @@ var Command_Type_value = map[string]int32{
"SetDataCommand": 17,
"SetAdminPrivilegeCommand": 18,
"UpdateNodeCommand": 19,
"RenameDatabaseCommand": 20,
"CreateSubscriptionCommand": 22,
"DropSubscriptionCommand": 23,
"CreateSubscriptionCommand": 21,
"DropSubscriptionCommand": 22,
}
func (x Command_Type) Enum() *Command_Type {
@ -192,15 +186,15 @@ func (x *Command_Type) UnmarshalJSON(data []byte) error {
}
type Data struct {
Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"`
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
ClusterID *uint64 `protobuf:"varint,3,req" json:"ClusterID,omitempty"`
Nodes []*NodeInfo `protobuf:"bytes,4,rep" json:"Nodes,omitempty"`
Databases []*DatabaseInfo `protobuf:"bytes,5,rep" json:"Databases,omitempty"`
Users []*UserInfo `protobuf:"bytes,6,rep" json:"Users,omitempty"`
MaxNodeID *uint64 `protobuf:"varint,7,req" json:"MaxNodeID,omitempty"`
MaxShardGroupID *uint64 `protobuf:"varint,8,req" json:"MaxShardGroupID,omitempty"`
MaxShardID *uint64 `protobuf:"varint,9,req" json:"MaxShardID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -272,8 +266,8 @@ func (m *Data) GetMaxShardID() uint64 {
}
type NodeInfo struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -296,10 +290,10 @@ func (m *NodeInfo) GetHost() string {
}
type DatabaseInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
DefaultRetentionPolicy *string `protobuf:"bytes,2,req,name=DefaultRetentionPolicy" json:"DefaultRetentionPolicy,omitempty"`
RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep,name=RetentionPolicies" json:"RetentionPolicies,omitempty"`
ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep,name=ContinuousQueries" json:"ContinuousQueries,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
DefaultRetentionPolicy *string `protobuf:"bytes,2,req" json:"DefaultRetentionPolicy,omitempty"`
RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep" json:"RetentionPolicies,omitempty"`
ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep" json:"ContinuousQueries,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -336,12 +330,12 @@ func (m *DatabaseInfo) GetContinuousQueries() []*ContinuousQueryInfo {
}
type RetentionPolicyInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Duration *int64 `protobuf:"varint,2,req,name=Duration" json:"Duration,omitempty"`
ShardGroupDuration *int64 `protobuf:"varint,3,req,name=ShardGroupDuration" json:"ShardGroupDuration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,4,req,name=ReplicaN" json:"ReplicaN,omitempty"`
ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep,name=ShardGroups" json:"ShardGroups,omitempty"`
Subscriptions []*SubscriptionInfo `protobuf:"bytes,6,rep,name=Subscriptions" json:"Subscriptions,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Duration *int64 `protobuf:"varint,2,req" json:"Duration,omitempty"`
ShardGroupDuration *int64 `protobuf:"varint,3,req" json:"ShardGroupDuration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,4,req" json:"ReplicaN,omitempty"`
ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep" json:"ShardGroups,omitempty"`
Subscriptions []*SubscriptionInfo `protobuf:"bytes,6,rep" json:"Subscriptions,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -392,11 +386,11 @@ func (m *RetentionPolicyInfo) GetSubscriptions() []*SubscriptionInfo {
}
type ShardGroupInfo struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
StartTime *int64 `protobuf:"varint,2,req,name=StartTime" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"`
DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"`
Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
StartTime *int64 `protobuf:"varint,2,req" json:"StartTime,omitempty"`
EndTime *int64 `protobuf:"varint,3,req" json:"EndTime,omitempty"`
DeletedAt *int64 `protobuf:"varint,4,req" json:"DeletedAt,omitempty"`
Shards []*ShardInfo `protobuf:"bytes,5,rep" json:"Shards,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -440,9 +434,9 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo {
}
type ShardInfo struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"`
Owners []*ShardOwner `protobuf:"bytes,3,rep,name=Owners" json:"Owners,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
OwnerIDs []uint64 `protobuf:"varint,2,rep" json:"OwnerIDs,omitempty"`
Owners []*ShardOwner `protobuf:"bytes,3,rep" json:"Owners,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -472,9 +466,9 @@ func (m *ShardInfo) GetOwners() []*ShardOwner {
}
type SubscriptionInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Mode *string `protobuf:"bytes,2,req,name=Mode" json:"Mode,omitempty"`
Destinations []string `protobuf:"bytes,3,rep,name=Destinations" json:"Destinations,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Mode *string `protobuf:"bytes,2,req" json:"Mode,omitempty"`
Destinations []string `protobuf:"bytes,3,rep" json:"Destinations,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -504,7 +498,7 @@ func (m *SubscriptionInfo) GetDestinations() []string {
}
type ShardOwner struct {
NodeID *uint64 `protobuf:"varint,1,req,name=NodeID" json:"NodeID,omitempty"`
NodeID *uint64 `protobuf:"varint,1,req" json:"NodeID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -520,8 +514,8 @@ func (m *ShardOwner) GetNodeID() uint64 {
}
type ContinuousQueryInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -544,10 +538,10 @@ func (m *ContinuousQueryInfo) GetQuery() string {
}
type UserInfo struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"`
Privileges []*UserPrivilege `protobuf:"bytes,4,rep,name=Privileges" json:"Privileges,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"`
Privileges []*UserPrivilege `protobuf:"bytes,4,rep" json:"Privileges,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -584,8 +578,8 @@ func (m *UserInfo) GetPrivileges() []*UserPrivilege {
}
type UserPrivilege struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,2,req,name=Privilege" json:"Privilege,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,2,req" json:"Privilege,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -639,8 +633,8 @@ func (m *Command) GetType() Command_Type {
}
type CreateNodeCommand struct {
Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"`
Rand *uint64 `protobuf:"varint,2,req,name=Rand" json:"Rand,omitempty"`
Host *string `protobuf:"bytes,1,req" json:"Host,omitempty"`
Rand *uint64 `protobuf:"varint,2,req" json:"Rand,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -671,8 +665,8 @@ var E_CreateNodeCommand_Command = &proto.ExtensionDesc{
}
type DeleteNodeCommand struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Force *bool `protobuf:"varint,2,req,name=Force" json:"Force,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
Force *bool `protobuf:"varint,2,req" json:"Force,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -703,7 +697,7 @@ var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{
}
type CreateDatabaseCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -727,7 +721,7 @@ var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{
}
type DropDatabaseCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -751,8 +745,8 @@ var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{
}
type CreateRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -783,8 +777,8 @@ var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type DropRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -815,8 +809,8 @@ var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type SetDefaultRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -847,11 +841,11 @@ var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type UpdateRetentionPolicyCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
NewName *string `protobuf:"bytes,3,opt,name=NewName" json:"NewName,omitempty"`
Duration *int64 `protobuf:"varint,4,opt,name=Duration" json:"Duration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,5,opt,name=ReplicaN" json:"ReplicaN,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
NewName *string `protobuf:"bytes,3,opt" json:"NewName,omitempty"`
Duration *int64 `protobuf:"varint,4,opt" json:"Duration,omitempty"`
ReplicaN *uint32 `protobuf:"varint,5,opt" json:"ReplicaN,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -903,9 +897,9 @@ var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{
}
type CreateShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"`
Timestamp *int64 `protobuf:"varint,3,req,name=Timestamp" json:"Timestamp,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -943,9 +937,9 @@ var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{
}
type DeleteShardGroupCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"`
ShardGroupID *uint64 `protobuf:"varint,3,req,name=ShardGroupID" json:"ShardGroupID,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"`
ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -983,9 +977,9 @@ var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{
}
type CreateContinuousQueryCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Query *string `protobuf:"bytes,3,req,name=Query" json:"Query,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
Query *string `protobuf:"bytes,3,req" json:"Query,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1023,8 +1017,8 @@ var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{
}
type DropContinuousQueryCommand struct {
Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"`
Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1055,9 +1049,9 @@ var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{
}
type CreateUserCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1095,7 +1089,7 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{
}
type DropUserCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1119,8 +1113,8 @@ var E_DropUserCommand_Command = &proto.ExtensionDesc{
}
type UpdateUserCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1151,9 +1145,9 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{
}
type SetPrivilegeCommand struct {
Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,3,req,name=Privilege" json:"Privilege,omitempty"`
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
Privilege *int32 `protobuf:"varint,3,req" json:"Privilege,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1191,7 +1185,7 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{
}
type SetDataCommand struct {
Data *Data `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"`
Data *Data `protobuf:"bytes,1,req" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1215,8 +1209,8 @@ var E_SetDataCommand_Command = &proto.ExtensionDesc{
}
type SetAdminPrivilegeCommand struct {
Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"`
Admin *bool `protobuf:"varint,2,req,name=Admin" json:"Admin,omitempty"`
Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"`
Admin *bool `protobuf:"varint,2,req" json:"Admin,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1247,8 +1241,8 @@ var E_SetAdminPrivilegeCommand_Command = &proto.ExtensionDesc{
}
type UpdateNodeCommand struct {
ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"`
ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"`
Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1278,44 +1272,12 @@ var E_UpdateNodeCommand_Command = &proto.ExtensionDesc{
Tag: "bytes,119,opt,name=command",
}
type RenameDatabaseCommand struct {
OldName *string `protobuf:"bytes,1,req,name=oldName" json:"oldName,omitempty"`
NewName *string `protobuf:"bytes,2,req,name=newName" json:"newName,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *RenameDatabaseCommand) Reset() { *m = RenameDatabaseCommand{} }
func (m *RenameDatabaseCommand) String() string { return proto.CompactTextString(m) }
func (*RenameDatabaseCommand) ProtoMessage() {}
func (m *RenameDatabaseCommand) GetOldName() string {
if m != nil && m.OldName != nil {
return *m.OldName
}
return ""
}
func (m *RenameDatabaseCommand) GetNewName() string {
if m != nil && m.NewName != nil {
return *m.NewName
}
return ""
}
var E_RenameDatabaseCommand_Command = &proto.ExtensionDesc{
ExtendedType: (*Command)(nil),
ExtensionType: (*RenameDatabaseCommand)(nil),
Field: 120,
Name: "internal.RenameDatabaseCommand.command",
Tag: "bytes,120,opt,name=command",
}
type CreateSubscriptionCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,3,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
Mode *string `protobuf:"bytes,4,req,name=Mode" json:"Mode,omitempty"`
Destinations []string `protobuf:"bytes,5,rep,name=Destinations" json:"Destinations,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,3,req" json:"RetentionPolicy,omitempty"`
Mode *string `protobuf:"bytes,4,req" json:"Mode,omitempty"`
Destinations []string `protobuf:"bytes,5,rep" json:"Destinations,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1367,9 +1329,9 @@ var E_CreateSubscriptionCommand_Command = &proto.ExtensionDesc{
}
type DropSubscriptionCommand struct {
Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,3,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
RetentionPolicy *string `protobuf:"bytes,3,req" json:"RetentionPolicy,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1407,9 +1369,9 @@ var E_DropSubscriptionCommand_Command = &proto.ExtensionDesc{
}
type Response struct {
OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
Index *uint64 `protobuf:"varint,3,opt,name=Index" json:"Index,omitempty"`
OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"`
Index *uint64 `protobuf:"varint,3,opt" json:"Index,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1439,8 +1401,8 @@ func (m *Response) GetIndex() uint64 {
}
type ResponseHeader struct {
OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"`
Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1463,7 +1425,7 @@ func (m *ResponseHeader) GetError() string {
}
type ErrorResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1479,9 +1441,9 @@ func (m *ErrorResponse) GetHeader() *ResponseHeader {
}
type FetchDataRequest struct {
Index *uint64 `protobuf:"varint,1,req,name=Index" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req,name=Term" json:"Term,omitempty"`
Blocking *bool `protobuf:"varint,3,opt,name=Blocking,def=0" json:"Blocking,omitempty"`
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
Blocking *bool `protobuf:"varint,3,opt,def=0" json:"Blocking,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1513,10 +1475,10 @@ func (m *FetchDataRequest) GetBlocking() bool {
}
type FetchDataResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,3,req,name=Term" json:"Term,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,3,req" json:"Term,omitempty"`
Data []byte `protobuf:"bytes,4,opt" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1553,7 +1515,7 @@ func (m *FetchDataResponse) GetData() []byte {
}
type JoinRequest struct {
Addr *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"`
Addr *string `protobuf:"bytes,1,req" json:"Addr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1569,14 +1531,14 @@ func (m *JoinRequest) GetAddr() string {
}
type JoinResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"`
Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"`
// Indicates that this node should take part in the raft cluster.
EnableRaft *bool `protobuf:"varint,2,opt,name=EnableRaft" json:"EnableRaft,omitempty"`
EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"`
// The addresses of raft peers to use if joining as a raft member. If not joining
// as a raft member, these are the nodes running raft.
RaftNodes []string `protobuf:"bytes,3,rep,name=RaftNodes" json:"RaftNodes,omitempty"`
RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"`
// The node ID assigned to the requesting node.
NodeID *uint64 `protobuf:"varint,4,opt,name=NodeID" json:"NodeID,omitempty"`
NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1634,7 +1596,6 @@ func init() {
proto.RegisterExtension(E_SetDataCommand_Command)
proto.RegisterExtension(E_SetAdminPrivilegeCommand_Command)
proto.RegisterExtension(E_UpdateNodeCommand_Command)
proto.RegisterExtension(E_RenameDatabaseCommand_Command)
proto.RegisterExtension(E_CreateSubscriptionCommand_Command)
proto.RegisterExtension(E_DropSubscriptionCommand_Command)
}

View File

@ -112,9 +112,8 @@ message Command {
SetDataCommand = 17;
SetAdminPrivilegeCommand = 18;
UpdateNodeCommand = 19;
RenameDatabaseCommand = 20;
CreateSubscriptionCommand = 22;
DropSubscriptionCommand = 23;
CreateSubscriptionCommand = 21;
DropSubscriptionCommand = 22;
}
required Type type = 1;
@ -276,14 +275,6 @@ message UpdateNodeCommand {
required string Host = 2;
}
message RenameDatabaseCommand {
extend Command {
optional RenameDatabaseCommand command = 120;
}
required string oldName = 1;
required string newName = 2;
}
message CreateSubscriptionCommand {
extend Command {
optional CreateSubscriptionCommand command = 121;

View File

@ -23,7 +23,6 @@ type StatementExecutor struct {
Databases() ([]DatabaseInfo, error)
CreateDatabase(name string) (*DatabaseInfo, error)
DropDatabase(name string) error
RenameDatabase(oldName, newName string) error
DefaultRetentionPolicy(database string) (*RetentionPolicyInfo, error)
CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error)
@ -73,8 +72,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.
return e.executeGrantStatement(stmt)
case *influxql.GrantAdminStatement:
return e.executeGrantAdminStatement(stmt)
case *influxql.AlterDatabaseRenameStatement:
return e.executeAlterDatabaseRenameStatement(stmt)
case *influxql.RevokeStatement:
return e.executeRevokeStatement(stmt)
case *influxql.RevokeAdminStatement:
@ -224,10 +221,6 @@ func (e *StatementExecutor) executeGrantAdminStatement(stmt *influxql.GrantAdmin
return &influxql.Result{Err: e.Store.SetAdminPrivilege(stmt.User, true)}
}
func (e *StatementExecutor) executeAlterDatabaseRenameStatement(q *influxql.AlterDatabaseRenameStatement) *influxql.Result {
return &influxql.Result{Err: e.Store.RenameDatabase(q.OldName, q.NewName)}
}
func (e *StatementExecutor) executeRevokeStatement(stmt *influxql.RevokeStatement) *influxql.Result {
priv := influxql.NoPrivileges

View File

@ -46,26 +46,6 @@ func TestStatementExecutor_ExecuteStatement_DropDatabase(t *testing.T) {
}
}
// Ensure an ALTER DATABASE ... RENAME TO ... statement can be executed.
func TestStatementExecutor_ExecuteStatement_AlterDatabaseRename(t *testing.T) {
e := NewStatementExecutor()
e.Store.RenameDatabaseFn = func(oldName, newName string) error {
if oldName != "old_foo" {
t.Fatalf("unexpected name: %s", oldName)
}
if newName != "new_foo" {
t.Fatalf("unexpected name: %s", newName)
}
return nil
}
if res := e.ExecuteStatement(influxql.MustParseStatement(`ALTER DATABASE old_foo RENAME TO new_foo`)); res.Err != nil {
t.Fatal(res.Err)
} else if res.Series != nil {
t.Fatalf("unexpected rows: %#v", res.Series)
}
}
// Ensure a SHOW DATABASES statement can be executed.
func TestStatementExecutor_ExecuteStatement_ShowDatabases(t *testing.T) {
e := NewStatementExecutor()
@ -1056,7 +1036,6 @@ type StatementExecutorStore struct {
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
DropDatabaseFn func(name string) error
DeleteNodeFn func(nodeID uint64, force bool) error
RenameDatabaseFn func(oldName, newName string) error
DefaultRetentionPolicyFn func(database string) (*meta.RetentionPolicyInfo, error)
CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error
@ -1116,10 +1095,6 @@ func (s *StatementExecutorStore) DropDatabase(name string) error {
return s.DropDatabaseFn(name)
}
func (s *StatementExecutorStore) RenameDatabase(oldName, newName string) error {
return s.RenameDatabaseFn(oldName, newName)
}
func (s *StatementExecutorStore) DefaultRetentionPolicy(database string) (*meta.RetentionPolicyInfo, error) {
return s.DefaultRetentionPolicyFn(database)
}

View File

@ -88,7 +88,7 @@ type Store struct {
wg sync.WaitGroup
changed chan struct{}
// clusterTracingEnabled controls whether low-level cluster communcation is logged.
// clusterTracingEnabled controls whether low-level cluster communication is logged.
// Useful for troubleshooting
clusterTracingEnabled bool
@ -927,16 +927,6 @@ func (s *Store) DropDatabase(name string) error {
)
}
// RenameDatabase renames a database in the metastore
func (s *Store) RenameDatabase(oldName, newName string) error {
return s.exec(internal.Command_RenameDatabaseCommand, internal.E_RenameDatabaseCommand_Command,
&internal.RenameDatabaseCommand{
OldName: proto.String(oldName),
NewName: proto.String(newName),
},
)
}
// RetentionPolicy returns a retention policy for a database by name.
func (s *Store) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) {
err = s.read(func(data *Data) error {
@ -1668,8 +1658,6 @@ func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
return fsm.applyCreateDatabaseCommand(&cmd)
case internal.Command_DropDatabaseCommand:
return fsm.applyDropDatabaseCommand(&cmd)
case internal.Command_RenameDatabaseCommand:
return fsm.applyRenameDatabaseCommand(&cmd)
case internal.Command_CreateRetentionPolicyCommand:
return fsm.applyCreateRetentionPolicyCommand(&cmd)
case internal.Command_DropRetentionPolicyCommand:
@ -1798,20 +1786,6 @@ func (fsm *storeFSM) applyDropDatabaseCommand(cmd *internal.Command) interface{}
return nil
}
func (fsm *storeFSM) applyRenameDatabaseCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_RenameDatabaseCommand_Command)
v := ext.(*internal.RenameDatabaseCommand)
// Copy data and update.
other := fsm.data.Clone()
if err := other.RenameDatabase(v.GetOldName(), v.GetNewName()); err != nil {
return err
}
fsm.data = other
return nil
}
func (fsm *storeFSM) applyCreateRetentionPolicyCommand(cmd *internal.Command) interface{} {
ext, _ := proto.GetExtension(cmd, internal.E_CreateRetentionPolicyCommand_Command)
v := ext.(*internal.CreateRetentionPolicyCommand)

View File

@ -244,76 +244,6 @@ func TestStore_DropDatabase_ErrDatabaseNotFound(t *testing.T) {
}
}
// Ensure the store can rename an existing database.
func TestStore_RenameDatabase(t *testing.T) {
t.Parallel()
s := MustOpenStore()
defer s.Close()
// Create three databases.
for i := 0; i < 3; i++ {
if _, err := s.CreateDatabase(fmt.Sprintf("db%d", i)); err != nil {
t.Fatal(err)
}
}
// Rename database db1, leaving db0 and db2 unchanged.
if err := s.RenameDatabase("db1", "db3"); err != nil {
t.Fatal(err)
}
// Ensure the nodes are correct.
exp := &meta.DatabaseInfo{Name: "db0"}
if di, _ := s.Database("db0"); !reflect.DeepEqual(di, exp) {
t.Fatalf("unexpected database(0): \ngot: %#v\nexp: %#v", di, exp)
}
if di, _ := s.Database("db1"); di != nil {
t.Fatalf("unexpected database(1): %#v", di)
}
exp = &meta.DatabaseInfo{Name: "db2"}
if di, _ := s.Database("db2"); !reflect.DeepEqual(di, exp) {
t.Fatalf("unexpected database(2): \ngot: %#v\nexp: %#v", di, exp)
}
exp = &meta.DatabaseInfo{Name: "db3"}
if di, _ := s.Database("db3"); !reflect.DeepEqual(di, exp) {
t.Fatalf("unexpected database(2): \ngot: %#v\nexp: %#v", di, exp)
}
}
// Ensure the store returns an error when renaming a database that doesn't exist.
func TestStore_RenameDatabase_ErrDatabaseNotFound(t *testing.T) {
t.Parallel()
s := MustOpenStore()
defer s.Close()
if err := s.RenameDatabase("no_such_database", "another_database"); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store returns an error when renaming a database to a database that already exists.
func TestStore_RenameDatabase_ErrDatabaseExists(t *testing.T) {
t.Parallel()
s := MustOpenStore()
defer s.Close()
// create two databases
if _, err := s.CreateDatabase("db00"); err != nil {
t.Fatal(err)
}
if _, err := s.CreateDatabase("db01"); err != nil {
t.Fatal(err)
}
if err := s.RenameDatabase("db00", "db01"); err != meta.ErrDatabaseExists {
t.Fatalf("unexpected error: %s", err)
}
}
// Ensure the store can create a retention policy on a database.
func TestStore_CreateRetentionPolicy(t *testing.T) {
t.Parallel()

View File

@ -341,7 +341,7 @@ func scanKey(buf []byte, i int) (int, []byte, error) {
}
// Now we know where the key region is within buf, and the locations of tags, we
// need to deterimine if duplicate tags exist and if the tags are sorted. This iterates
// need to determine if duplicate tags exist and if the tags are sorted. This iterates
// 1/2 of the list comparing each end with each other, walking towards the center from
// both sides.
for j := 0; j < commas/2; j++ {
@ -531,9 +531,14 @@ func scanTime(buf []byte, i int) (int, []byte, error) {
break
}
// Timestamps should integers, make sure they are so we don't need to actually
// Timestamps should be integers, make sure they are so we don't need to actually
// parse the timestamp until needed
if buf[i] < '0' || buf[i] > '9' {
// Handle negative timestamps
if i == start && buf[i] == '-' {
i++
continue
}
return i, buf[start:i], fmt.Errorf("bad timestamp")
}
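The net effect of the new branch: a single '-' is accepted only as the very first byte of the timestamp, while a '-' anywhere else still yields "bad timestamp". A minimal sketch of both cases, using the same ParsePointsString helper the new tests below rely on (the printed values assume the parser behaves as those tests assert):

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/models"
)

func main() {
	// Accepted: '-' as the first byte of the timestamp field.
	pts, err := models.ParsePointsString("cpu value=1 -1")
	fmt.Println(err, pts[0].Time().UnixNano()) // <nil> -1

	// Rejected: '-' past the first byte still fails.
	if _, err := models.ParsePointsString("cpu value=1 1-"); err != nil {
		fmt.Println("rejected:", err)
	}
}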

View File

@ -330,7 +330,7 @@ func TestParsePointMaxInt64(t *testing.T) {
t.Fatalf(`ParsePoints("%s") mismatch. got %v, exp nil`, `cpu,host=serverA,region=us-west value=9223372036854775807i`, err)
}
if exp, got := int64(9223372036854775807), p[0].Fields()["value"].(int64); exp != got {
t.Fatalf("ParsePoints Value mistmatch. \nexp: %v\ngot: %v", exp, got)
t.Fatalf("ParsePoints Value mismatch. \nexp: %v\ngot: %v", exp, got)
}
// leading zeros
@ -532,7 +532,7 @@ func TestParsePointUnescape(t *testing.T) {
},
time.Unix(0, 0)))
// commas in measuremnt name
// commas in measurement name
test(t, `cpu\,main,regions=east\,west value=1.0`,
models.NewPoint(
"cpu,main", // comma in the name
@ -975,6 +975,69 @@ func TestParsePointUnicodeString(t *testing.T) {
)
}
func TestParsePointNegativeTimestamp(t *testing.T) {
test(t, `cpu value=1 -1`,
models.NewPoint(
"cpu",
models.Tags{},
models.Fields{
"value": 1.0,
},
time.Unix(0, -1)),
)
}
func TestParsePointMaxTimestamp(t *testing.T) {
test(t, `cpu value=1 9223372036854775807`,
models.NewPoint(
"cpu",
models.Tags{},
models.Fields{
"value": 1.0,
},
time.Unix(0, int64(1<<63-1))),
)
}
func TestParsePointMinTimestamp(t *testing.T) {
test(t, `cpu value=1 -9223372036854775807`,
models.NewPoint(
"cpu",
models.Tags{},
models.Fields{
"value": 1.0,
},
time.Unix(0, -int64(1<<63-1))),
)
}
func TestParsePointInvalidTimestamp(t *testing.T) {
_, err := models.ParsePointsString("cpu value=1 9223372036854775808")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
_, err = models.ParsePointsString("cpu value=1 -92233720368547758078")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
_, err = models.ParsePointsString("cpu value=1 -")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
_, err = models.ParsePointsString("cpu value=1 -/")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
_, err = models.ParsePointsString("cpu value=1 -1?")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
_, err = models.ParsePointsString("cpu value=1 1-")
if err == nil {
t.Fatalf("ParsePoints failed: %v", err)
}
}
func TestNewPointFloatWithoutDecimal(t *testing.T) {
test(t, `cpu value=1 1000000000`,
models.NewPoint(
@ -1064,7 +1127,6 @@ func TestNewPointNaN(t *testing.T) {
},
time.Unix(0, 0)),
)
}
func TestNewPointLargeNumberOfTags(t *testing.T) {
@ -1105,7 +1167,6 @@ func TestParsePointIntsFloats(t *testing.T) {
if _, ok := pt.Fields()["float2"].(float64); !ok {
t.Errorf("ParsePoint() float field mismatch: got %T, exp %T", pt.Fields()["float64"], float64(12.1))
}
}
func TestParsePointKeyUnsorted(t *testing.T) {

View File

@ -115,7 +115,7 @@ func New(c Config) *Monitor {
}
// Open opens the monitoring system, using the given clusterID, node ID, and hostname
// for identification purposem.
// for identification purposes.
func (m *Monitor) Open() error {
m.Logger.Printf("Starting monitor system")
@ -171,8 +171,8 @@ func (m *Monitor) DeregisterDiagnosticsClient(name string) {
// Statistics returns the combined statistics for all expvar data. The given
// tags are added to each of the returned statistics.
func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
statistics := make([]*statistic, 0)
func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
statistics := make([]*Statistic, 0)
expvar.Do(func(kv expvar.KeyValue) {
// Skip built-in expvar stats.
@ -180,7 +180,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
return
}
statistic := &statistic{
statistic := &Statistic{
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
@ -246,7 +246,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*statistic, error) {
})
// Add Go memstats.
statistic := &statistic{
statistic := &Statistic{
Name: "runtime",
Tags: make(map[string]string),
Values: make(map[string]interface{}),
@ -388,16 +388,16 @@ func (m *Monitor) storeStatistics() {
}
}
// statistic represents the information returned by a single monitor client.
type statistic struct {
Name string
Tags map[string]string
Values map[string]interface{}
// Statistic represents the information returned by a single monitor client.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// newStatistic returns a new statistic object.
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *statistic {
return &statistic{
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *Statistic {
return &Statistic{
Name: name,
Tags: tags,
Values: values,
@ -405,7 +405,7 @@ func newStatistic(name string, tags map[string]string, values map[string]interfa
}
// valueNames returns a sorted list of the value names, if any.
func (s *statistic) valueNames() []string {
func (s *Statistic) valueNames() []string {
a := make([]string, 0, len(s.Values))
for k, _ := range s.Values {
a = append(a, k)
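Exporting the type and adding json tags fixes the wire format for anything that uploads stats — the registration service added below marshals these directly. A small sketch of the resulting JSON (the values are illustrative):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/influxdb/influxdb/monitor"
)

func main() {
	s := monitor.Statistic{
		Name:   "runtime",
		Tags:   map[string]string{"host": "serverA"},
		Values: map[string]interface{}{"Alloc": 1024},
	}
	b, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"name":"runtime","tags":{"host":"serverA"},"values":{"Alloc":1024}}
}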

View File

@ -11,7 +11,7 @@ import (
// StatementExecutor translates InfluxQL queries to Monitor methods.
type StatementExecutor struct {
Monitor interface {
Statistics(map[string]string) ([]*statistic, error)
Statistics(map[string]string) ([]*Statistic, error)
Diagnostics() (map[string]*Diagnostic, error)
}
}

View File

@ -47,6 +47,8 @@ func (c *tcpConnection) Close() {
}
type Service struct {
mu sync.Mutex
bindAddress string
database string
protocol string
@ -121,6 +123,9 @@ func NewService(c Config) (*Service, error) {
// Open starts the Graphite input processing data.
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout)
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
@ -176,6 +181,9 @@ func (s *Service) closeAllConnections() {
// Close stops all data processing on the Graphite input.
func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
s.closeAllConnections()
if s.ln != nil {
@ -185,7 +193,9 @@ func (s *Service) Close() error {
s.udpConn.Close()
}
s.batcher.Stop()
if s.batcher != nil {
s.batcher.Stop()
}
close(s.done)
s.wg.Wait()
s.done = nil

View File

@ -0,0 +1,27 @@
package registration
import (
"time"
"github.com/influxdb/influxdb/toml"
)
const (
DefaultURL = "https://enterprise.influxdata.com"
DefaultStatsInterval = time.Minute
)
type Config struct {
Enabled bool `toml:"enabled"`
URL string `toml:"url"`
Token string `toml:"token"`
StatsInterval toml.Duration `toml:"stats-interval"`
}
func NewConfig() Config {
return Config{
Enabled: true,
URL: DefaultURL,
StatsInterval: toml.Duration(DefaultStatsInterval),
}
}
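For reference, the matching block in a server config file might look like the following; the [registration] section name is an assumption based on the package name, and the token is made up:

[registration]
  enabled = true
  url = "https://enterprise.influxdata.com"
  token = "xxxx-sample-token"
  stats-interval = "1m"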

View File

@ -0,0 +1,33 @@
package registration_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/services/registration"
)
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c registration.Config
if _, err := toml.Decode(`
enabled = true
url = "a.b.c"
token = "1234"
stats-interval = "1s"
`, &c); err != nil {
t.Fatal(err)
}
// Validate configuration.
if !c.Enabled {
t.Fatalf("unexpected enabled state: %v", c.Enabled)
} else if c.URL != "a.b.c" {
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
} else if c.Token != "1234" {
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
} else if time.Duration(c.StatsInterval) != time.Second {
t.Fatalf("unexpected stats interval: %v", c.StatsInterval)
}
}

View File

@ -0,0 +1,218 @@
package registration
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/influxdb/influxdb/monitor"
)
// Service represents the registration service.
type Service struct {
MetaStore interface {
ClusterID() (uint64, error)
NodeID() uint64
}
Monitor interface {
Statistics(tags map[string]string) ([]*monitor.Statistic, error)
RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
}
enabled bool
url *url.URL
token string
statsInterval time.Duration
version string
mu sync.Mutex
lastContact time.Time
wg sync.WaitGroup
done chan struct{}
logger *log.Logger
}
// NewService returns a configured registration service.
func NewService(c Config, version string) (*Service, error) {
url, err := url.Parse(c.URL)
if err != nil {
return nil, err
}
return &Service{
enabled: c.Enabled,
url: url,
token: c.Token,
statsInterval: time.Duration(c.StatsInterval),
version: version,
done: make(chan struct{}),
logger: log.New(os.Stderr, "[registration] ", log.LstdFlags),
}, nil
}
// Open starts the registration service.
func (s *Service) Open() error {
if !s.enabled {
return nil
}
s.logger.Println("Starting registration service")
if err := s.registerServer(); err != nil {
return err
}
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient("registration", s)
}
s.wg.Add(1)
go s.reportStats()
return nil
}
// Close stops the registration service.
func (s *Service) Close() error {
s.logger.Println("registration service terminating")
close(s.done)
s.wg.Wait()
return nil
}
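// Diagnostics returns basic diagnostics of the registration service.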
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
diagnostics := map[string]interface{}{
"URL": s.url.String(),
"token": s.token,
"last_contact": s.getLastContact().String(),
}
return monitor.DiagnosticFromMap(diagnostics), nil
}
// registerServer registers the server.
func (s *Service) registerServer() error {
if !s.enabled || s.token == "" {
return nil
}
clusterID, err := s.MetaStore.ClusterID()
if err != nil {
s.logger.Printf("failed to retrieve cluster ID for registration: %s", err.Error())
return err
}
hostname, err := os.Hostname()
if err != nil {
return err
}
j := map[string]interface{}{
"cluster_id": fmt.Sprintf("%d", clusterID),
"server_id": fmt.Sprintf("%d", s.MetaStore.NodeID()),
"host": hostname,
"product": "influxdb",
"version": s.version,
}
b, err := json.Marshal(j)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/v1/servers?token=%s", s.url.String(), s.token)
s.wg.Add(1)
go func() {
defer s.wg.Done()
client := http.Client{Timeout: 5 * time.Second}
resp, err := client.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
s.logger.Printf("failed to register server with %s: %s", s.url.String(), err.Error())
return
}
s.updateLastContact(time.Now().UTC())
defer resp.Body.Close()
if resp.StatusCode == http.StatusCreated {
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
s.logger.Printf("failed to read response from registration server: %s", err.Error())
return
}
s.logger.Printf("failed to register server with %s: received code %s, body: %s", s.url.String(), resp.Status, string(body))
}()
return nil
}
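For a hypothetical node, the request body POSTed to /api/v1/servers serializes to something like the line below — every value is illustrative, and cluster_id/server_id are rendered as strings exactly as the map above builds them:

{"cluster_id":"8128112931601001600","server_id":"1","host":"db01.example.com","product":"influxdb","version":"0.9.5-nightly"}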
func (s *Service) reportStats() {
defer s.wg.Done()
if s.token == "" {
// No stats reporting, for now, without a token.
return
}
statsURL := fmt.Sprintf("%s/api/v1/stats/influxdb?token=%s", s.url.String(), s.token)
clusterID, err := s.MetaStore.ClusterID()
if err != nil {
s.logger.Printf("failed to retrieve cluster ID for registration -- aborting stats upload: %s", err.Error())
return
}
t := time.NewTicker(s.statsInterval)
for {
select {
case <-t.C:
stats, err := s.Monitor.Statistics(nil)
if err != nil {
s.logger.Printf("failed to retrieve statistics: %s", err.Error())
continue
}
o := map[string]interface{}{
"cluster_id": fmt.Sprintf("%d", clusterID),
"server_id": fmt.Sprintf("%d", s.MetaStore.NodeID()),
"stats": stats,
}
b, err := json.Marshal(o)
if err != nil {
s.logger.Printf("failed to JSON-encode stats: %s", err.Error())
continue
}
client := http.Client{Timeout: 5 * time.Second}
resp, err := client.Post(statsURL, "application/json", bytes.NewBuffer(b))
if err != nil {
s.logger.Printf("failed to post statistics to %s: %s", statsURL, err.Error())
continue
}
s.updateLastContact(time.Now().UTC())
// Close the body before the next tick; a deferred close here would
// pile up until reportStats returns.
if resp.StatusCode != http.StatusOK {
s.logger.Printf("failed to post statistics to %s: response code: %d", statsURL, resp.StatusCode)
}
resp.Body.Close()
case <-s.done:
return
}
}
}
func (s *Service) updateLastContact(t time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastContact = t
}
func (s *Service) getLastContact() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastContact
}
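A minimal wiring sketch for the whole service, with stubs standing in for the real *meta.Store and *monitor.Monitor (the version string and token are made up; with an empty token both registration and stats upload are no-ops):

package main

import (
	"log"

	"github.com/influxdb/influxdb/monitor"
	"github.com/influxdb/influxdb/services/registration"
)

// fakeMeta satisfies the service's MetaStore interface.
type fakeMeta struct{}

func (fakeMeta) ClusterID() (uint64, error) { return 42, nil }
func (fakeMeta) NodeID() uint64             { return 1 }

// fakeMonitor satisfies the service's Monitor interface.
type fakeMonitor struct{}

func (fakeMonitor) Statistics(tags map[string]string) ([]*monitor.Statistic, error) {
	return nil, nil
}
func (fakeMonitor) RegisterDiagnosticsClient(name string, client monitor.DiagsClient) {}

func main() {
	c := registration.NewConfig()
	c.Token = "xxxx-sample-token" // made up; see registerServer above
	s, err := registration.NewService(c, "0.9.5-nightly")
	if err != nil {
		log.Fatal(err)
	}
	s.MetaStore = fakeMeta{}
	s.Monitor = fakeMonitor{}
	if err := s.Open(); err != nil {
		log.Fatal(err)
	}
	defer s.Close()
}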

View File

@ -29,7 +29,7 @@ type Service struct {
logger *log.Logger
}
// NewService returns a configure retention policy enforcement service.
// NewService returns a configured retention policy enforcement service.
func NewService(c Config) *Service {
return &Service{
checkInterval: time.Duration(c.CheckInterval),

View File

@ -89,7 +89,7 @@ type Config struct {
SSL bool `toml:"ssl"`
}
// NewSeries, takes a measurement, and point count,
// NewSeries takes a measurement, a point count,
// and a series count and returns a series
func NewSeries(m string, p int, sc int) series {
s := series{

View File

@ -8,6 +8,8 @@ import (
"github.com/influxdb/influxdb/client"
)
// QueryResults holds the total number of executed queries
// and the response time for each query
type QueryResults struct {
TotalQueries int
ResponseTimes ResponseTimes

View File

@ -80,13 +80,14 @@ type ResponseTime struct {
Time time.Time
}
// newResponseTime returns a new response time
// NewResponseTime returns a new response time
// with value `v` and time `time.Now()`.
func NewResponseTime(v int) ResponseTime {
r := ResponseTime{Value: v, Time: time.Now()}
return r
}
// ResponseTimes is a slice of response times
type ResponseTimes []ResponseTime
// Implements the `Len` method for the
@ -107,6 +108,7 @@ func (rs ResponseTimes) Swap(i, j int) {
rs[i], rs[j] = rs[j], rs[i]
}
// Measurements holds all measurement results of the stress test
type Measurements []string
// String returns a string and implements the `String` method for
@ -126,7 +128,7 @@ func (ms *Measurements) Set(value string) error {
return nil
}
// newClient returns a pointer to an InfluxDB client for
// NewClient returns a pointer to an InfluxDB client for
// a `Config`'s `Address` field. If an error is encountered
// when creating a new client, the function panics.
func (cfg *Config) NewClient() (*client.Client, error) {

View File

@ -1649,6 +1649,11 @@ func (e *Engine) readSeries() (map[string]*tsdb.Series, error) {
// has future encoded blocks so that this method can know how much of its values can be
// combined and output in the resulting encoded block.
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error) {
// No new values passed in, so nothing to combine. Just return the existing block.
if len(newValues) == 0 {
return newValues, block, nil
}
values, err := DecodeBlock(block)
if err != nil {
panic(fmt.Sprintf("failure decoding block: %v", err))

View File

@ -1013,7 +1013,81 @@ func TestEngine_WriteIntoCompactedFile(t *testing.T) {
}
if count := e.DataFileCount(); count != 1 {
t.Fatalf("execpted 1 data file but got %d", count)
t.Fatalf("expected 1 data file but got %d", count)
}
tx, _ := e.Begin(false)
defer tx.Rollback()
c := tx.Cursor("cpu,host=A", fields, nil, true)
k, _ := c.SeekTo(0)
if k != 1000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 2500000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 3000000000 {
t.Fatalf("wrong time: %d", k)
}
k, _ = c.Next()
if k != 4000000000 {
t.Fatalf("wrong time: %d", k)
}
}
func TestEngine_WriteIntoCompactedFile_MaxPointsPerBlockZero(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
fields := []string{"value"}
e.MaxPointsPerBlock = 4
e.RotateFileSize = 10
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
p4 := parsePoint("cpu,host=A value=1.5 4000000000")
p5 := parsePoint("cpu,host=A value=1.6 2500000000")
p6 := parsePoint("cpu,host=A value=1.7 5000000000")
p7 := parsePoint("cpu,host=A value=1.8 6000000000")
p8 := parsePoint("cpu,host=A value=1.9 7000000000")
if err := e.WritePoints([]models.Point{p1, p2}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p4}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p6, p7, p8}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if err := e.Compact(true); err != nil {
t.Fatalf("error compacting: %s", err.Error())
}
if err := e.WritePoints([]models.Point{p5}, nil, nil); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}
if count := e.DataFileCount(); count != 1 {
t.Fatalf("expected 1 data file but got %d", count)
}
tx, _ := e.Begin(false)
@ -1353,6 +1427,32 @@ func TestEngine_RewriteFileAndCompact(t *testing.T) {
}()
}
func TestEngine_DecodeAndCombine_NoNewValues(t *testing.T) {
var newValues tsm1.Values
e := OpenDefaultEngine()
defer e.Engine.Close()
values := make(tsm1.Values, 1)
values[0] = tsm1.NewValue(time.Unix(0, 0), float64(1))
block, err := values.Encode(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
remaining, encoded, err := e.DecodeAndCombine(newValues, block, nil, time.Unix(1, 0).UnixNano(), false)
if len(remaining) != 0 {
t.Fatalf("unexpected remaining values: exp %v, got %v", 0, len(remaining))
}
if len(encoded) != len(block) {
t.Fatalf("unexpected encoded block length: exp %v, got %v", len(block), len(encoded))
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
// Engine represents a test wrapper for tsm1.Engine.
type Engine struct {
*tsm1.Engine

View File

@ -29,7 +29,10 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
for _, v := range row.Values {
vals := make(map[string]interface{})
for fieldName, fieldIndex := range fieldIndexes {
vals[fieldName] = v[fieldIndex]
val := v[fieldIndex]
if val != nil {
vals[fieldName] = val
}
}
p := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))

View File

@ -739,6 +739,9 @@ func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectSt
// limitedRowWriter and ExecuteAggregate/Raw makes it ridiculously hard to make sure that the
// results will be the same as when queried normally.
measurement := intoMeasurement(selectstmt)
if measurement == "" {
measurement = row.Name
}
intodb, err := intoDB(selectstmt)
if err != nil {
return err
@ -748,14 +751,6 @@ func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectSt
if err != nil {
return err
}
for _, p := range points {
fields := p.Fields()
for _, v := range fields {
if v == nil {
return nil
}
}
}
req := &IntoWriteRequest{
Database: intodb,
RetentionPolicy: rp,

View File

@ -164,6 +164,16 @@ func (m *ShowMeasurementsMapper) Open() error {
// Start a goroutine to send the names over the channel as needed.
go func() {
for _, mm := range measurements {
// Filter measurements by WITH clause, if one was given.
if m.stmt.Source != nil {
s, ok := m.stmt.Source.(*influxql.Measurement)
if !ok ||
(s.Regex != nil && !s.Regex.Val.MatchString(mm.Name)) ||
(s.Name != "" && s.Name != mm.Name) {
continue
}
}
ch <- mm.Name
}
close(ch)
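The filter reads the statement's WITH clause: Source is an *influxql.Measurement whose Regex or Name must match each measurement. A hedged sketch of issuing such a statement through the vendored Go client — the address and database are assumptions:

package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/influxdb/influxdb/client"
)

func main() {
	u, _ := url.Parse("http://localhost:8086")
	c, err := client.NewClient(client.Config{URL: *u})
	if err != nil {
		log.Fatal(err)
	}
	q := client.Query{
		Command:  `SHOW MEASUREMENTS WITH MEASUREMENT =~ /cpu.*/`,
		Database: "mydb",
	}
	resp, err := c.Query(q)
	if err != nil {
		log.Fatal(err)
	}
	if resp.Error() != nil {
		log.Fatal(resp.Error())
	}
	fmt.Println(resp.Results)
}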