From f8c1e953d497499250189b7fc10e513ce87389d8 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 20 Aug 2015 14:26:44 -0600 Subject: [PATCH] godep update influxdb to 0.9.3-rc1 --- Godeps/Godeps.json | 24 +- .../influxdb/influxdb/influxql/ast.go | 2 + .../influxdb/influxdb/influxql/ast_test.go | 3 + .../influxdb/influxdb/influxql/result.go | 5 + .../influxdb/influxdb/tsdb/README.md | 22 +- .../influxdb/influxdb/tsdb/config.go | 44 +- .../influxdb/influxdb/tsdb/engine.go | 37 +- .../influxdb/influxdb/tsdb/engine/b1/b1.go | 2 +- .../influxdb/tsdb/engine/b1/b1_test.go | 2 +- .../influxdb/influxdb/tsdb/engine/bz1/bz1.go | 362 ++-- .../influxdb/tsdb/engine/bz1/bz1_test.go | 101 +- .../influxdb/influxdb/tsdb/engine/wal/wal.go | 1596 +++++++++++++++++ .../influxdb/tsdb/engine/wal/wal_test.go | 906 ++++++++++ .../influxdb/influxdb/tsdb/executor_test.go | 3 + .../influxdb/influxdb/tsdb/mapper_test.go | 55 +- .../github.com/influxdb/influxdb/tsdb/meta.go | 14 + .../influxdb/influxdb/tsdb/meta_test.go | 5 +- .../influxdb/tsdb/query_executor_test.go | 12 + .../influxdb/influxdb/tsdb/shard.go | 10 +- .../influxdb/influxdb/tsdb/shard_test.go | 23 +- .../influxdb/influxdb/tsdb/store.go | 9 +- .../influxdb/influxdb/tsdb/store_test.go | 63 + 22 files changed, 3060 insertions(+), 240 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal.go create mode 100644 Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index e17125352..9400bc682 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -92,33 +92,33 @@ }, { "ImportPath": "github.com/influxdb/influxdb/client", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/influxdb/influxdb/influxql", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/influxdb/influxdb/meta", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/influxdb/influxdb/snapshot", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/influxdb/influxdb/toml", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/influxdb/influxdb/tsdb", - "Comment": "v0.9.1-rc1-545-g8de66eb", - "Rev": "8de66eb37024cd6bd953662e5588253f0888874b" + "Comment": "v0.9.3-rc1", + "Rev": "f4077764b2bb2b03241452d88e9db321c62bb560" }, { "ImportPath": "github.com/lib/pq", diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go index 04bb66bbb..6235e405a 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast.go @@ -2469,6 +2469,8 @@ func timeExprValue(ref Expr, lit Expr) time.Time { return lit.Val case *DurationLiteral: return time.Unix(0, int64(lit.Val)).UTC() + case 
*NumberLiteral: + return time.Unix(0, int64(lit.Val)).UTC() } } return time.Time{} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast_test.go index 468b7b1f3..b58e7353b 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/ast_test.go @@ -483,6 +483,9 @@ func TestTimeRange(t *testing.T) { {expr: `'2000-01-01 00:00:00' < time`, min: `2000-01-01T00:00:00.000000001Z`, max: `0001-01-01T00:00:00Z`}, {expr: `'2000-01-01 00:00:00' <= time`, min: `2000-01-01T00:00:00Z`, max: `0001-01-01T00:00:00Z`}, + // number literal + {expr: `time < 10`, min: `0001-01-01T00:00:00Z`, max: `1970-01-01T00:00:00.000000009Z`}, + // Equality {expr: `time = '2000-01-01 00:00:00'`, min: `2000-01-01T00:00:00Z`, max: `2000-01-01T00:00:00Z`}, diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/result.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/result.go index a74ed714a..a9a8cd561 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/result.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/influxql/result.go @@ -31,6 +31,11 @@ type Row struct { Err error `json:"err,omitempty"` } +// SameSeries returns true if r contains values for the same series as o. +func (r *Row) SameSeries(o *Row) bool { + return r.tagsHash() == o.tagsHash() && r.Name == o.Name +} + // tagsHash returns a hash of tag key/value pairs. func (r *Row) tagsHash() uint64 { h := fnv.New64a() diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/README.md b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/README.md index a2229ee3b..112dc202c 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/README.md +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/README.md @@ -37,8 +37,8 @@ Fields are key-value metrics associated with the measurement. Every line must h Field keys are always strings and follow the same syntactical rules as described above for tag keys and values. Field values can be one of four types. The first value written for a given field on a given measurement defines the type of that field for all series under that measurement. -* _integer_ - Numeric values that do not include a decimal. (e.g. 1, 345, 2015, -10) -* _float_ - Numeric values that include a decimal. (e.g. 1.0, -3.14, 6.0+e5). Note that all values _must_ have a decimal even if the decimal value is zero (1 is an _integer_, 1.0 is a _float_). +* _integer_ - Numeric values that do not include a decimal and are followed by a trailing i when inserted (e.g. 1i, 345i, 2015i, -10i). Note that all values must have a trailing i. If they do not they will be written as floats. +* _float_ - Numeric values tha are not followed by a trailing i. (e.g. 1, 1.0, -3.14, 6.0+e5, 10). * _boolean_ - A value indicating true or false. Valid boolean strings are (t, T, true, TRUE, f, F, false, and FALSE). * _string_ - A text value. All string values _must_ be surrounded in double-quotes `"`. If the string contains a double-quote, it must be escaped with a backslash, e.g. `\"`. @@ -46,9 +46,15 @@ a double-quote, it must be escaped with a backslash, e.g. `\"`. 
``` # integer value -cpu value=1 +cpu value=1i + +cpu value=1.1i # will result in a parse error # float value +cpu_load value=1 + +cpu_load value=1.0 + cpu_load value=1.2 # boolean value @@ -58,7 +64,7 @@ error fatal=true event msg="logged out" # multiple values -cpu load=10.0,alert=true,reason="value above maximum threshold" +cpu load=10,alert=true,reason="value above maximum threshold" ``` ## Timestamp @@ -71,13 +77,13 @@ an integer epoch in microseconds, milliseconds, seconds, minutes or hours. ## Full Example A full example is shown below. ``` -cpu,host=server01,region=uswest value=1.0 1434055562000000000 -cpu,host=server02,region=uswest value=3.0 1434055562000010000 +cpu,host=server01,region=uswest value=1 1434055562000000000 +cpu,host=server02,region=uswest value=3 1434055562000010000 ``` In this example the first line shows a `measurement` of "cpu", there are two tags "host" and "region, the `value` is 1.0, and the `timestamp` is 1434055562000000000. Following this is a second line, also a point in the `measurement` "cpu" but belonging to a different "host". ``` -cpu,host=server\ 01,region=uswest value=1.0,msg="all systems nominal" -cpu,host=server\ 01,region=us\,west value_int=1 +cpu,host=server\ 01,region=uswest value=1,msg="all systems nominal" +cpu,host=server\ 01,region=us\,west value_int=1i ``` In these examples, the "host" is set to `server 01`. The field value associated with field key `msg` is double-quoted, as it is a string. The second example shows a region of `us,west` with the comma properly escaped. In the first example `value` is written as a floating point number. In the second, `value_int` is an integer. diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/config.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/config.go index a74caceec..dc3081442 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/config.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/config.go @@ -16,13 +16,48 @@ const ( // DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes. DefaultWALPartitionFlushDelay = 2 * time.Second + + // tsdb/engine/wal configuration options + + // DefaultReadySeriesSize of 32KB specifies when a series is eligible to be flushed + DefaultReadySeriesSize = 30 * 1024 + + // DefaultCompactionThreshold flush and compact a partition once this ratio of keys are over the flush size + DefaultCompactionThreshold = 0.5 + + // DefaultMaxSeriesSize specifies the size at which a series will be forced to flush + DefaultMaxSeriesSize = 1024 * 1024 + + // DefaultFlushColdInterval specifies how long after a partition has been cold + // for writes that a full flush and compaction are forced + DefaultFlushColdInterval = 5 * time.Minute + + // DefaultParititionSizeThreshold specifies when a partition gets to this size in + // memory, we should slow down writes until it gets a chance to compact. + // This will force clients to get backpressure if they're writing too fast. We need + // this because the WAL can take writes much faster than the index. So eventually + // we'll need to create backpressure, otherwise we'll fill up the memory and die. + // This number multiplied by the parition count is roughly the max possible memory + // size for the in-memory WAL cache. 
+ DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB ) type Config struct { - Dir string `toml:"dir"` + Dir string `toml:"dir"` + + // WAL config options for b1 (introduced in 0.9.2) MaxWALSize int `toml:"max-wal-size"` WALFlushInterval toml.Duration `toml:"wal-flush-interval"` WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"` + + // WAL configuration options for bz1 (introduced in 0.9.3) + WALDir string `toml:"wal-dir"` + WALEnableLogging bool `toml:"wal-enable-logging"` + WALReadySeriesSize int `toml:"wal-ready-series-size"` + WALCompactionThreshold float64 `toml:"wal-compaction-threshold"` + WALMaxSeriesSize int `toml:"wal-max-series-size"` + WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"` + WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"` } func NewConfig() Config { @@ -30,5 +65,12 @@ func NewConfig() Config { MaxWALSize: DefaultMaxWALSize, WALFlushInterval: toml.Duration(DefaultWALFlushInterval), WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay), + + WALEnableLogging: true, + WALReadySeriesSize: DefaultReadySeriesSize, + WALCompactionThreshold: DefaultCompactionThreshold, + WALMaxSeriesSize: DefaultMaxSeriesSize, + WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval), + WALPartitionSizeThreshold: DefaultPartitionSizeThreshold, } } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine.go index 65e8bb0da..67976786b 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine.go @@ -1,10 +1,12 @@ package tsdb import ( + "bytes" "errors" "fmt" "io" "os" + "sort" "time" "github.com/boltdb/bolt" @@ -16,7 +18,7 @@ var ( ) // DefaultEngine is the default engine used by the shard when initializing. -const DefaultEngine = "b1" +const DefaultEngine = "bz1" // Engine represents a swappable storage engine for the shard. type Engine interface { @@ -52,7 +54,7 @@ func RegisterEngine(name string, fn NewEngineFunc) { func NewEngine(path string, options EngineOptions) (Engine, error) { // Create a new engine if _, err := os.Stat(path); os.IsNotExist(err) { - return newEngineFuncs[DefaultEngine](path, options), nil + return newEngineFuncs[options.EngineVersion](path, options), nil } // Only bolt-based backends are currently supported so open it and check the format. @@ -96,17 +98,22 @@ func NewEngine(path string, options EngineOptions) (Engine, error) { // EngineOptions represents the options used to initialize the engine. type EngineOptions struct { + EngineVersion string MaxWALSize int WALFlushInterval time.Duration WALPartitionFlushDelay time.Duration + + Config Config } // NewEngineOptions returns the default options. func NewEngineOptions() EngineOptions { return EngineOptions{ + EngineVersion: DefaultEngine, MaxWALSize: DefaultMaxWALSize, WALFlushInterval: DefaultWALFlushInterval, WALPartitionFlushDelay: DefaultWALPartitionFlushDelay, + Config: NewConfig(), } } @@ -125,3 +132,29 @@ type Cursor interface { Seek(seek []byte) (key, value []byte) Next() (key, value []byte) } + +// DedupeEntries returns slices with unique keys (the first 8 bytes). +func DedupeEntries(a [][]byte) [][]byte { + // Convert to a map where the last slice is used. + m := make(map[string][]byte) + for _, b := range a { + m[string(b[0:8])] = b + } + + // Convert map back to a slice of byte slices. 
+ other := make([][]byte, 0, len(m)) + for _, v := range m { + other = append(other, v) + } + + // Sort entries. + sort.Sort(ByteSlices(other)) + + return other +} + +type ByteSlices [][]byte + +func (a ByteSlices) Len() int { return len(a) } +func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1.go index f0f7fbb18..f87146d58 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1.go @@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields meta = tx.Bucket([]byte("series")) c = meta.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - series := &tsdb.Series{} + series := tsdb.NewSeries("", nil) if err := series.UnmarshalBinary(v); err != nil { return err } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1_test.go index ee1009dbd..cd2d98445 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/b1/b1_test.go @@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) { mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)} mf.CreateFieldIfNotExists("value", influxql.Float) seriesToCreate := []*tsdb.SeriesCreate{ - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)}, } // Parse point. diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1.go index 6ec3f8c8c..1fe49d373 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1.go @@ -3,11 +3,13 @@ package bz1 import ( "bytes" "encoding/binary" + "encoding/json" "errors" "fmt" "io" "log" "math" + "path/filepath" "sort" "sync" "time" @@ -15,6 +17,7 @@ import ( "github.com/boltdb/bolt" "github.com/golang/snappy" "github.com/influxdb/influxdb/tsdb" + "github.com/influxdb/influxdb/tsdb/engine/wal" ) var ( @@ -22,8 +25,10 @@ var ( ErrSeriesExists = errors.New("series exists") ) -// Format is the file format name of this engine. -const Format = "bz1" +const ( + // Format is the file format name of this engine. + Format = "bz1" +) func init() { tsdb.RegisterEngine(Format, NewEngine) @@ -44,21 +49,44 @@ type Engine struct { db *bolt.DB // Write-ahead log storage. - PointsWriter interface { - WritePoints(points []tsdb.Point) error - } + WAL WAL // Size of uncompressed points to write to a block. BlockSize int } +// WAL represents a write ahead log that can be queried +type WAL interface { + WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error + LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error + DeleteSeries(keys []string) error + Cursor(key string) tsdb.Cursor + Open() error + Close() error +} + // NewEngine returns a new instance of Engine. 
func NewEngine(path string, opt tsdb.EngineOptions) tsdb.Engine { - return &Engine{ + // create the writer with a directory of the same name as the shard, but with the wal extension + w := wal.NewLog(filepath.Join(opt.Config.WALDir, filepath.Base(path))) + + w.ReadySeriesSize = opt.Config.WALReadySeriesSize + w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval) + w.MaxSeriesSize = opt.Config.WALMaxSeriesSize + w.CompactionThreshold = opt.Config.WALCompactionThreshold + w.PartitionSizeThreshold = opt.Config.WALPartitionSizeThreshold + w.ReadySeriesSize = opt.Config.WALReadySeriesSize + + e := &Engine{ path: path, BlockSize: DefaultBlockSize, + WAL: w, } + + w.Index = e + + return e } // Path returns the path the engine was opened with. @@ -79,8 +107,6 @@ func (e *Engine) Open() error { // Initialize data file. if err := e.db.Update(func(tx *bolt.Tx) error { - _, _ = tx.CreateBucketIfNotExists([]byte("series")) - _, _ = tx.CreateBucketIfNotExists([]byte("fields")) _, _ = tx.CreateBucketIfNotExists([]byte("points")) // Set file format, if not set yet. @@ -101,6 +127,7 @@ func (e *Engine) Open() error { e.close() return err } + return nil } @@ -108,7 +135,10 @@ func (e *Engine) Open() error { func (e *Engine) Close() error { e.mu.Lock() defer e.mu.Unlock() - return e.close() + if err := e.close(); err != nil { + return err + } + return e.WAL.Close() } func (e *Engine) close() error { @@ -123,16 +153,14 @@ func (e *Engine) SetLogOutput(w io.Writer) {} // LoadMetadataIndex loads the shard metadata into memory. func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { - return e.db.View(func(tx *bolt.Tx) error { + if err := e.db.View(func(tx *bolt.Tx) error { // Load measurement metadata - meta := tx.Bucket([]byte("fields")) - c := meta.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { + fields, err := e.readFields(tx) + if err != nil { + return err + } + for k, mf := range fields { m := index.CreateMeasurementIndexIfNotExists(string(k)) - mf := &tsdb.MeasurementFields{} - if err := mf.UnmarshalBinary(v); err != nil { - return err - } for name, _ := range mf.Fields { m.SetFieldName(name) } @@ -141,97 +169,59 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields } // Load series metadata - meta = tx.Bucket([]byte("series")) - c = meta.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - series := &tsdb.Series{} - if err := series.UnmarshalBinary(v); err != nil { - return err - } - index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(k)), series) - } - return nil - }) -} - -// WritePoints writes metadata and point data into the engine. -// Returns an error if new points are added to an existing key. -func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { - // Write series & field metadata. 
- if err := e.db.Update(func(tx *bolt.Tx) error { - if err := e.writeSeries(tx, seriesToCreate); err != nil { - return fmt.Errorf("write series: %s", err) - } - if err := e.writeFields(tx, measurementFieldsToSave); err != nil { - return fmt.Errorf("write fields: %s", err) + series, err := e.readSeries(tx) + if err != nil { + return err } + // Load the series into the in-memory index in sorted order to ensure + // it's always consistent for testing purposes + a := make([]string, 0, len(series)) + for k, _ := range series { + a = append(a, k) + } + sort.Strings(a) + for _, key := range a { + s := series[key] + s.InitializeShards() + index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(key)), s) + } return nil }); err != nil { return err } + // now flush the metadata that was in the WAL, but hand't yet been flushed + if err := e.WAL.LoadMetadataIndex(index, measurementFields); err != nil { + return err + } + + // finally open the WAL up + return e.WAL.Open() +} + +// WritePoints writes metadata and point data into the engine. +// Returns an error if new points are added to an existing key. +func (e *Engine) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { // Write points to the WAL. - if err := e.PointsWriter.WritePoints(points); err != nil { + if err := e.WAL.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil { return fmt.Errorf("write points: %s", err) } return nil } -// writeSeries writes a list of series to the metadata. -func (e *Engine) writeSeries(tx *bolt.Tx, a []*tsdb.SeriesCreate) error { - // Ignore if there are no series. - if len(a) == 0 { - return nil - } - - // Marshal and insert each series into the metadata. - b := tx.Bucket([]byte("series")) - for _, sc := range a { - // Marshal series into bytes. - data, err := sc.Series.MarshalBinary() - if err != nil { - return fmt.Errorf("marshal series: %s", err) - } - - // Insert marshaled data into appropriate key. - if err := b.Put([]byte(sc.Series.Key), data); err != nil { - return fmt.Errorf("put: %s", err) - } - } - - return nil -} - -// writeFields writes a list of measurement fields to the metadata. -func (e *Engine) writeFields(tx *bolt.Tx, m map[string]*tsdb.MeasurementFields) error { - // Ignore if there are no fields to save. - if len(m) == 0 { - return nil - } - - // Persist each measurement field in the map. - b := tx.Bucket([]byte("fields")) - for k, f := range m { - // Marshal field into bytes. - data, err := f.MarshalBinary() - if err != nil { - return fmt.Errorf("marshal measurement field: %s", err) - } - - // Insert marshaled data into key. - if err := b.Put([]byte(k), data); err != nil { - return fmt.Errorf("put: %s", err) - } - } - - return nil -} - // WriteIndex writes marshaled points to the engine's underlying index. -func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error { +func (e *Engine) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { return e.db.Update(func(tx *bolt.Tx) error { + // Write series & field metadata. 
+ if err := e.writeNewSeries(tx, seriesToCreate); err != nil { + return fmt.Errorf("write series: %s", err) + } + if err := e.writeNewFields(tx, measurementFieldsToSave); err != nil { + return fmt.Errorf("write fields: %s", err) + } + for key, values := range pointsByKey { if err := e.writeIndex(tx, key, values); err != nil { return fmt.Errorf("write: key=%x, err=%s", key, err) @@ -241,6 +231,103 @@ func (e *Engine) WriteIndex(pointsByKey map[string][][]byte) error { }) } +func (e *Engine) writeNewFields(tx *bolt.Tx, measurementFieldsToSave map[string]*tsdb.MeasurementFields) error { + if len(measurementFieldsToSave) == 0 { + return nil + } + + // read in all the previously saved fields + fields, err := e.readFields(tx) + if err != nil { + return err + } + + // add the new ones or overwrite old ones + for name, mf := range measurementFieldsToSave { + fields[name] = mf + } + + return e.writeFields(tx, fields) +} + +func (e *Engine) writeFields(tx *bolt.Tx, fields map[string]*tsdb.MeasurementFields) error { + // compress and save everything + data, err := json.Marshal(fields) + if err != nil { + return err + } + + return tx.Bucket([]byte("meta")).Put([]byte("fields"), snappy.Encode(nil, data)) +} + +func (e *Engine) readFields(tx *bolt.Tx) (map[string]*tsdb.MeasurementFields, error) { + fields := make(map[string]*tsdb.MeasurementFields) + + b := tx.Bucket([]byte("meta")).Get([]byte("fields")) + if b == nil { + return fields, nil + } + + data, err := snappy.Decode(nil, b) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(data, &fields); err != nil { + return nil, err + } + + return fields, nil +} + +func (e *Engine) writeNewSeries(tx *bolt.Tx, seriesToCreate []*tsdb.SeriesCreate) error { + if len(seriesToCreate) == 0 { + return nil + } + + // read in previously saved series + series, err := e.readSeries(tx) + if err != nil { + return err + } + + // add new ones, compress and save + for _, s := range seriesToCreate { + series[s.Series.Key] = s.Series + } + + return e.writeSeries(tx, series) +} + +func (e *Engine) writeSeries(tx *bolt.Tx, series map[string]*tsdb.Series) error { + data, err := json.Marshal(series) + if err != nil { + return err + } + + return tx.Bucket([]byte("meta")).Put([]byte("series"), snappy.Encode(nil, data)) +} + +func (e *Engine) readSeries(tx *bolt.Tx) (map[string]*tsdb.Series, error) { + series := make(map[string]*tsdb.Series) + + b := tx.Bucket([]byte("meta")).Get([]byte("series")) + if b == nil { + return series, nil + } + + data, err := snappy.Decode(nil, b) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(data, &series); err != nil { + return nil, err + } + + return series, nil +} + // writeIndex writes a set of points for a single key. func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { // Ignore if there are no points. @@ -256,8 +343,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { c := bkt.Cursor() // Ensure the slice is sorted before retrieving the time range. - a = DedupeEntries(a) - sort.Sort(byteSlices(a)) + a = tsdb.DedupeEntries(a) + + // Convert the raw time and byte slices to entries with lengths + for i, p := range a { + timestamp := int64(btou64(p[0:8])) + a[i] = MarshalEntry(timestamp, p[8:]) + } // Determine time range of new data. 
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8])) @@ -270,6 +362,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { if err := e.writeBlocks(bkt, a); err != nil { return fmt.Errorf("append blocks: %s", err) } + return nil } // Generate map of inserted keys. @@ -311,7 +404,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { // Merge entries before rewriting. a = append(existing, a...) - sort.Sort(byteSlices(a)) + sort.Sort(tsdb.ByteSlices(a)) // Rewrite points to new blocks. if err := e.writeBlocks(bkt, a); err != nil { @@ -325,9 +418,6 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error { var block []byte - // Dedupe points by key. - a = DedupeEntries(a) - // Group points into blocks by size. tmin, tmax := int64(math.MaxInt64), int64(math.MinInt64) for i, p := range a { @@ -367,36 +457,56 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error { // DeleteSeries deletes the series from the engine. func (e *Engine) DeleteSeries(keys []string) error { + // remove it from the WAL first + if err := e.WAL.DeleteSeries(keys); err != nil { + return err + } + return e.db.Update(func(tx *bolt.Tx) error { + series, err := e.readSeries(tx) + if err != nil { + return err + } for _, k := range keys { - if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil { - return fmt.Errorf("delete series metadata: %s", err) - } + delete(series, k) if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound { return fmt.Errorf("delete series data: %s", err) } } - return nil + + return e.writeSeries(tx, series) }) } // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { + // remove from the WAL first so it won't get flushed after removing from Bolt + if err := e.WAL.DeleteSeries(seriesKeys); err != nil { + return err + } + return e.db.Update(func(tx *bolt.Tx) error { - if err := tx.Bucket([]byte("fields")).Delete([]byte(name)); err != nil { + fields, err := e.readFields(tx) + if err != nil { + return err + } + delete(fields, name) + if err := e.writeFields(tx, fields); err != nil { return err } + series, err := e.readSeries(tx) + if err != nil { + return err + } for _, k := range seriesKeys { - if err := tx.Bucket([]byte("series")).Delete([]byte(k)); err != nil { - return fmt.Errorf("delete series metadata: %s", err) - } + delete(series, k) if err := tx.Bucket([]byte("points")).DeleteBucket([]byte(k)); err != nil && err != bolt.ErrBucketNotFound { return fmt.Errorf("delete series data: %s", err) } } - return nil + return e.writeSeries(tx, series) }) } @@ -418,7 +528,7 @@ func (e *Engine) Begin(writable bool) (tsdb.Tx, error) { if err != nil { return nil, err } - return &Tx{Tx: tx, engine: e}, nil + return &Tx{Tx: tx, engine: e, wal: e.WAL}, nil } // Stats returns internal statistics for the engine. @@ -439,19 +549,25 @@ type Stats struct { type Tx struct { *bolt.Tx engine *Engine + wal WAL } // Cursor returns an iterator for a key. func (tx *Tx) Cursor(key string) tsdb.Cursor { + walCursor := tx.wal.Cursor(key) + // Retrieve points bucket. Ignore if there is no bucket. 
b := tx.Bucket([]byte("points")).Bucket([]byte(key)) if b == nil { - return nil + return walCursor } - return &Cursor{ + + c := &Cursor{ cursor: b.Cursor(), buf: make([]byte, DefaultBlockSize), } + + return tsdb.MultiCursor(walCursor, c) } // Cursor provides ordered iteration across a series. @@ -584,26 +700,6 @@ func SplitEntries(b []byte) [][]byte { } } -// DedupeEntries returns slices with unique keys (the first 8 bytes). -func DedupeEntries(a [][]byte) [][]byte { - // Convert to a map where the last slice is used. - m := make(map[string][]byte) - for _, b := range a { - m[string(b[0:8])] = b - } - - // Convert map back to a slice of byte slices. - other := make([][]byte, 0, len(m)) - for _, v := range m { - other = append(other, v) - } - - // Sort entries. - sort.Sort(byteSlices(other)) - - return other -} - // entryHeaderSize is the number of bytes required for the header. const entryHeaderSize = 8 + 4 @@ -619,9 +715,3 @@ func u64tob(v uint64) []byte { // btou64 converts an 8-byte slice into an uint64. func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } - -type byteSlices [][]byte - -func (a byteSlices) Len() int { return len(a) } -func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1_test.go index 4354a3ee8..af78374ea 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/bz1/bz1_test.go @@ -23,15 +23,16 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - // Setup nop mock. - e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil } + // Setup mock that writes the index + seriesToCreate := []*tsdb.SeriesCreate{ + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})}, + {Series: tsdb.NewSeries("series with spaces", nil)}, + } + e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, nil, seriesToCreate) } // Write series metadata. - if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{ - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}}, - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}}, - {Series: &tsdb.Series{Key: "series with spaces"}}, - }); err != nil { + if err := e.WritePoints(nil, nil, seriesToCreate); err != nil { t.Fatal(err) } @@ -62,17 +63,18 @@ func TestEngine_LoadMetadataIndex_Fields(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - // Setup nop mock. - e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return nil } - - // Write series metadata. 
- if err := e.WritePoints(nil, map[string]*tsdb.MeasurementFields{ + // Setup mock that writes the index + fields := map[string]*tsdb.MeasurementFields{ "cpu": &tsdb.MeasurementFields{ Fields: map[string]*tsdb.Field{ "value": &tsdb.Field{ID: 0, Name: "value"}, }, }, - }, nil); err != nil { + } + e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return e.WriteIndex(nil, fields, nil) } + + // Write series metadata. + if err := e.WritePoints(nil, fields, nil); err != nil { t.Fatal(err) } @@ -144,13 +146,13 @@ func TestEngine_WriteIndex_Append(t *testing.T) { // Append points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(1, []byte{0x10}), - bz1.MarshalEntry(2, []byte{0x20}), + append(u64tob(1), 0x10), + append(u64tob(2), 0x20), }, "mem": [][]byte{ - bz1.MarshalEntry(0, []byte{0x30}), + append(u64tob(0), 0x30), }, - }); err != nil { + }, nil, nil); err != nil { t.Fatal(err) } @@ -185,32 +187,32 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { // Write initial points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(10, []byte{0x10}), - bz1.MarshalEntry(20, []byte{0x20}), - bz1.MarshalEntry(30, []byte{0x30}), + append(u64tob(10), 0x10), + append(u64tob(20), 0x20), + append(u64tob(30), 0x30), }, - }); err != nil { + }, nil, nil); err != nil { t.Fatal(err) } // Write overlapping points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(9, []byte{0x09}), - bz1.MarshalEntry(10, []byte{0xFF}), - bz1.MarshalEntry(25, []byte{0x25}), - bz1.MarshalEntry(31, []byte{0x31}), + append(u64tob(9), 0x09), + append(u64tob(10), 0xFF), + append(u64tob(25), 0x25), + append(u64tob(31), 0x31), }, - }); err != nil { + }, nil, nil); err != nil { t.Fatal(err) } // Write overlapping points to index again. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(31, []byte{0xFF}), + append(u64tob(31), 0xFF), }, - }); err != nil { + }, nil, nil); err != nil { t.Fatal(err) } @@ -239,7 +241,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { func TestEngine_WriteIndex_NoKeys(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - if err := e.WriteIndex(nil); err != nil { + if err := e.WriteIndex(nil, nil, nil); err != nil { t.Fatal(err) } } @@ -248,7 +250,7 @@ func TestEngine_WriteIndex_NoKeys(t *testing.T) { func TestEngine_WriteIndex_NoPoints(t *testing.T) { e := OpenDefaultEngine() defer e.Close() - if err := e.WriteIndex(map[string][][]byte{"cpu": nil}); err != nil { + if err := e.WriteIndex(map[string][][]byte{"cpu": nil}, nil, nil); err != nil { t.Fatal(err) } } @@ -266,7 +268,7 @@ func TestEngine_WriteIndex_Quick(t *testing.T) { // Write points to index in multiple sets. for _, set := range sets { - if err := e.WriteIndex(map[string][][]byte(set)); err != nil { + if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil { t.Fatal(err) } } @@ -291,15 +293,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) { got = append(got, append(copyBytes(k), v...)) } - // Generate expected values. - // We need to remove the data length from the slice. 
- var exp [][]byte - for _, b := range points[key] { - exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len - } - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp) + if !reflect.DeepEqual(got, points[key]) { + t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key]) } } @@ -324,7 +319,7 @@ func NewEngine(opt tsdb.EngineOptions) *Engine { e := &Engine{ Engine: bz1.NewEngine(f.Name(), opt).(*bz1.Engine), } - e.Engine.PointsWriter = &e.PointsWriter + e.Engine.WAL = &e.PointsWriter return e } @@ -361,10 +356,30 @@ type EnginePointsWriter struct { WritePointsFn func(points []tsdb.Point) error } -func (w *EnginePointsWriter) WritePoints(points []tsdb.Point) error { +func (w *EnginePointsWriter) WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { return w.WritePointsFn(points) } +func (w *EnginePointsWriter) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { + return nil +} + +func (w *EnginePointsWriter) DeleteSeries(keys []string) error { return nil } + +func (w *EnginePointsWriter) Open() error { return nil } + +func (w *EnginePointsWriter) Close() error { return nil } + +func (w *EnginePointsWriter) Cursor(key string) tsdb.Cursor { return &Cursor{} } + +// Cursor represents a mock that implements tsdb.Curosr. +type Cursor struct { +} + +func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil } + +func (c *Cursor) Next() ([]byte, []byte) { return nil, nil } + // Points represents a set of encoded points by key. Implements quick.Generator. type Points map[string][][]byte @@ -411,7 +426,7 @@ func MergePoints(a []Points) Points { // Dedupe points. for key, values := range m { - m[key] = bz1.DedupeEntries(values) + m[key] = tsdb.DedupeEntries(values) } return m diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal.go new file mode 100644 index 000000000..da72fb0ea --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal.go @@ -0,0 +1,1596 @@ +/* +Package WAL implements a write ahead log optimized for write throughput +that can be put in front of the database index. + +The WAL is broken into different partitions. The default number of +partitions is 5. Each partition consists of a number of segment files. +By default these files will get up to 2MB in size before a new segment +file is opened. The files are numbered and start at 1. The number +indicates the order in which the files should be read on startup to +ensure data is recovered in the same order it was written. + +Partitions are flushed and compacted individually. One of the goals with +having multiple partitions was to be able to flush only a portion of the +WAL at a time. + +The WAL does not flush everything in a partition when it comes time. It will +only flush series that are over a given threshold (32kb by default). The rest +will be written into a new segment file so they can be flushed later. This +is like a compaction in an LSM Tree. 
+*/ +package wal + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "io" + "log" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/golang/snappy" + "github.com/influxdb/influxdb/tsdb" +) + +const ( + // DefaultSegmentSize of 2MB is the size at which segment files will be rolled over + DefaultSegmentSize = 2 * 1024 * 1024 + + // PartitionCount is the number of partitions in the WAL + PartitionCount = 5 + + // FileExtension is the file extension we expect for wal segments + FileExtension = "wal" + + // MetaFileExtension is the file extension for the log files of new fields and measurements that get created + MetaFileExtension = "meta" + + // CompactionExtension is the file extension we expect for compaction files + CompactionExtension = "CPT" + + // MetaFlushInterval is the period after which any compressed meta data in the .meta file will get + // flushed to the index + MetaFlushInterval = 10 * time.Minute + + // defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria + defaultFlushCheckInterval = time.Second +) + +// flushType indiciates why a flush and compaction are being run so the partition can +// do the appropriate type of compaction +type flushType int + +const ( + // noFlush indicates that no flush or compaction are necesssary at this time + noFlush flushType = iota + // memoryFlush indicates that we should look for the series using the most + // memory to flush out and compact all others + memoryFlush + // idleFlush indicates that we should flush all series in the parition, + // delete all segment files and hold off on opening a new one + idleFlush + // thresholdFlush indicates that we should flush all series over the ReadySize + // and compact all other series + thresholdFlush + // deleteFlush indicates that we're flushing because series need to be removed from the WAL + deleteFlush +) + +var ( + // ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one + ErrCompactionRunning = errors.New("compaction running") + + // ErrMemoryCompactionDone gets returned if we called to flushAndCompact to free up memory + // but a compaction has already been done to do so + ErrMemoryCompactionDone = errors.New("compaction already run to free up memory") + + // CompactSequence is the byte sequence within a segment file that has been compacted + // that indicates the start of a compaction marker + CompactSequence = []byte{0xFF, 0xFF} +) + +type Log struct { + path string + + flush chan int // signals a background flush on the given partition + flushCheckTimer *time.Timer // check this often to see if a background flush should happen + flushCheckInterval time.Duration + + // These coordinate closing and waiting for running goroutines. + wg sync.WaitGroup + closing chan struct{} + + // LogOutput is the writer used by the logger. + LogOutput io.Writer + logger *log.Logger + + mu sync.RWMutex + partitions map[uint8]*Partition + + // metaFile is the file that compressed metadata like series and fields are written to + metaFile *os.File + + // FlushColdInterval is the period of time after which a partition will do a + // full flush and compaction if it has been cold for writes. + FlushColdInterval time.Duration + + // SegmentSize is the file size at which a segment file will be rotated in a partition. 
+ SegmentSize int64 + + // MaxSeriesSize controls when a partition should get flushed to index and compacted + // if any series in the partition has exceeded this size threshold + MaxSeriesSize int + + // ReadySeriesSize is the minimum size a series of points must get to before getting flushed. + ReadySeriesSize int + + // CompactionThreshold controls when a parition will be flushed. Once this + // percentage of series in a partition are ready, a flush and compaction will be triggered. + CompactionThreshold float64 + + // PartitionSizeThreshold specifies when a partition should be forced to be flushed. + PartitionSizeThreshold uint64 + + // partitionCount is the number of separate partitions to create for the WAL. + // Compactions happen per partition. So this number will affect what percentage + // of the WAL gets compacted at a time. For instance, a setting of 10 means + // we generally will be compacting about 10% of the WAL at a time. + partitionCount uint64 + + // Index is the database that series data gets flushed to once it gets compacted + // out of the WAL. + Index IndexWriter + + // EnableLogging specifies if detailed logs should be output + EnableLogging bool +} + +// IndexWriter is an interface for the indexed database the WAL flushes data to +type IndexWriter interface { + // time ascending points where each byte array is: + // int64 time + // data + WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error +} + +func NewLog(path string) *Log { + return &Log{ + path: path, + flush: make(chan int, 1), + + // these options should be overriden by any options in the config + LogOutput: os.Stderr, + FlushColdInterval: tsdb.DefaultFlushColdInterval, + SegmentSize: DefaultSegmentSize, + MaxSeriesSize: tsdb.DefaultMaxSeriesSize, + CompactionThreshold: tsdb.DefaultCompactionThreshold, + PartitionSizeThreshold: tsdb.DefaultPartitionSizeThreshold, + ReadySeriesSize: tsdb.DefaultReadySeriesSize, + partitionCount: PartitionCount, + flushCheckInterval: defaultFlushCheckInterval, + } +} + +// Open opens and initializes the Log. Will recover from previous unclosed shutdowns +func (l *Log) Open() error { + if err := os.MkdirAll(l.path, 0777); err != nil { + return err + } + + // open the metafile for writing + if err := l.nextMetaFile(); err != nil { + return err + } + + // open the partitions + l.partitions = make(map[uint8]*Partition) + for i := uint64(1); i <= l.partitionCount; i++ { + p, err := NewPartition(uint8(i), l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index) + if err != nil { + return err + } + p.log = l + l.partitions[uint8(i)] = p + } + if err := l.openPartitionFiles(); err != nil { + return err + } + + l.logger = log.New(l.LogOutput, "[wal] ", log.LstdFlags) + + l.flushCheckTimer = time.NewTimer(l.flushCheckInterval) + + // Start background goroutines. 
+ l.wg.Add(1) + l.closing = make(chan struct{}) + go l.autoflusher(l.closing) + + return nil +} + +// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given +func (l *Log) Cursor(key string) tsdb.Cursor { + l.mu.RLock() + defer l.mu.RUnlock() + + return l.partition([]byte(key)).cursor(key) +} + +func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { + partitionsToWrite := l.pointsToPartitions(points) + + if err := l.writeSeriesAndFields(fields, series); err != nil { + l.logger.Println("error writing series and fields: ", err.Error()) + return err + } + + // get it to disk + l.mu.RLock() + defer l.mu.RUnlock() + + for p, points := range partitionsToWrite { + if err := p.Write(points); err != nil { + return err + } + } + + return nil +} + +// Flush will force a flush on all paritions +func (l *Log) Flush() error { + l.mu.RLock() + defer l.mu.RUnlock() + + for _, p := range l.partitions { + if err := p.flushAndCompact(idleFlush); err != nil { + return err + } + } + + return nil +} + +// LoadMetadatIndex loads the new series and fields files into memory and flushes them to the BoltDB index. This function +// should be called before making a call to Open() +func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { + metaFiles, err := l.metadataFiles() + if err != nil { + return err + } + + measurementFieldsToSave := make(map[string]*tsdb.MeasurementFields) + seriesToCreate := make([]*tsdb.SeriesCreate, 0) + + // read all the metafiles off disk + for _, fn := range metaFiles { + a, err := l.readMetadataFile(fn) + if err != nil { + return err + } + + // loop through the seriesAndFields and add them to the index and the collection to be written to the index + for _, sf := range a { + for k, mf := range sf.Fields { + measurementFieldsToSave[k] = mf + + m := index.CreateMeasurementIndexIfNotExists(string(k)) + for name, _ := range mf.Fields { + m.SetFieldName(name) + } + mf.Codec = tsdb.NewFieldCodec(mf.Fields) + measurementFields[m.Name] = mf + } + + for _, sc := range sf.Series { + seriesToCreate = append(seriesToCreate, sc) + + sc.Series.InitializeShards() + index.CreateSeriesIndexIfNotExists(tsdb.MeasurementFromSeriesKey(string(sc.Series.Key)), sc.Series) + } + } + } + + if err := l.Index.WriteIndex(nil, measurementFieldsToSave, seriesToCreate); err != nil { + return err + } + + // now remove all the old metafiles + for _, fn := range metaFiles { + if err := os.Remove(fn); err != nil { + return err + } + } + + return nil +} + +// DeleteSeries will flush the metadata that is in the WAL to the index and remove +// all series specified from the cache and the segment files in each partition. This +// will block all writes while a compaction is done against all partitions. This function +// is meant to be called by bz1 BEFORE it updates its own index, since the metadata +// is flushed here first. +func (l *Log) DeleteSeries(keys []string) error { + // we want to stop any writes from happening to ensure the data gets cleared + l.mu.Lock() + defer l.mu.Unlock() + + if err := l.flushMetadata(); err != nil { + return err + } + + for _, p := range l.partitions { + p.deleteSeries(keys) + } + + return nil +} + +// readMetadataFile will read the entire contents of the meta file and return a slice of the +// seriesAndFields objects that were written in. It ignores file errors since those can't be +// recovered. 
+func (l *Log) readMetadataFile(fileName string) ([]*seriesAndFields, error) { + f, err := os.OpenFile(fileName, os.O_RDWR, 0666) + if err != nil { + return nil, err + } + + a := make([]*seriesAndFields, 0) + + length := make([]byte, 8) + for { + // get the length of the compressed seriesAndFields blob + _, err := f.Read(length) + if err == io.EOF { + break + } else if err != nil { + f.Close() + return nil, err + } + + dataLength := btou64(length) + if dataLength == 0 { + break + } + + // read in the compressed block and decod it + b := make([]byte, dataLength) + + _, err = f.Read(b) + if err == io.EOF { + break + } else if err != nil { + // print the error and move on since we can't recover the file + l.logger.Println("error reading lenght of metadata: ", err.Error()) + break + } + + buf, err := snappy.Decode(nil, b) + if err != nil { + // print the error and move on since we can't recover the file + l.logger.Println("error reading compressed metadata info: ", err.Error()) + break + } + + sf := &seriesAndFields{} + if err := json.Unmarshal(buf, sf); err != nil { + // print the error and move on since we can't recover the file + l.logger.Println("error unmarshaling json for new series and fields: ", err.Error()) + break + } + + a = append(a, sf) + } + + if err := f.Close(); err != nil { + return nil, err + } + + return a, nil +} + +// writeSeriesAndFields will write the compressed fields and series to the meta file. This file persists the data +// in case the server gets shutdown before the WAL has a chance to flush everything to the cache. By default this +// file is flushed on start when bz1 calls LoadMetaDataIndex +func (l *Log) writeSeriesAndFields(fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { + if len(fields) == 0 && len(series) == 0 { + return nil + } + + sf := &seriesAndFields{Fields: fields, Series: series} + b, err := json.Marshal(sf) + if err != nil { + return err + } + cb := snappy.Encode(nil, b) + + l.mu.Lock() + defer l.mu.Unlock() + + if _, err := l.metaFile.Write(u64tob(uint64(len(cb)))); err != nil { + return err + } + + if _, err := l.metaFile.Write(cb); err != nil { + return err + } + + return l.metaFile.Sync() +} + +// nextMetaFile will close the current file if there is one open and open a new file to log +// metadata updates to. This function assumes that you've locked l.mu elsewhere. 
+func (l *Log) nextMetaFile() error { + if l.metaFile != nil { + if err := l.metaFile.Close(); err != nil { + return err + } + } + + metaFiles, err := l.metadataFiles() + if err != nil { + return err + } + + id := 0 + if len(metaFiles) > 0 { + num := strings.Split(filepath.Base(metaFiles[len(metaFiles)-1]), ".")[0] + n, err := strconv.ParseInt(num, 10, 32) + + if err != nil { + return err + } + + id = int(n) + } + + nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension)) + l.metaFile, err = os.OpenFile(nextFileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + + return nil +} + +// metadataFiles returns the files in the WAL directory with the MetaFileExtension +func (l *Log) metadataFiles() ([]string, error) { + path := filepath.Join(l.path, fmt.Sprintf("*.%s", MetaFileExtension)) + + a, err := filepath.Glob(path) + if err != nil { + return nil, err + } + + sort.Strings(a) + + return a, nil +} + +// pointsToPartitions returns a map that organizes the points into the partitions they should be mapped to +func (l *Log) pointsToPartitions(points []tsdb.Point) map[*Partition][]tsdb.Point { + m := make(map[*Partition][]tsdb.Point) + for _, p := range points { + pp := l.partition(p.Key()) + m[pp] = append(m[pp], p) + } + return m +} + +// openPartitionFiles will open all partitions and read their segment files +func (l *Log) openPartitionFiles() error { + results := make(chan error, len(l.partitions)) + for _, p := range l.partitions { + + go func(p *Partition) { + // Recover from a partial compaction. + if err := p.recoverCompactionFile(); err != nil { + results <- fmt.Errorf("recover compaction files: %s", err) + return + } + + fileNames, err := p.segmentFileNames() + if err != nil { + results <- err + return + } + for _, n := range fileNames { + entries, err := p.readFile(n) + if err != nil { + results <- err + return + } + for _, e := range entries { + p.addToCache(e.key, e.data, e.timestamp) + } + } + results <- nil + }(p) + } + + for i := 0; i < len(l.partitions); i++ { + err := <-results + if err != nil { + return err + } + } + + return nil +} + +// Close will finish any flush that is currently in process and close file handles +func (l *Log) Close() error { + // stop the autoflushing process so it doesn't try to kick another one off + if l.closing != nil { + close(l.closing) + l.closing = nil + } + + l.wg.Wait() + + l.mu.Lock() + defer l.mu.Unlock() + + // clear the cache + l.partitions = nil + + return l.close() +} + +// close all the open Log partitions and file handles +func (l *Log) close() error { + for _, p := range l.partitions { + if err := p.Close(); err != nil { + return err + } + } + + if err := l.metaFile.Close(); err != nil { + return err + } + + l.metaFile = nil + return nil +} + +// triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction +func (l *Log) triggerAutoFlush() { + l.mu.RLock() + defer l.mu.RUnlock() + for _, p := range l.partitions { + if f := p.shouldFlush(l.MaxSeriesSize, l.CompactionThreshold); f != noFlush { + if err := p.flushAndCompact(f); err != nil { + l.logger.Printf("error flushing partition %d: %s\n", p.id, err) + } + } + } +} + +// autoflusher waits for notification of a flush and kicks it off in the background. +// This method runs in a separate goroutine. +func (l *Log) autoflusher(closing chan struct{}) { + defer l.wg.Done() + + metaFlushTicker := time.NewTicker(MetaFlushInterval) + + for { + // Wait for close or flush signal. 
+ select { + case <-closing: + metaFlushTicker.Stop() + return + case <-l.flushCheckTimer.C: + l.triggerAutoFlush() + l.flushCheckTimer.Reset(l.flushCheckInterval) + case <-l.flush: + if err := l.Flush(); err != nil { + l.logger.Printf("flush error: %s", err) + } + case <-metaFlushTicker.C: + if err := l.flushMetadata(); err != nil { + l.logger.Printf("metadata flush error: %s", err.Error()) + } + } + } +} + +// flushMetadata will write start a new metafile for writes to go through and then flush all +// metadata from previous files to the index. After a sucessful write, the metadata files +// will be removed. While the flush to index is happening we aren't blocked for new metadata writes. +func (l *Log) flushMetadata() error { + files, err := l.metadataFiles() + if err != nil { + return err + } + + if err := l.nextMetaFile(); err != nil { + return err + } + + measurements := make(map[string]*tsdb.MeasurementFields) + series := make([]*tsdb.SeriesCreate, 0) + + // read all the measurement fields and series from the metafiles + for _, fn := range files { + a, err := l.readMetadataFile(fn) + if err != nil { + return err + } + + for _, sf := range a { + for k, mf := range sf.Fields { + measurements[k] = mf + } + + series = append(series, sf.Series...) + } + } + + // write them to the index + if err := l.Index.WriteIndex(nil, measurements, series); err != nil { + return err + } + + // remove the old files now that we've persisted them elsewhere + for _, fn := range files { + if err := os.Remove(fn); err != nil { + return err + } + } + + return nil +} + +// walPartition returns the partition number that key belongs to. +func (l *Log) partition(key []byte) *Partition { + h := fnv.New64a() + h.Write(key) + id := uint8(h.Sum64()%l.partitionCount + 1) + p := l.partitions[id] + if p == nil { + p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index) + if err != nil { + panic(err) + } + p.log = l + l.partitions[id] = p + } + return p +} + +// Partition is a set of files for a partition of the WAL. We use multiple partitions so when compactions occur +// only a portion of the WAL must be flushed and compacted +type Partition struct { + id uint8 + path string + mu sync.Mutex + currentSegmentFile *os.File + currentSegmentSize int64 + currentSegmentID uint32 + lastFileID uint32 + maxSegmentSize int64 + cache map[string]*cacheEntry + + index IndexWriter + readySeriesSize int + + // memorySize is the rough size in memory of all the cached series data + memorySize uint64 + + // sizeThreshold is the memory size after which writes start getting throttled + sizeThreshold uint64 + + // backoffCount is the number of times write has been called while memory is + // over the threshold. It's used to gradually increase write times to put + // backpressure on clients. + backoffCount int + + // flushCache is a temporary placeholder to keep data while its being flushed + // and compacted. It's for cursors to combine the cache and this if a flush is occuring + flushCache map[string][][]byte + compactionRunning bool + + // flushColdInterval and lastWriteTime are used to determin if a partition should + // be flushed because it has been idle for writes. 
+ flushColdInterval time.Duration + lastWriteTime time.Time + + log *Log + + // Used for mocking OS calls + os struct { + OpenCompactionFile func(name string, flag int, perm os.FileMode) (file *os.File, err error) + OpenSegmentFile func(name string, flag int, perm os.FileMode) (file *os.File, err error) + Rename func(oldpath, newpath string) error + } +} + +func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) { + p := &Partition{ + id: id, + path: path, + maxSegmentSize: segmentSize, + sizeThreshold: sizeThreshold, + lastWriteTime: time.Now(), + cache: make(map[string]*cacheEntry), + readySeriesSize: readySeriesSize, + index: index, + flushColdInterval: flushColdInterval, + } + + p.os.OpenCompactionFile = os.OpenFile + p.os.OpenSegmentFile = os.OpenFile + p.os.Rename = os.Rename + + return p, nil +} + +// Close resets the caches and closes the currently open segment file +func (p *Partition) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + p.cache = nil + if err := p.currentSegmentFile.Close(); err != nil { + return err + } + + return nil +} + +// Write will write a compressed block of the points to the current segment file. If the segment +// file is larger than the max size, it will roll over to a new file before performing the write. +// This method will also add the points to the in memory cache +func (p *Partition) Write(points []tsdb.Point) error { + block := make([]byte, 0) + for _, pp := range points { + block = append(block, marshalWALEntry(pp.Key(), pp.UnixNano(), pp.Data())...) + } + b := snappy.Encode(nil, block) + + if backoff, ok := func() (time.Duration, bool) { + p.mu.Lock() + defer p.mu.Unlock() + // pause writes for a bit if we've hit the size threshold + if p.memorySize > p.sizeThreshold { + p.backoffCount += 1 + return time.Millisecond * 20, true + } + + return 0, false + }(); ok { + go p.flushAndCompact(memoryFlush) + time.Sleep(backoff) + } + p.mu.Lock() + defer p.mu.Unlock() + + // rotate to a new file if we've gone over our limit + if p.currentSegmentFile == nil || p.currentSegmentSize > p.maxSegmentSize { + err := p.newSegmentFile() + if err != nil { + return err + } + } + + if n, err := p.currentSegmentFile.Write(u64tob(uint64(len(b)))); err != nil { + return err + } else if n != 8 { + return fmt.Errorf("expected to write %d bytes but wrote %d", 8, n) + } + + if n, err := p.currentSegmentFile.Write(b); err != nil { + return err + } else if n != len(b) { + return fmt.Errorf("expected to write %d bytes but wrote %d", len(b), n) + } + + if err := p.currentSegmentFile.Sync(); err != nil { + return err + } + + p.currentSegmentSize += int64(8 + len(b)) + p.lastWriteTime = time.Now() + + for _, pp := range points { + p.addToCache(pp.Key(), pp.Data(), pp.UnixNano()) + } + return nil +} + +// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the partition +func (p *Partition) newSegmentFile() error { + p.currentSegmentID += 1 + if p.currentSegmentFile != nil { + if err := p.currentSegmentFile.Close(); err != nil { + return err + } + } + + fileName := p.fileNameForSegment(p.currentSegmentID) + ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + p.currentSegmentSize = 0 + p.currentSegmentFile = ff + + return nil +} + +// fileNameForSegment will return the full path and filename for a given segment ID +func (p *Partition) fileNameForSegment(id uint32) 
string { + return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.%s", p.id, id, FileExtension)) +} + +// compactionFileName is the name of the temporary file used for compaction +func (p *Partition) compactionFileName() string { + return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.%s", p.id, 1, CompactionExtension)) +} + +// fileIDFromName will return the segment ID from the file name +func (p *Partition) fileIDFromName(name string) (uint32, error) { + parts := strings.Split(filepath.Base(name), ".") + if len(parts) != 3 { + return 0, fmt.Errorf("file name doesn't follow wal format: %s", name) + } + id, err := strconv.ParseUint(parts[1], 10, 32) + if err != nil { + return 0, err + } + return uint32(id), nil +} + +// shouldFlush returns a flushType that indicates if a partition should be flushed and why. The criteria are: +// maxSeriesSize - flush if any series in the partition has exceeded this size threshold +// readySeriesSize - a series is ready to flush once it has this much data in it +// compactionThreshold - a partition is ready to flush if this percentage of series has hit the readySeriesSize or greater +func (p *Partition) shouldFlush(maxSeriesSize int, compactionThreshold float64) flushType { + p.mu.Lock() + defer p.mu.Unlock() + + if len(p.cache) == 0 { + return noFlush + } + + if p.memorySize > p.sizeThreshold { + return memoryFlush + } + + if time.Since(p.lastWriteTime) > p.flushColdInterval { + return idleFlush + } + + countReady := 0 + for _, c := range p.cache { + // if we have a series with the max possible size, shortcut out because we need to flush + if c.size > maxSeriesSize { + return thresholdFlush + } else if c.size > p.readySeriesSize { + countReady += 1 + } + } + + if float64(countReady)/float64(len(p.cache)) > compactionThreshold { + return thresholdFlush + } + + return noFlush +} + +// prepareSeriesToFlush will empty the cache of series that are ready based on their size +// and return information for the compaction process to use. +func (p *Partition) prepareSeriesToFlush(readySeriesSize int, flush flushType) (*compactionInfo, error) { + p.mu.Lock() + defer p.mu.Unlock() + + // if there is either a compaction running or one just ran and relieved + // memory pressure, just return from here + if p.compactionRunning { + return nil, ErrCompactionRunning + } else if flush == memoryFlush && p.memorySize < p.sizeThreshold { + return nil, ErrMemoryCompactionDone + } + p.compactionRunning = true + + // we've been ordered to flush and compact. iterate until we have at least + // some series to flush by cutting the ready size in half each iteration + // if we didn't come up with any + var seriesToFlush map[string][][]byte + var size int + + // if this flush is being triggered because the partition is idle, all series hit the threshold + if flush == idleFlush { + for _, c := range p.cache { + size += c.size + } + seriesToFlush = make(map[string][][]byte) + for k, c := range p.cache { + seriesToFlush[k] = c.points + } + p.cache = make(map[string]*cacheEntry) + } else { + // only grab the series that hit the thresold. 
loop until we have series to flush + for { + s, n := p.seriesToFlush(readySeriesSize) + if len(s) > 0 { + seriesToFlush = s + size += n + break + } + // we didn't get any series to flush so cut the ready size in half + // and see if there are series that are ready at that level + readySeriesSize = readySeriesSize / 2 + } + } + + c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size} + + if flush == idleFlush { + // don't create a new segment file because this partition is idle + if p.currentSegmentFile != nil { + if err := p.currentSegmentFile.Close(); err != nil { + return nil, err + } + } + p.currentSegmentFile = nil + p.currentSegmentID += 1 + p.currentSegmentSize = 0 + } else { + // roll over a new segment file so we can compact all the old ones + if err := p.newSegmentFile(); err != nil { + return nil, err + } + } + + p.flushCache = c.seriesToFlush + c.compactFilesLessThan = p.currentSegmentID + + return c, nil +} + +// seriesToFlush will clear the cache of series over the give threshold and return +// them in a new map along with their combined size +func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int) { + seriesToFlush := make(map[string][][]byte) + size := 0 + for k, c := range p.cache { + // if the series is over the threshold, save it in the map to flush later + if c.size >= readySeriesSize { + size += c.size + seriesToFlush[k] = c.points + + // always hand the index data that is sorted + if c.isDirtySort { + sort.Sort(tsdb.ByteSlices(seriesToFlush[k])) + } + + delete(p.cache, k) + } + } + + return seriesToFlush, size +} + +// flushAndCompact will flush any series that are over their threshold and then read in all old segment files and +// write the data that was not flushed to a new file +func (p *Partition) flushAndCompact(flush flushType) error { + c, err := p.prepareSeriesToFlush(p.readySeriesSize, flush) + + if err == ErrCompactionRunning || err == ErrMemoryCompactionDone { + return nil + } else if err != nil { + return err + } else if len(c.seriesToFlush) == 0 { // nothing to flush! 
+ return nil + } + + startTime := time.Now() + if p.log.EnableLogging { + p.log.logger.Printf("compacting %d series from partition %d\n", len(c.seriesToFlush), p.id) + } + + // write the data to the index first + if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil { + // if we can't write the index, we should just bring down the server hard + panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error())) + } + + // clear the flush cache and reset the memory thresholds + p.mu.Lock() + p.flushCache = nil + p.memorySize -= uint64(c.flushSize) + p.backoffCount = 0 + p.mu.Unlock() + + // ensure that we mark that compaction is no longer running + defer func() { + p.mu.Lock() + p.compactionRunning = false + p.mu.Unlock() + }() + + err = p.compactFiles(c, flush) + if p.log.EnableLogging { + p.log.logger.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime)) + } + + return err +} + +func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error { + // now compact all the old data + fileNames, err := p.segmentFileNames() + if err != nil { + return err + } + + // all compacted data from the segments will go into this file + compactionFile, err := p.os.OpenCompactionFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + + for _, n := range fileNames { + id, err := p.idFromFileName(n) + if err != nil { + return err + } + + // only compact files that are older than the segment that became active when we started the flush + if id >= c.compactFilesLessThan { + break + } + + f, err := p.os.OpenSegmentFile(n, os.O_RDONLY, 0666) + if err != nil { + return err + } + + sf := newSegment(f) + var entries []*entry + for { + name, a, err := sf.readCompressedBlock() + if name != "" { + continue // skip name blocks + } else if err != nil { + return err + } else if a == nil { + break + } + + // only compact the entries from series that haven't been flushed + for _, e := range a { + if _, ok := c.seriesToFlush[string(e.key)]; !ok { + entries = append(entries, e) + } + } + } + + if err := p.writeCompactionEntry(compactionFile, f.Name(), entries); err != nil { + return err + } + + // now close and delete the file + if err := f.Close(); err != nil { + return err + } + + if err := os.Remove(n); err != nil { + return err + } + } + + // close the compaction file and rename it so that it will appear as the very first segment + if err := compactionFile.Close(); err != nil { + return err + } + + // if it's an idle flush remove the compaction file + if flush == idleFlush { + return os.Remove(compactionFile.Name()) + } + + return p.os.Rename(compactionFile.Name(), p.fileNameForSegment(1)) +} + +// writeCompactionEntry will write a marker for the beginning of the file we're compacting, a compressed block +// for all entries, then a marker for the end of the file +func (p *Partition) writeCompactionEntry(f *os.File, filename string, entries []*entry) error { + if err := p.writeCompactionFileName(f, filename); err != nil { + return err + } + + block := make([]byte, 0) + for _, e := range entries { + block = append(block, marshalWALEntry(e.key, e.timestamp, e.data)...) 
+ } + + b := snappy.Encode(nil, block) + if _, err := f.Write(u64tob(uint64(len(b)))); err != nil { + return err + } + + if _, err := f.Write(b); err != nil { + return err + } + + return f.Sync() +} + +// writeCompactionFileName will write a compaction log length entry and the name of the file that is compacted +func (p *Partition) writeCompactionFileName(f *os.File, filename string) error { + length := u64tob(uint64(len([]byte(filename)))) + + // the beginning of the length has two bytes to indicate that this is a compaction log entry + length[0] = 0xFF + length[1] = 0xFF + + if _, err := f.Write(length); err != nil { + return err + } + + if _, err := f.Write([]byte(filename)); err != nil { + return err + } + + return nil +} + +// recoverCompactionFile iterates over all compaction files in a directory and +// cleans them and removes undeleted files. +func (p *Partition) recoverCompactionFile() error { + path := p.compactionFileName() + + // Open compaction file. Ignore if it doesn't exist. + f, err := p.os.OpenCompactionFile(path, os.O_RDWR, 0666) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + defer f.Close() + + // Iterate through all named blocks. + sf := newSegment(f) + var hasData bool + for { + // Only read named blocks. + name, a, err := sf.readCompressedBlock() + if err != nil { + return fmt.Errorf("read name block: %s", err) + } else if name == "" && a == nil { + break // eof + } else if name == "" { + continue // skip unnamed blocks + } + + // Read data for the named block. + if s, entries, err := sf.readCompressedBlock(); err != nil { + return fmt.Errorf("read data block: %s", err) + } else if s != "" { + return fmt.Errorf("unexpected double name block") + } else if entries == nil { + break // eof + } + + // If data exists then ensure the underlying segment is deleted. + if err := os.Remove(name); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove segment: filename=%s, err=%s", name, err) + } + + // Flag the compaction file as having data and it should be renamed. + hasData = true + } + f.Close() + + // If the compaction file did not have at least one named block written to + // it then it should removed. This check is performed to ensure a partial + // compaction file does not overwrite an original segment file. + if !hasData { + if err := os.Remove(path); err != nil { + return fmt.Errorf("remove compaction file: %s", err) + } + return nil + } + + // Double check that we are not renaming the compaction file over an + // existing segment file. The segment file should be removed in the + // recovery process but this simply double checks that removal occurred. + newpath := p.fileNameForSegment(1) + if _, err := os.Stat(newpath); !os.IsNotExist(err) { + return fmt.Errorf("cannot rename compaction file, segment exists: filename=%s", newpath) + } + + // Rename compaction file to the first segment file. 
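Segment writes and compaction entries share one on-disk framing: an 8-byte big-endian length followed by a snappy-compressed block, with the first two length bytes overwritten by the 0xFF 0xFF marker when the payload is a compacted file name rather than point data. The sketch below round-trips that framing in memory; the snappy step is left out so it stays dependency-free, and the writeFramed and readFramed names are illustrative only.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeFramed appends a WAL-style frame to buf: an 8-byte big-endian length
// followed by the payload. If nameBlock is true, the first two length bytes
// are overwritten with 0xFF 0xFF, the marker the segment reader uses to tell
// a compaction file-name block apart from a compressed data block.
func writeFramed(buf *bytes.Buffer, payload []byte, nameBlock bool) {
	length := make([]byte, 8)
	binary.BigEndian.PutUint64(length, uint64(len(payload)))
	if nameBlock {
		length[0], length[1] = 0xFF, 0xFF
	}
	buf.Write(length)
	buf.Write(payload)
}

// readFramed consumes one frame and reports whether it was a name block,
// mirroring how the reader zeroes the marker bytes before decoding the length.
func readFramed(buf *bytes.Buffer) (payload []byte, nameBlock bool) {
	length := buf.Next(8)
	nameBlock = bytes.Equal(length[0:2], []byte{0xFF, 0xFF})
	if nameBlock {
		length = append([]byte{0x00, 0x00}, length[2:]...)
	}
	n := binary.BigEndian.Uint64(length)
	return buf.Next(int(n)), nameBlock
}

func main() {
	var buf bytes.Buffer
	writeFramed(&buf, []byte("01.000002.wal"), true) // file-name marker block
	writeFramed(&buf, []byte("compressed entries would go here"), false)

	for buf.Len() > 0 {
		p, isName := readFramed(&buf)
		fmt.Printf("nameBlock=%v payload=%q\n", isName, p)
	}
}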
+ if err := p.os.Rename(path, newpath); err != nil { + return fmt.Errorf("rename compaction file: %s", err) + } + + return nil +} + +// readFile will read a segment file and marshal its entries into the cache +func (p *Partition) readFile(path string) (entries []*entry, err error) { + id, err := p.fileIDFromName(path) + if err != nil { + return nil, err + } + + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + + sf := newSegment(f) + for { + name, a, err := sf.readCompressedBlock() + if name != "" { + continue // skip name blocks + } else if err != nil { + f.Close() + return nil, err + } else if a == nil { + break + } + + entries = append(entries, a...) + } + + // if this is the highest segment file, it'll be the one we use, otherwise close it out now that we're done reading + if id > p.currentSegmentID { + p.currentSegmentID = id + p.currentSegmentFile = f + p.currentSegmentSize = sf.size + } else { + if err := f.Close(); err != nil { + return nil, err + } + } + return +} + +// addToCache will marshal the entry and add it to the in memory cache. It will also mark if this key will need sorting later +func (p *Partition) addToCache(key, data []byte, timestamp int64) { + // Generate in-memory cache entry of . + v := MarshalEntry(timestamp, data) + p.memorySize += uint64(len(v)) + + entry := p.cache[string(key)] + if entry == nil { + entry = &cacheEntry{ + points: [][]byte{v}, + size: len(v), + } + p.cache[string(key)] = entry + + return + } + + // Determine if we'll need to sort the values for this key later + entry.isDirtySort = bytes.Compare(entry.points[len(entry.points)-1][0:8], v[0:8]) != -1 + entry.points = append(entry.points, v) + entry.size += len(v) +} + +// cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key +func (p *Partition) cursor(key string) *cursor { + p.mu.Lock() + defer p.mu.Unlock() + + entry := p.cache[key] + if entry == nil { + return &cursor{} + } + + // if we're in the middle of a flush, combine the previous cache + // with this one for the cursor + if p.flushCache != nil { + if fc, ok := p.flushCache[key]; ok { + c := make([][]byte, len(fc), len(fc)+len(entry.points)) + copy(c, fc) + c = append(c, entry.points...) + + return &cursor{cache: tsdb.DedupeEntries(c)} + } + } + + if entry.isDirtySort { + entry.points = tsdb.DedupeEntries(entry.points) + entry.isDirtySort = false + } + + return &cursor{cache: entry.points} +} + +// idFromFileName parses the segment file ID from its name +func (p *Partition) idFromFileName(name string) (uint32, error) { + parts := strings.Split(filepath.Base(name), ".") + if len(parts) != 3 { + return 0, fmt.Errorf("file %s has wrong name format to be a segment file", name) + } + + id, err := strconv.ParseUint(parts[1], 10, 32) + + return uint32(id), err +} + +// segmentFileNames returns all the segment files names for the partition +func (p *Partition) segmentFileNames() ([]string, error) { + path := filepath.Join(p.path, fmt.Sprintf("%02d.*.%s", p.id, FileExtension)) + return filepath.Glob(path) +} + +// deleteSeries will perform a compaction on the partition, removing all data +// from any of the series passed in. 
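Cache entries are stored as the 8-byte big-endian timestamp followed by the encoded field data, so comparing the first eight bytes is enough both to detect out-of-order writes (the isDirtySort flag) and to order a cursor. A rough sketch of that idea follows, using an illustrative cachePoint helper and the standard library's sort.Slice in place of the tsdb.ByteSlices sort used in the WAL itself.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

// cachePoint prepends the 8-byte big-endian timestamp so entries sort by time
// when compared as raw byte slices.
func cachePoint(timestamp int64, data []byte) []byte {
	v := make([]byte, 8+len(data))
	binary.BigEndian.PutUint64(v[0:8], uint64(timestamp))
	copy(v[8:], data)
	return v
}

func main() {
	// Append points out of order and track dirtiness the way the cache does:
	// a series needs sorting if a new point is not strictly newer than the last.
	var points [][]byte
	dirty := false
	for _, ts := range []int64{1, 4, 2} {
		v := cachePoint(ts, []byte("payload"))
		if len(points) > 0 && bytes.Compare(points[len(points)-1][0:8], v[0:8]) != -1 {
			dirty = true
		}
		points = append(points, v)
	}
	fmt.Println("needs sort:", dirty)

	// A cursor only ever sees time-ordered data, so a dirty series is sorted
	// (and would also be de-duplicated) before reads.
	if dirty {
		sort.Slice(points, func(i, j int) bool {
			return bytes.Compare(points[i][0:8], points[j][0:8]) < 0
		})
	}
	for _, p := range points {
		fmt.Println(binary.BigEndian.Uint64(p[0:8]))
	}
}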
+func (p *Partition) deleteSeries(keys []string) error { + p.mu.Lock() + defer p.mu.Unlock() + + p.compactionRunning = true + + // remove the series from the cache and prepare the compaction info + size := 0 + seriesToFlush := make(map[string][][]byte) + for _, k := range keys { + entry := p.cache[k] + if entry != nil { + seriesToFlush[k] = entry.points + size += entry.size + delete(p.cache, k) + } + } + p.memorySize -= uint64(size) + + c := &compactionInfo{seriesToFlush: seriesToFlush, flushSize: size} + + // roll over a new segment file so we can compact all the old ones + if err := p.newSegmentFile(); err != nil { + return err + } + c.compactFilesLessThan = p.currentSegmentID + + return p.compactFiles(c, deleteFlush) +} + +// compactionInfo is a data object with information about a compaction running +// and the series that will be flushed to the index +type compactionInfo struct { + seriesToFlush map[string][][]byte + compactFilesLessThan uint32 + flushSize int +} + +// segmentFile is a struct for reading in segment files from the WAL. Used on startup only while loading +type segment struct { + f *os.File + block []byte + length []byte + size int64 +} + +func newSegment(f *os.File) *segment { + return &segment{ + length: make([]byte, 8), + f: f, + } +} + +// readCompressedBlock will read the next compressed block from the file and marshal the entries. +// if we've hit the end of the file or corruption the entry array will be nil +func (s *segment) readCompressedBlock() (name string, entries []*entry, err error) { + blockSize := int64(0) + + n, err := s.f.Read(s.length) + if err == io.EOF { + return "", nil, nil + } else if err != nil { + return "", nil, fmt.Errorf("read length: %s", err) + } else if n != len(s.length) { + // seek back before this length so we can start overwriting the file from here + log.Println("unable to read the size of a data block from file: ", s.f.Name()) + s.f.Seek(-int64(n), 1) + return "", nil, nil + } + blockSize += int64(n) + + // Compacted WAL files will have a magic byte sequence that indicate the next part is a file name + // instead of a compressed block. We can ignore these bytes and the ensuing file name to get to the next block. + isCompactionFileNameBlock := bytes.Equal(s.length[0:2], CompactSequence) + if isCompactionFileNameBlock { + s.length[0], s.length[1] = 0x00, 0x00 + } + + dataLength := btou64(s.length) + + // make sure we haven't hit the end of data. trailing end of file can be zero bytes + if dataLength == 0 { + s.f.Seek(-int64(len(s.length)), 1) + return "", nil, nil + } + + if len(s.block) < int(dataLength) { + s.block = make([]byte, dataLength) + } + + n, err = s.f.Read(s.block[:dataLength]) + if err != nil { + return "", nil, fmt.Errorf("read block: %s", err) + } + blockSize += int64(n) + + // read the compressed block and decompress it. 
if partial or corrupt, + // overwrite with zeroes so we can start over on this wal file + if n != int(dataLength) { + log.Println("partial compressed block in file: ", s.f.Name()) + + // seek back to before this block and its size so we can overwrite the corrupt data + s.f.Seek(-int64(len(s.length)+n), 1) + if err := s.f.Truncate(s.size); err != nil { + return "", nil, fmt.Errorf("truncate(0): sz=%d, err=%s", s.size, err) + } + + return "", nil, nil + } + + // skip the rest if this is just the filename from a compaction + if isCompactionFileNameBlock { + return string(s.block[:dataLength]), nil, nil + } + + // if there was an error decoding, this is a corrupt block so we zero out the rest of the file + buf, err := snappy.Decode(nil, s.block[:dataLength]) + if err != nil { + log.Println("corrupt compressed block in file: ", err.Error(), s.f.Name()) + + // go back to the start of this block and zero out the rest of the file + s.f.Seek(-int64(len(s.length)+n), 1) + if err := s.f.Truncate(s.size); err != nil { + return "", nil, fmt.Errorf("truncate(1): sz=%d, err=%s", s.size, err) + } + + return "", nil, nil + } + + // read in the individual data points from the decompressed wal block + bytesRead := 0 + entries = make([]*entry, 0) + for { + if bytesRead >= len(buf) { + break + } + n, key, timestamp, data := unmarshalWALEntry(buf[bytesRead:]) + bytesRead += n + entries = append(entries, &entry{key: key, data: data, timestamp: timestamp}) + } + + s.size = blockSize + + return +} + +// entry is used as a temporary object when reading data from segment files +type entry struct { + key []byte + data []byte + timestamp int64 +} + +// cursor is a forward cursor for a given entry in the cache +type cursor struct { + cache [][]byte + position int +} + +// Seek will point the cursor to the given time (or key) +func (c *cursor) Seek(seek []byte) (key, value []byte) { + // Seek cache index. + c.position = sort.Search(len(c.cache), func(i int) bool { + return bytes.Compare(c.cache[i][0:8], seek) != -1 + }) + + return c.Next() +} + +// Next moves the cursor to the next key/value. will return nil if at the end +func (c *cursor) Next() (key, value []byte) { + if c.position >= len(c.cache) { + return nil, nil + } + + v := c.cache[c.position] + c.position++ + + return v[0:8], v[8:] + +} + +// seriesAndFields is a data struct to serialize new series and fields +// to get created into WAL segment files +type seriesAndFields struct { + Fields map[string]*tsdb.MeasurementFields `json:"fields,omitempty"` + Series []*tsdb.SeriesCreate `json:"series,omitempty"` +} + +// cacheEntry holds the cached data for a series +type cacheEntry struct { + points [][]byte + isDirtySort bool + size int +} + +// marshalWALEntry encodes point data into a single byte slice. +// +// The format of the byte slice is: +// +// uint64 timestamp +// uint32 key length +// uint32 data length +// []byte key +// []byte data +// +func marshalWALEntry(key []byte, timestamp int64, data []byte) []byte { + v := make([]byte, 8+4+4, 8+4+4+len(key)+len(data)) + binary.BigEndian.PutUint64(v[0:8], uint64(timestamp)) + binary.BigEndian.PutUint32(v[8:12], uint32(len(key))) + binary.BigEndian.PutUint32(v[12:16], uint32(len(data))) + + v = append(v, key...) + v = append(v, data...) + + return v +} + +// unmarshalWALEntry decodes a WAL entry into it's separate parts. +// Returned byte slices point to the original slice. 
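The entry layout documented above (timestamp, key length, data length, key bytes, data bytes) is what allows a decompressed block to be walked sequentially, since each decode reports how many bytes it consumed. A small round-trip sketch follows; the encodeEntry and decodeEntry names and the sample keys and field payloads are made up for illustration.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeEntry packs a WAL entry using the layout described above:
// 8-byte big-endian timestamp, 4-byte key length, 4-byte data length,
// then the raw key and data bytes.
func encodeEntry(key []byte, timestamp int64, data []byte) []byte {
	v := make([]byte, 16, 16+len(key)+len(data))
	binary.BigEndian.PutUint64(v[0:8], uint64(timestamp))
	binary.BigEndian.PutUint32(v[8:12], uint32(len(key)))
	binary.BigEndian.PutUint32(v[12:16], uint32(len(data)))
	v = append(v, key...)
	return append(v, data...)
}

// decodeEntry reverses encodeEntry and reports how many bytes it consumed,
// which is what lets a reader walk a block entry by entry.
func decodeEntry(v []byte) (n int, key []byte, timestamp int64, data []byte) {
	timestamp = int64(binary.BigEndian.Uint64(v[0:8]))
	keyLen := binary.BigEndian.Uint32(v[8:12])
	dataLen := binary.BigEndian.Uint32(v[12:16])
	key = v[16 : 16+keyLen]
	data = v[16+keyLen : 16+keyLen+dataLen]
	return 16 + int(keyLen) + int(dataLen), key, timestamp, data
}

func main() {
	// Two entries packed back to back, then walked using the returned offsets.
	block := append(
		encodeEntry([]byte("cpu,host=A"), 1, []byte("value=23.2")),
		encodeEntry([]byte("cpu,host=B"), 4, []byte("value=1.0"))...,
	)
	for read := 0; read < len(block); {
		n, key, ts, data := decodeEntry(block[read:])
		read += n
		fmt.Printf("key=%s ts=%d data=%s\n", key, ts, data)
	}
}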
+func unmarshalWALEntry(v []byte) (bytesRead int, key []byte, timestamp int64, data []byte) { + timestamp = int64(binary.BigEndian.Uint64(v[0:8])) + keyLen := binary.BigEndian.Uint32(v[8:12]) + dataLen := binary.BigEndian.Uint32(v[12:16]) + + key = v[16 : 16+keyLen] + data = v[16+keyLen : 16+keyLen+dataLen] + bytesRead = 16 + int(keyLen) + int(dataLen) + return +} + +// marshalCacheEntry encodes the timestamp and data to a single byte slice. +// +// The format of the byte slice is: +// +// uint64 timestamp +// []byte data +// +func MarshalEntry(timestamp int64, data []byte) []byte { + buf := make([]byte, 8+len(data)) + binary.BigEndian.PutUint64(buf[0:8], uint64(timestamp)) + copy(buf[8:], data) + return buf +} + +// unmarshalCacheEntry returns the timestamp and data from an encoded byte slice. +func UnmarshalEntry(buf []byte) (timestamp int64, data []byte) { + timestamp = int64(binary.BigEndian.Uint64(buf[0:8])) + data = buf[8:] + return +} + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +func btou64(b []byte) uint64 { + return binary.BigEndian.Uint64(b) +} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal_test.go new file mode 100644 index 000000000..0dea03f08 --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/engine/wal/wal_test.go @@ -0,0 +1,906 @@ +package wal + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + // "runtime" + // "sync" + + "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/tsdb" +) + +func TestWAL_WritePoints(t *testing.T) { + log := openTestWAL() + defer log.Close() + defer os.RemoveAll(log.path) + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + // test that we can write to two different series + p1 := parsePoint("cpu,host=A value=23.2 1", codec) + p2 := parsePoint("cpu,host=A value=25.3 4", codec) + p3 := parsePoint("cpu,host=B value=1.0 1", codec) + if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + verify := func() { + c := log.Cursor("cpu,host=A") + k, v := c.Seek(inttob(1)) + + // ensure the series are there and points are in order + if bytes.Compare(v, p1.Data()) != 0 { + t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v) + } + + k, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v) + } + + k, v = c.Next() + if k != nil { + t.Fatalf("expected nil on last seek: %v %v", k, v) + } + + c = log.Cursor("cpu,host=B") + k, v = c.Next() + if bytes.Compare(v, p3.Data()) != 0 { + t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v) + } + } + + verify() + + // ensure that we can close and re-open the log with points still there + log.Close() + log.Open() + + verify() + + // ensure we can write new points into the series + p4 := parsePoint("cpu,host=A value=1.0 7", codec) + // ensure we can write an all new series + p5 := parsePoint("cpu,host=C value=1.4 2", codec) + // ensure we can write a point out of order 
and get it back + p6 := parsePoint("cpu,host=A value=1.3 2", codec) + // // ensure we can write to a new partition + // p7 := parsePoint("cpu,region=west value=2.2", codec) + if err := log.WritePoints([]tsdb.Point{p4, p5, p6}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + verify2 := func() { + c := log.Cursor("cpu,host=A") + k, v := c.Next() + if bytes.Compare(v, p1.Data()) != 0 { + t.Fatalf("order wrong, expected p1, %v %v %v", v, k, p1.Data()) + } + _, v = c.Next() + if bytes.Compare(v, p6.Data()) != 0 { + t.Fatal("order wrong, expected p6") + } + _, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatal("order wrong, expected p6") + } + _, v = c.Next() + if bytes.Compare(v, p4.Data()) != 0 { + t.Fatal("order wrong, expected p6") + } + + c = log.Cursor("cpu,host=C") + _, v = c.Next() + if bytes.Compare(v, p5.Data()) != 0 { + t.Fatal("order wrong, expected p6") + } + } + + verify2() + + log.Close() + log.Open() + + verify2() +} + +func TestWAL_CorruptDataLengthSize(t *testing.T) { + log := openTestWAL() + defer log.Close() + defer os.RemoveAll(log.path) + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + // test that we can write to two different series + p1 := parsePoint("cpu,host=A value=23.2 1", codec) + p2 := parsePoint("cpu,host=A value=25.3 4", codec) + if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + verify := func() { + c := log.Cursor("cpu,host=A") + _, v := c.Next() + if bytes.Compare(v, p1.Data()) != 0 { + t.Fatal("p1 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatal("p2 value wrong") + } + _, v = c.Next() + if v != nil { + t.Fatal("expected cursor to return nil") + } + } + + verify() + + // now write junk data and ensure that we can close, re-open and read + f := log.partitions[1].currentSegmentFile + f.Write([]byte{0x23, 0x12}) + f.Sync() + log.Close() + log.Open() + + verify() + + // now write new data and ensure it's all good + p3 := parsePoint("cpu,host=A value=29.2 6", codec) + if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil { + t.Fatalf("failed to write point: %s", err.Error()) + } + + verify = func() { + c := log.Cursor("cpu,host=A") + _, v := c.Next() + if bytes.Compare(v, p1.Data()) != 0 { + t.Fatal("p1 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatal("p2 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p3.Data()) != 0 { + t.Fatal("p3 value wrong") + } + } + + verify() + log.Close() + log.Open() + verify() +} + +func TestWAL_CorruptDataBlock(t *testing.T) { + log := openTestWAL() + defer log.Close() + defer os.RemoveAll(log.path) + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + // test that we can write to two different series + p1 := parsePoint("cpu,host=A value=23.2 1", codec) + p2 := parsePoint("cpu,host=A value=25.3 4", codec) + if err := log.WritePoints([]tsdb.Point{p1, p2}, nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + verify := func() { + c := log.Cursor("cpu,host=A") + _, v := c.Next() + if bytes.Compare(v, p1.Data()) != 0 { + 
t.Fatal("p1 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatal("p2 value wrong") + } + _, v = c.Next() + if v != nil { + t.Fatal("expected cursor to return nil") + } + } + + verify() + + // now write junk data and ensure that we can close, re-open and read + + f := log.partitions[1].currentSegmentFile + f.Write(u64tob(23)) + // now write a bunch of garbage + for i := 0; i < 1000; i++ { + f.Write([]byte{0x23, 0x78, 0x11, 0x33}) + } + f.Sync() + + log.Close() + log.Open() + + verify() + + // now write new data and ensure it's all good + p3 := parsePoint("cpu,host=A value=29.2 6", codec) + if err := log.WritePoints([]tsdb.Point{p3}, nil, nil); err != nil { + t.Fatalf("failed to write point: %s", err.Error()) + } + + verify = func() { + c := log.Cursor("cpu,host=A") + _, v := c.Next() + if bytes.Compare(v, p1.Data()) != 0 { + t.Fatal("p1 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p2.Data()) != 0 { + t.Fatal("p2 value wrong") + } + _, v = c.Next() + if bytes.Compare(v, p3.Data()) != 0 { + t.Fatal("p3 value wrong", p3.Data(), v) + } + } + + verify() + log.Close() + log.Open() + verify() +} + +// Ensure the wal flushes and compacts after a partition has enough series in +// it with enough data to flush +func TestWAL_CompactAfterPercentageThreshold(t *testing.T) { + log := openTestWAL() + log.partitionCount = 2 + log.CompactionThreshold = 0.7 + log.ReadySeriesSize = 1024 + + // set this high so that a flush doesn't automatically kick in and mess up our test + log.flushCheckInterval = time.Minute + + defer log.Close() + defer os.RemoveAll(log.path) + + points := make([]map[string][][]byte, 0) + log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + points = append(points, pointsByKey) + return nil + }} + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + numSeries := 100 + b := make([]byte, 70*5000) + for i := 1; i <= 100; i++ { + buf := bytes.NewBuffer(b) + for j := 1; j <= numSeries; j++ { + buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i)) + } + + // ensure that before we go over the threshold it isn't marked for flushing + if i < 50 { + // interleave data for some series that won't be ready to flush + buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast1 value=%.3f %d\n", rand.Float64(), i)) + buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i)) + + // ensure that as a whole its not ready for flushing yet + if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != noFlush { + t.Fatal("expected partition 1 to return false from shouldFlush") + } + } + + // write the batch out + if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + buf = bytes.NewBuffer(b) + } + + // ensure we have some data + c := log.Cursor("cpu,host=A,region=uswest23") + k, v := c.Next() + if btou64(k) != 1 { + t.Fatalf("expected timestamp of 1, but got %v %v", k, v) + } + + // ensure it is marked as should flush because of the threshold + if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != thresholdFlush { + t.Fatal("expected 
partition 1 to return true from shouldFlush") + } + + if err := log.partitions[1].flushAndCompact(thresholdFlush); err != nil { + t.Fatalf("error flushing and compacting: %s", err.Error()) + } + + // should be nil + c = log.Cursor("cpu,host=A,region=uswest23") + k, v = c.Next() + if k != nil || v != nil { + t.Fatal("expected cache to be nil after flush: ", k, v) + } + + c = log.Cursor("cpu,host=A,region=useast1") + k, v = c.Next() + if btou64(k) != 1 { + t.Fatal("expected cache to be there after flush and compact: ", k, v) + } + + if len(points) == 0 { + t.Fatal("expected points to be flushed to index") + } + + // now close and re-open the wal and ensure the compacted data is gone and other data is still there + log.Close() + log.Open() + + c = log.Cursor("cpu,host=A,region=uswest23") + k, v = c.Next() + if k != nil || v != nil { + t.Fatal("expected cache to be nil after flush and re-open: ", k, v) + } + + c = log.Cursor("cpu,host=A,region=useast1") + k, v = c.Next() + if btou64(k) != 1 { + t.Fatal("expected cache to be there after flush and compact: ", k, v) + } +} + +// Ensure the wal forces a full flush after not having a write in a given interval of time +func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) { + log := openTestWAL() + log.partitionCount = 1 + + // set this low + log.flushCheckInterval = 10 * time.Millisecond + log.FlushColdInterval = 500 * time.Millisecond + + defer log.Close() + defer os.RemoveAll(log.path) + + points := make([]map[string][][]byte, 0) + log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + points = append(points, pointsByKey) + return nil + }} + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + numSeries := 100 + b := make([]byte, 70*5000) + for i := 1; i <= 10; i++ { + buf := bytes.NewBuffer(b) + for j := 1; j <= numSeries; j++ { + buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i)) + } + + // write the batch out + if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + buf = bytes.NewBuffer(b) + } + + // ensure we have some data + c := log.Cursor("cpu,host=A,region=uswest10") + k, _ := c.Next() + if btou64(k) != 1 { + t.Fatalf("expected first data point but got one with key: %v", k) + } + + time.Sleep(700 * time.Millisecond) + + // ensure that as a whole its not ready for flushing yet + if f := log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold); f != noFlush { + t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f) + } + + // ensure that the partition is empty + if log.partitions[1].memorySize != 0 || len(log.partitions[1].cache) != 0 { + t.Fatal("expected partition to be empty") + } + // ensure that we didn't bother to open a new segment file + if log.partitions[1].currentSegmentFile != nil { + t.Fatal("expected partition to not have an open segment file") + } +} + +func TestWAL_SeriesAndFieldsGetPersisted(t *testing.T) { + log := openTestWAL() + defer log.Close() + defer os.RemoveAll(log.path) + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: 
uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + var measurementsToIndex map[string]*tsdb.MeasurementFields + var seriesToIndex []*tsdb.SeriesCreate + log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + measurementsToIndex = measurementFieldsToSave + seriesToIndex = append(seriesToIndex, seriesToCreate...) + return nil + }} + + // test that we can write to two different series + p1 := parsePoint("cpu,host=A value=23.2 1", codec) + p2 := parsePoint("cpu,host=A value=25.3 4", codec) + p3 := parsePoint("cpu,host=B value=1.0 1", codec) + + seriesToCreate := []*tsdb.SeriesCreate{ + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})}, + } + + measaurementsToCreate := map[string]*tsdb.MeasurementFields{ + "cpu": { + Fields: map[string]*tsdb.Field{ + "value": {ID: 1, Name: "value"}, + }, + }, + } + + if err := log.WritePoints([]tsdb.Point{p1, p2, p3}, measaurementsToCreate, seriesToCreate); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // now close it and see if loading the metadata index will populate the measurement and series info + log.Close() + + idx := tsdb.NewDatabaseIndex() + mf := make(map[string]*tsdb.MeasurementFields) + + if err := log.LoadMetadataIndex(idx, mf); err != nil { + t.Fatalf("error loading metadata index: %s", err.Error()) + } + + s := idx.Series("cpu,host=A") + if s == nil { + t.Fatal("expected to find series cpu,host=A in index") + } + + s = idx.Series("cpu,host=B") + if s == nil { + t.Fatal("expected to find series cpu,host=B in index") + } + + m := mf["cpu"] + if m == nil { + t.Fatal("expected to find measurement fields for cpu", mf) + } + if m.Fields["value"] == nil { + t.Fatal("expected to find field definition for 'value'") + } + + // ensure that they were actually flushed to the index. 
do it this way because the annoying deepequal doessn't really work for these + for i, s := range seriesToCreate { + if seriesToIndex[i].Measurement != s.Measurement { + t.Fatal("expected measurement to be the same") + } + if seriesToIndex[i].Series.Key != s.Series.Key { + t.Fatal("expected series key to be the same") + } + if !reflect.DeepEqual(seriesToIndex[i].Series.Tags, s.Series.Tags) { + t.Fatal("expected series tags to be the same") + } + } + + // ensure that the measurement fields were flushed to the index + for k, v := range measaurementsToCreate { + m := measurementsToIndex[k] + if m == nil { + t.Fatalf("measurement %s wasn't indexed", k) + } + + if !reflect.DeepEqual(m.Fields, v.Fields) { + t.Fatal("measurement fields not equal") + } + } + + // now open and close the log and try to reload the metadata index, which should now be empty + if err := log.Open(); err != nil { + t.Fatalf("error opening log: %s", err.Error()) + } + if err := log.Close(); err != nil { + t.Fatalf("error closing log: %s", err.Error()) + } + + idx = tsdb.NewDatabaseIndex() + mf = make(map[string]*tsdb.MeasurementFields) + + if err := log.LoadMetadataIndex(idx, mf); err != nil { + t.Fatalf("error loading metadata index: %s", err.Error()) + } + + if len(idx.Measurements()) != 0 || len(mf) != 0 { + t.Fatal("expected index and measurement fields to be empty") + } +} + +func TestWAL_DeleteSeries(t *testing.T) { + log := openTestWAL() + defer log.Close() + defer os.RemoveAll(log.path) + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + var seriesToIndex []*tsdb.SeriesCreate + log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + seriesToIndex = append(seriesToIndex, seriesToCreate...) 
+ return nil + }} + + seriesToCreate := []*tsdb.SeriesCreate{ + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "A"})), map[string]string{"host": "A"})}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "B"})), map[string]string{"host": "B"})}, + } + + // test that we can write to two different series + p1 := parsePoint("cpu,host=A value=23.2 1", codec) + p2 := parsePoint("cpu,host=B value=0.9 2", codec) + p3 := parsePoint("cpu,host=A value=25.3 4", codec) + p4 := parsePoint("cpu,host=B value=1.0 3", codec) + if err := log.WritePoints([]tsdb.Point{p1, p2, p3, p4}, nil, seriesToCreate); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + // ensure data is there + c := log.Cursor("cpu,host=A") + if k, _ := c.Next(); btou64(k) != 1 { + t.Fatal("expected data point for cpu,host=A") + } + + c = log.Cursor("cpu,host=B") + if k, _ := c.Next(); btou64(k) != 2 { + t.Fatal("expected data point for cpu,host=B") + } + + // delete the series and ensure metadata was flushed and data is gone + if err := log.DeleteSeries([]string{"cpu,host=B"}); err != nil { + t.Fatalf("error deleting series: %s", err.Error()) + } + + // ensure data is there + c = log.Cursor("cpu,host=A") + if k, _ := c.Next(); btou64(k) != 1 { + t.Fatal("expected data point for cpu,host=A") + } + + // ensure series is deleted + c = log.Cursor("cpu,host=B") + if k, _ := c.Next(); k != nil { + t.Fatal("expected no data for cpu,host=B") + } + + // ensure that they were actually flushed to the index. do it this way because the annoying deepequal doessn't really work for these + for i, s := range seriesToCreate { + if seriesToIndex[i].Measurement != s.Measurement { + t.Fatal("expected measurement to be the same") + } + if seriesToIndex[i].Series.Key != s.Series.Key { + t.Fatal("expected series key to be the same") + } + if !reflect.DeepEqual(seriesToIndex[i].Series.Tags, s.Series.Tags) { + t.Fatal("expected series tags to be the same") + } + } + + // close and re-open the WAL to ensure that the data didn't show back up + if err := log.Close(); err != nil { + t.Fatalf("error closing log: %s", err.Error()) + } + + if err := log.Open(); err != nil { + t.Fatalf("error opening log: %s", err.Error()) + } + + // ensure data is there + c = log.Cursor("cpu,host=A") + if k, _ := c.Next(); btou64(k) != 1 { + t.Fatal("expected data point for cpu,host=A") + } + + // ensure series is deleted + c = log.Cursor("cpu,host=B") + if k, _ := c.Next(); k != nil { + t.Fatal("expected no data for cpu,host=B") + } +} + +// Ensure a partial compaction can be recovered from. +func TestWAL_Compact_Recovery(t *testing.T) { + log := openTestWAL() + log.partitionCount = 1 + log.CompactionThreshold = 0.7 + log.ReadySeriesSize = 1024 + log.flushCheckInterval = time.Minute + defer log.Close() + defer os.RemoveAll(log.path) + + points := make([]map[string][][]byte, 0) + log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + points = append(points, pointsByKey) + return nil + }} + + if err := log.Open(); err != nil { + t.Fatalf("couldn't open wal: %s", err.Error()) + } + + // Retrieve partition. 
+ p := log.partitions[1] + + codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ + "value": { + ID: uint8(1), + Name: "value", + Type: influxql.Float, + }, + }) + + b := make([]byte, 70*5000) + for i := 1; i <= 100; i++ { + buf := bytes.NewBuffer(b) + for j := 1; j <= 1000; j++ { + buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i)) + } + buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", rand.Int(), rand.Float64(), i)) + + // Write the batch out. + if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + } + + // Mock second open call to fail. + p.os.OpenSegmentFile = func(name string, flag int, perm os.FileMode) (file *os.File, err error) { + if filepath.Base(name) == "01.000001.wal" { + return os.OpenFile(name, flag, perm) + } + return nil, errors.New("marker") + } + if err := p.flushAndCompact(thresholdFlush); err == nil || err.Error() != "marker" { + t.Fatalf("unexpected flush error: %s", err) + } + p.os.OpenSegmentFile = os.OpenFile + + // Append second file to simulate partial write. + func() { + f, err := os.OpenFile(p.compactionFileName(), os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + // Append filename and partial data. + if err := p.writeCompactionEntry(f, "01.000002.wal", []*entry{{key: []byte("foo"), data: []byte("bar"), timestamp: 100}}); err != nil { + t.Fatal(err) + } + + // Truncate by a few bytes. + if fi, err := f.Stat(); err != nil { + t.Fatal(err) + } else if err = f.Truncate(fi.Size() - 2); err != nil { + t.Fatal(err) + } + }() + + // Now close and re-open the wal and ensure there are no errors. + log.Close() + if err := log.Open(); err != nil { + t.Fatalf("unexpected open error: %s", err) + } +} + +// test that partitions get compacted and flushed when number of series hits compaction threshold +// test that partitions get compacted and flushed when a single series hits the compaction threshold +// test that writes slow down when the partition size threshold is hit + +// func TestWAL_MultipleSegments(t *testing.T) { +// runtime.GOMAXPROCS(8) + +// log := openTestWAL() +// defer log.Close() +// defer os.RemoveAll(log.path) +// log.PartitionSizeThreshold = 1024 * 1024 * 100 +// flushCount := 0 +// log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte) error { +// flushCount += 1 +// fmt.Println("FLUSH: ", len(pointsByKey)) +// return nil +// }} + +// if err := log.Open(); err != nil { +// t.Fatalf("couldn't open wal: ", err.Error()) +// } + +// codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{ +// "value": { +// ID: uint8(1), +// Name: "value", +// Type: influxql.Float, +// }, +// }) + +// startTime := time.Now() +// numSeries := 5000 +// perPost := 5000 +// b := make([]byte, 70*5000) +// totalPoints := 0 +// for i := 1; i <= 10000; i++ { +// fmt.Println("WRITING: ", i*numSeries) +// n := 0 +// buf := bytes.NewBuffer(b) +// var wg sync.WaitGroup +// for j := 1; j <= numSeries; j++ { +// totalPoints += 1 +// n += 1 +// buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i)) +// if n >= perPost { +// go func(b string) { +// wg.Add(1) +// if err := log.WritePoints(parsePoints(b, codec)); err != nil { +// t.Fatalf("failed to write points: %s", err.Error()) +// } +// wg.Done() +// }(buf.String()) +// buf = bytes.NewBuffer(b) +// n = 0 +// } +// } +// wg.Wait() +// } +// fmt.Println("PATH: ", log.path) +// dur := 
time.Now().Sub(startTime) +// fmt.Println("TIME TO WRITE: ", totalPoints, dur, float64(totalPoints)/dur.Seconds()) +// fmt.Println("FLUSH COUNT: ", flushCount) +// for _, p := range log.partitions { +// fmt.Println("SIZE: ", p.memorySize/1024/1024) +// } + +// max := 0 +// for _, p := range log.partitions { +// for k, s := range p.cacheSizes { +// if s > max { +// fmt.Println(k, s) +// max = s +// } +// } +// } + +// fmt.Println("CLOSING") +// log.Close() +// fmt.Println("TEST OPENING") +// startTime = time.Now() +// log.Open() +// fmt.Println("TIME TO OPEN: ", time.Now().Sub(startTime)) +// for _, p := range log.partitions { +// fmt.Println("SIZE: ", p.memorySize) +// } + +// c := log.Cursor("cpu,host=A,region=uswest10") +// k, v := c.Seek(inttob(23)) +// fmt.Println("VALS: ", k, v) +// time.Sleep(time.Minute) +// } + +type testIndexWriter struct { + fn func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error +} + +func (t *testIndexWriter) WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { + return t.fn(pointsByKey, measurementFieldsToSave, seriesToCreate) +} + +func openTestWAL() *Log { + dir, err := ioutil.TempDir("", "wal-test") + if err != nil { + panic("couldn't get temp dir") + } + return NewLog(dir) +} + +func parsePoints(buf string, codec *tsdb.FieldCodec) []tsdb.Point { + points, err := tsdb.ParsePointsString(buf) + if err != nil { + panic(fmt.Sprintf("couldn't parse points: %s", err.Error())) + } + for _, p := range points { + b, err := codec.EncodeFields(p.Fields()) + if err != nil { + panic(fmt.Sprintf("couldn't encode fields: %s", err.Error())) + } + p.SetData(b) + } + return points +} + +func parsePoint(buf string, codec *tsdb.FieldCodec) tsdb.Point { + return parsePoints(buf, codec)[0] +} + +func inttob(v int) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(v)) + return b +} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/executor_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/executor_test.go index 1f0ee15ce..b6c934264 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/executor_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/executor_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "math" "os" + "path/filepath" "testing" "time" @@ -947,6 +948,8 @@ func testStore() *tsdb.Store { path, _ := ioutil.TempDir("", "") store := tsdb.NewStore(path) + + store.EngineOptions.Config.WALDir = filepath.Join(path, "wal") err := store.Open() if err != nil { panic(err) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/mapper_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/mapper_test.go index 5fff0e8c5..231ec0647 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/mapper_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/mapper_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "reflect" "strings" "testing" @@ -426,7 +427,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) { } } -func TestShardMapper_LocalMapperTagSets(t *testing.T) { +func TestShardMapper_LocalMapperTagSetsFields(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "shard_test") defer os.RemoveAll(tmpDir) shard := mustCreateShard(tmpDir) @@ -451,50 +452,64 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) { } var tests = 
[]struct { - stmt string - expected []string + stmt string + expectedFields []string + expectedTags []string }{ { - stmt: `SELECT sum(value) FROM cpu`, - expected: []string{"cpu"}, + stmt: `SELECT sum(value) FROM cpu`, + expectedFields: []string{"value"}, + expectedTags: []string{"cpu"}, }, { - stmt: `SELECT sum(value) FROM cpu GROUP BY host`, - expected: []string{"cpu|host|serverA", "cpu|host|serverB"}, + stmt: `SELECT sum(value) FROM cpu GROUP BY host`, + expectedFields: []string{"value"}, + expectedTags: []string{"cpu|host|serverA", "cpu|host|serverB"}, }, { - stmt: `SELECT sum(value) FROM cpu GROUP BY region`, - expected: []string{"cpu|region|us-east"}, + stmt: `SELECT sum(value) FROM cpu GROUP BY region`, + expectedFields: []string{"value"}, + expectedTags: []string{"cpu|region|us-east"}, }, { - stmt: `SELECT sum(value) FROM cpu WHERE host='serverA'`, - expected: []string{"cpu"}, + stmt: `SELECT sum(value) FROM cpu WHERE host='serverA'`, + expectedFields: []string{"value"}, + expectedTags: []string{"cpu"}, }, { - stmt: `SELECT sum(value) FROM cpu WHERE host='serverB'`, - expected: []string{"cpu"}, + stmt: `SELECT sum(value) FROM cpu WHERE host='serverB'`, + expectedFields: []string{"value"}, + expectedTags: []string{"cpu"}, }, { - stmt: `SELECT sum(value) FROM cpu WHERE host='serverC'`, - expected: []string{}, + stmt: `SELECT sum(value) FROM cpu WHERE host='serverC'`, + expectedFields: []string{"value"}, + expectedTags: []string{}, }, } for _, tt := range tests { stmt := mustParseSelectStatement(tt.stmt) mapper := openLocalMapperOrFail(t, shard, stmt) - got := mapper.TagSets() - if !reflect.DeepEqual(got, tt.expected) { - t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, got, tt.expected) + + fields := mapper.Fields() + if !reflect.DeepEqual(fields, tt.expectedFields) { + t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, fields, tt.expectedFields) + } + + tags := mapper.TagSets() + if !reflect.DeepEqual(tags, tt.expectedTags) { + t.Errorf("test '%s'\n\tgot %s\n\texpected %s", tt.stmt, tags, tt.expectedTags) } } - } func mustCreateShard(dir string) *tsdb.Shard { tmpShard := path.Join(dir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(dir, "wal") + sh := tsdb.NewShard(1, index, tmpShard, opts) if err := sh.Open(); err != nil { panic(fmt.Sprintf("error opening shard: %s", err.Error())) } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta.go index 3d27dbb99..1782c36f5 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta.go @@ -980,6 +980,16 @@ type Series struct { id uint64 measurement *Measurement + shardIDs map[uint64]bool // shards that have this series defined +} + +// NewSeries returns an initialized series struct +func NewSeries(key string, tags map[string]string) *Series { + return &Series{ + Key: key, + Tags: tags, + shardIDs: make(map[uint64]bool), + } } // MarshalBinary encodes the object to a binary format. @@ -1008,6 +1018,10 @@ func (s *Series) UnmarshalBinary(buf []byte) error { return nil } +func (s *Series) InitializeShards() { + s.shardIDs = make(map[uint64]bool) +} + // match returns true if all tags match the series' tags. 
func (s *Series) match(tags map[string]string) bool { for k, v := range tags { diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta_test.go index dac931bcb..9964130b8 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/meta_test.go @@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { for _, ts := range tagSets { series = append(series, &TestSeries{ Measurement: m, - Series: &tsdb.Series{ - Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), - Tags: ts, - }, + Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts), }) } } diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/query_executor_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/query_executor_test.go index 3090c86e6..2a0d3e581 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/query_executor_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/query_executor_test.go @@ -54,7 +54,9 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { } store.Close() + conf := store.EngineOptions.Config store = tsdb.NewStore(store.Path()) + store.EngineOptions.Config = conf if err := store.Open(); err != nil { t.Fatalf(err.Error()) } @@ -84,7 +86,9 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) { // Restart store. store.Close() + conf := store.EngineOptions.Config store = tsdb.NewStore(store.Path()) + store.EngineOptions.Config = conf if err := store.Open(); err != nil { t.Fatalf(err.Error()) } @@ -145,7 +149,9 @@ func TestDropSeriesStatement(t *testing.T) { } store.Close() + conf := store.EngineOptions.Config store = tsdb.NewStore(store.Path()) + store.EngineOptions.Config = conf store.Open() executor.Store = store @@ -215,7 +221,9 @@ func TestDropMeasurementStatement(t *testing.T) { validateDrop() store.Close() + conf := store.EngineOptions.Config store = tsdb.NewStore(store.Path()) + store.EngineOptions.Config = conf store.Open() executor.Store = store validateDrop() @@ -279,7 +287,9 @@ func TestDropDatabase(t *testing.T) { } store.Close() + conf := store.EngineOptions.Config store = tsdb.NewStore(store.Path()) + store.EngineOptions.Config = conf store.Open() executor.Store = store executor.ShardMapper = &testShardMapper{store: store} @@ -344,6 +354,8 @@ func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) { path, _ := ioutil.TempDir("", "") store := tsdb.NewStore(path) + store.EngineOptions.Config.WALDir = filepath.Join(path, "wal") + err := store.Open() if err != nil { panic(err) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard.go index b37500f14..1747517a2 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard.go @@ -40,6 +40,7 @@ type Shard struct { db *bolt.DB // underlying data store index *DatabaseIndex path string + id uint64 engine Engine options EngineOptions @@ -52,10 +53,11 @@ type Shard struct { } // NewShard returns a new initialized Shard -func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard { +func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard { return &Shard{ index: index, path: path, + id: id, options: options, measurementFields: 
make(map[string]*MeasurementFields), @@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie for _, p := range points { // see if the series should be added to the index if ss := s.index.series[string(p.Key())]; ss == nil { - series := &Series{Key: string(p.Key()), Tags: p.Tags()} + series := NewSeries(string(p.Key()), p.Tags()) seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series}) + } else if !ss.shardIDs[s.id] { + // this is the first time this series is being written into this shard, persist it + ss.shardIDs[s.id] = true + seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss}) } // see if the field definitions need to be saved to the shard diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard_test.go index db21bef34..604b14f71 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/shard_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdb/influxdb/tsdb" + "github.com/influxdb/influxdb/tsdb/engine/b1" ) func TestShardWriteAndIndex(t *testing.T) { @@ -19,7 +20,10 @@ func TestShardWriteAndIndex(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + + sh := tsdb.NewShard(1, index, tmpShard, opts) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -65,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) { sh.Close() index = tsdb.NewDatabaseIndex() - sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + sh = tsdb.NewShard(1, index, tmpShard, opts) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -86,7 +90,10 @@ func TestShardWriteAddNewField(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + + sh := tsdb.NewShard(1, index, tmpShard, opts) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -142,7 +149,8 @@ func TestShard_Autoflush(t *testing.T) { defer os.RemoveAll(path) // Open shard with a really low size threshold, high flush interval. - sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + EngineVersion: b1.Format, MaxWALSize: 1024, // 1KB WALFlushInterval: 1 * time.Hour, WALPartitionFlushDelay: 1 * time.Millisecond, @@ -181,7 +189,8 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) { defer os.RemoveAll(path) // Open shard with a high size threshold, small time threshold. 
- sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + EngineVersion: b1.Format, MaxWALSize: 10 * 1024 * 1024, // 10MB WALFlushInterval: 100 * time.Millisecond, WALPartitionFlushDelay: 1 * time.Millisecond, @@ -263,7 +272,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { for n := 0; n < b.N; n++ { tmpDir, _ := ioutil.TempDir("", "shard_test") tmpShard := path.Join(tmpDir, "shard") - shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) shard.Open() b.StartTimer() @@ -298,7 +307,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt tmpDir, _ := ioutil.TempDir("", "") defer os.RemoveAll(tmpDir) tmpShard := path.Join(tmpDir, "shard") - shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) shard.Open() defer shard.Close() chunkedWrite(shard, points) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store.go index e028437bc..e7eaa26f5 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store.go @@ -14,9 +14,12 @@ import ( ) func NewStore(path string) *Store { + opts := NewEngineOptions() + opts.Config = NewConfig() + return &Store{ path: path, - EngineOptions: NewEngineOptions(), + EngineOptions: opts, Logger: log.New(os.Stderr, "[store] ", log.LstdFlags), } } @@ -82,7 +85,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er } shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(db, shardPath, s.EngineOptions) + shard := NewShard(shardID, db, shardPath, s.EngineOptions) if err := shard.Open(); err != nil { return err } @@ -236,7 +239,7 @@ func (s *Store) loadShards() error { continue } - shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions) + shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions) err = shard.Open() if err != nil { return fmt.Errorf("failed to open shard %d: %s", shardID, err) diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store_test.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store_test.go index e619a3fa4..1c87257ac 100644 --- a/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store_test.go +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/tsdb/store_test.go @@ -22,6 +22,7 @@ func TestStoreOpen(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -49,6 +50,7 @@ func TestStoreOpenShard(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -82,6 +84,7 @@ func TestStoreOpenShardCreateDelete(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -129,6 +132,7 @@ func TestStoreOpenNotDatabaseDir(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := 
s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -159,6 +163,7 @@ func TestStoreOpenNotRPDir(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -195,6 +200,7 @@ func TestStoreOpenShardBadShardPath(t *testing.T) { } s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") if err := s.Open(); err != nil { t.Fatalf("Store.Open() failed: %v", err) } @@ -213,6 +219,63 @@ func TestStoreOpenShardBadShardPath(t *testing.T) { } +func TestStoreEnsureSeriesPersistedInNewShards(t *testing.T) { + dir, err := ioutil.TempDir("", "store_test") + if err != nil { + t.Fatalf("Store.Open() failed to create temp dir: %v", err) + } + defer os.RemoveAll(dir) + + s := tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") + if err := s.Open(); err != nil { + t.Fatalf("Store.Open() failed: %v", err) + } + + if err := s.CreateShard("foo", "default", 1); err != nil { + t.Fatalf("error creating shard: %v", err) + } + + p, _ := tsdb.ParsePoints([]byte("cpu val=1")) + if err := s.WriteToShard(1, p); err != nil { + t.Fatalf("error writing to shard: %v", err) + } + + if err := s.CreateShard("foo", "default", 2); err != nil { + t.Fatalf("error creating shard: %v", err) + } + + if err := s.WriteToShard(2, p); err != nil { + t.Fatalf("error writing to shard: %v", err) + } + + d := s.DatabaseIndex("foo") + if d == nil { + t.Fatal("expected to have database index for foo") + } + if d.Series("cpu") == nil { + t.Fatal("expected series cpu to be in the index") + } + + // delete the shard, close the store and reopen it and confirm the measurement is still there + s.DeleteShard(1) + s.Close() + + s = tsdb.NewStore(dir) + s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal") + if err := s.Open(); err != nil { + t.Fatalf("Store.Open() failed: %v", err) + } + + d = s.DatabaseIndex("foo") + if d == nil { + t.Fatal("expected to have database index for foo") + } + if d.Series("cpu") == nil { + t.Fatal("expected series cpu to be in the index") + } +} + func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) } func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {