468 lines
9.9 KiB
Go
468 lines
9.9 KiB
Go
package gorethink
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"reflect"
|
|
"sync/atomic"
|
|
|
|
"github.com/dancannon/gorethink/encoding"
|
|
p "github.com/dancannon/gorethink/ql2"
|
|
)
|
|
|
|
var (
|
|
errCursorClosed = errors.New("connection closed, cannot read cursor")
|
|
)
|
|
|
|
func newCursor(conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
|
|
if cursorType == "" {
|
|
cursorType = "Cursor"
|
|
}
|
|
|
|
cursor := &Cursor{
|
|
conn: conn,
|
|
token: token,
|
|
cursorType: cursorType,
|
|
term: term,
|
|
opts: opts,
|
|
}
|
|
|
|
return cursor
|
|
}
|
|
|
|
// Cursor is the result of a query. Its cursor starts before the first row
|
|
// of the result set. A Cursor is not thread safe and should only be accessed
|
|
// by a single goroutine at any given time. Use Next to advance through the
|
|
// rows:
|
|
//
|
|
// cursor, err := query.Run(session)
|
|
// ...
|
|
// defer cursor.Close()
|
|
//
|
|
// var response interface{}
|
|
// for cursor.Next(&response) {
|
|
// ...
|
|
// }
|
|
// err = cursor.Err() // get any error encountered during iteration
|
|
// ...
|
|
type Cursor struct {
|
|
releaseConn func(error)
|
|
|
|
conn *Connection
|
|
token int64
|
|
cursorType string
|
|
term *Term
|
|
opts map[string]interface{}
|
|
|
|
lastErr error
|
|
fetching bool
|
|
closed int32
|
|
finished bool
|
|
isAtom bool
|
|
buffer queue
|
|
responses queue
|
|
profile interface{}
|
|
}
|
|
|
|
// Profile returns the information returned from the query profiler.
|
|
func (c *Cursor) Profile() interface{} {
|
|
return c.profile
|
|
}
|
|
|
|
// Type returns the cursor type (by default "Cursor")
|
|
func (c *Cursor) Type() string {
|
|
return c.cursorType
|
|
}
|
|
|
|
// Err returns nil if no errors happened during iteration, or the actual
|
|
// error otherwise.
|
|
func (c *Cursor) Err() error {
|
|
return c.lastErr
|
|
}
|
|
|
|
// Close closes the cursor, preventing further enumeration. If the end is
|
|
// encountered, the cursor is closed automatically. Close is idempotent.
|
|
func (c *Cursor) Close() error {
|
|
var err error
|
|
|
|
if c.closed != 0 || !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
conn := c.conn
|
|
if conn == nil {
|
|
return nil
|
|
}
|
|
if conn.conn == nil {
|
|
return nil
|
|
}
|
|
|
|
// Stop any unfinished queries
|
|
if !c.finished {
|
|
q := Query{
|
|
Type: p.Query_STOP,
|
|
Token: c.token,
|
|
}
|
|
|
|
_, _, err = conn.Query(q)
|
|
}
|
|
|
|
if c.releaseConn != nil {
|
|
c.releaseConn(err)
|
|
}
|
|
|
|
c.conn = nil
|
|
c.buffer.elems = nil
|
|
c.responses.elems = nil
|
|
|
|
return err
|
|
}
|
|
|
|
// Next retrieves the next document from the result set, blocking if necessary.
|
|
// This method will also automatically retrieve another batch of documents from
|
|
// the server when the current one is exhausted, or before that in background
|
|
// if possible.
|
|
//
|
|
// Next returns true if a document was successfully unmarshalled onto result,
|
|
// and false at the end of the result set or if an error happened.
|
|
// When Next returns false, the Err method should be called to verify if
|
|
// there was an error during iteration.
|
|
//
|
|
// Also note that you are able to reuse the same variable multiple times as
|
|
// `Next` zeroes the value before scanning in the result.
|
|
func (c *Cursor) Next(dest interface{}) bool {
|
|
if c.closed != 0 {
|
|
return false
|
|
}
|
|
|
|
hasMore, err := c.loadNext(dest)
|
|
if c.handleError(err) != nil {
|
|
c.Close()
|
|
return false
|
|
}
|
|
|
|
if !hasMore {
|
|
c.Close()
|
|
}
|
|
|
|
return hasMore
|
|
}
|
|
|
|
func (c *Cursor) loadNext(dest interface{}) (bool, error) {
|
|
for c.lastErr == nil {
|
|
// Check if response is closed/finished
|
|
if c.buffer.Len() == 0 && c.responses.Len() == 0 && c.closed != 0 {
|
|
return false, errCursorClosed
|
|
}
|
|
|
|
if c.buffer.Len() == 0 && c.responses.Len() == 0 && !c.finished {
|
|
err := c.fetchMore()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
if c.buffer.Len() == 0 && c.responses.Len() == 0 && c.finished {
|
|
return false, nil
|
|
}
|
|
|
|
if c.buffer.Len() == 0 && c.responses.Len() > 0 {
|
|
if response, ok := c.responses.Pop().(json.RawMessage); ok {
|
|
var value interface{}
|
|
err := json.Unmarshal(response, &value)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
value, err = recursivelyConvertPseudotype(value, c.opts)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// If response is an ATOM then try and convert to an array
|
|
if data, ok := value.([]interface{}); ok && c.isAtom {
|
|
for _, v := range data {
|
|
c.buffer.Push(v)
|
|
}
|
|
} else if value == nil {
|
|
c.buffer.Push(nil)
|
|
} else {
|
|
c.buffer.Push(value)
|
|
}
|
|
}
|
|
}
|
|
|
|
if c.buffer.Len() > 0 {
|
|
data := c.buffer.Pop()
|
|
|
|
err := encoding.Decode(dest, data)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, c.lastErr
|
|
}
|
|
|
|
// All retrieves all documents from the result set into the provided slice
|
|
// and closes the cursor.
|
|
//
|
|
// The result argument must necessarily be the address for a slice. The slice
|
|
// may be nil or previously allocated.
|
|
//
|
|
// Also note that you are able to reuse the same variable multiple times as
|
|
// `All` zeroes the value before scanning in the result. It also attempts
|
|
// to reuse the existing slice without allocating any more space by either
|
|
// resizing or returning a selection of the slice if necessary.
|
|
func (c *Cursor) All(result interface{}) error {
|
|
resultv := reflect.ValueOf(result)
|
|
if resultv.Kind() != reflect.Ptr || resultv.Elem().Kind() != reflect.Slice {
|
|
panic("result argument must be a slice address")
|
|
}
|
|
slicev := resultv.Elem()
|
|
slicev = slicev.Slice(0, slicev.Cap())
|
|
elemt := slicev.Type().Elem()
|
|
i := 0
|
|
for {
|
|
if slicev.Len() == i {
|
|
elemp := reflect.New(elemt)
|
|
if !c.Next(elemp.Interface()) {
|
|
break
|
|
}
|
|
slicev = reflect.Append(slicev, elemp.Elem())
|
|
slicev = slicev.Slice(0, slicev.Cap())
|
|
} else {
|
|
if !c.Next(slicev.Index(i).Addr().Interface()) {
|
|
break
|
|
}
|
|
}
|
|
i++
|
|
}
|
|
resultv.Elem().Set(slicev.Slice(0, i))
|
|
|
|
if err := c.Err(); err != nil {
|
|
c.Close()
|
|
return err
|
|
}
|
|
|
|
if err := c.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// One retrieves a single document from the result set into the provided
|
|
// slice and closes the cursor.
|
|
//
|
|
// Also note that you are able to reuse the same variable multiple times as
|
|
// `One` zeroes the value before scanning in the result.
|
|
func (c *Cursor) One(result interface{}) error {
|
|
if c.IsNil() {
|
|
c.Close()
|
|
return ErrEmptyResult
|
|
}
|
|
|
|
hasResult := c.Next(result)
|
|
|
|
if err := c.Err(); err != nil {
|
|
c.Close()
|
|
return err
|
|
}
|
|
|
|
if err := c.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !hasResult {
|
|
return ErrEmptyResult
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Listen listens for rows from the database and sends the result onto the given
|
|
// channel. The type that the row is scanned into is determined by the element
|
|
// type of the channel.
|
|
//
|
|
// Also note that this function returns immediately.
|
|
//
|
|
// cursor, err := r.Expr([]int{1,2,3}).Run(session)
|
|
// if err != nil {
|
|
// panic(err)
|
|
// }
|
|
//
|
|
// ch := make(chan int)
|
|
// cursor.Listen(ch)
|
|
// <- ch // 1
|
|
// <- ch // 2
|
|
// <- ch // 3
|
|
func (c *Cursor) Listen(channel interface{}) {
|
|
go func() {
|
|
channelv := reflect.ValueOf(channel)
|
|
if channelv.Kind() != reflect.Chan {
|
|
panic("input argument must be a channel")
|
|
}
|
|
elemt := channelv.Type().Elem()
|
|
for {
|
|
elemp := reflect.New(elemt)
|
|
if !c.Next(elemp.Interface()) {
|
|
break
|
|
}
|
|
|
|
channelv.Send(elemp.Elem())
|
|
}
|
|
|
|
c.Close()
|
|
channelv.Close()
|
|
}()
|
|
}
|
|
|
|
// IsNil tests if the current row is nil.
|
|
func (c *Cursor) IsNil() bool {
|
|
if c.buffer.Len() > 0 {
|
|
bufferedItem := c.buffer.Peek()
|
|
if bufferedItem == nil {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
if c.responses.Len() > 0 {
|
|
response := c.responses.Peek()
|
|
if response == nil {
|
|
return true
|
|
}
|
|
|
|
if response, ok := response.(json.RawMessage); ok {
|
|
if string(response) == "null" {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// fetchMore fetches more rows from the database.
|
|
//
|
|
// If wait is true then it will wait for the database to reply otherwise it
|
|
// will return after sending the continue query.
|
|
func (c *Cursor) fetchMore() error {
|
|
var err error
|
|
if !c.fetching {
|
|
c.fetching = true
|
|
|
|
if c.closed != 0 {
|
|
return errCursorClosed
|
|
}
|
|
|
|
q := Query{
|
|
Type: p.Query_CONTINUE,
|
|
Token: c.token,
|
|
}
|
|
|
|
_, _, err = c.conn.Query(q)
|
|
c.handleError(err)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// handleError sets the value of lastErr to err if lastErr is not yet set.
|
|
func (c *Cursor) handleError(err error) error {
|
|
return c.handleErrorLocked(err)
|
|
}
|
|
|
|
// handleError sets the value of lastErr to err if lastErr is not yet set.
|
|
func (c *Cursor) handleErrorLocked(err error) error {
|
|
if c.lastErr == nil {
|
|
c.lastErr = err
|
|
}
|
|
|
|
return c.lastErr
|
|
}
|
|
|
|
// extend adds the result of a continue query to the cursor.
|
|
func (c *Cursor) extend(response *Response) {
|
|
for _, response := range response.Responses {
|
|
c.responses.Push(response)
|
|
}
|
|
|
|
c.finished = response.Type != p.Response_SUCCESS_PARTIAL
|
|
c.fetching = false
|
|
c.isAtom = response.Type == p.Response_SUCCESS_ATOM
|
|
|
|
putResponse(response)
|
|
}
|
|
|
|
// Queue structure used for storing responses
|
|
|
|
type queue struct {
|
|
elems []interface{}
|
|
nelems, popi, pushi int
|
|
}
|
|
|
|
func (q *queue) Len() int {
|
|
if len(q.elems) == 0 {
|
|
return 0
|
|
}
|
|
|
|
return q.nelems
|
|
}
|
|
func (q *queue) Push(elem interface{}) {
|
|
if q.nelems == len(q.elems) {
|
|
q.expand()
|
|
}
|
|
q.elems[q.pushi] = elem
|
|
q.nelems++
|
|
q.pushi = (q.pushi + 1) % len(q.elems)
|
|
}
|
|
func (q *queue) Pop() (elem interface{}) {
|
|
if q.nelems == 0 {
|
|
return nil
|
|
}
|
|
elem = q.elems[q.popi]
|
|
q.elems[q.popi] = nil // Help GC.
|
|
q.nelems--
|
|
q.popi = (q.popi + 1) % len(q.elems)
|
|
return elem
|
|
}
|
|
func (q *queue) Peek() (elem interface{}) {
|
|
if q.nelems == 0 {
|
|
return nil
|
|
}
|
|
return q.elems[q.popi]
|
|
}
|
|
func (q *queue) expand() {
|
|
curcap := len(q.elems)
|
|
var newcap int
|
|
if curcap == 0 {
|
|
newcap = 8
|
|
} else if curcap < 1024 {
|
|
newcap = curcap * 2
|
|
} else {
|
|
newcap = curcap + (curcap / 4)
|
|
}
|
|
elems := make([]interface{}, newcap)
|
|
if q.popi == 0 {
|
|
copy(elems, q.elems)
|
|
q.pushi = curcap
|
|
} else {
|
|
newpopi := newcap - (curcap - q.popi)
|
|
copy(elems, q.elems[:q.popi])
|
|
copy(elems[newpopi:], q.elems[q.popi:])
|
|
q.popi = newpopi
|
|
}
|
|
for i := range q.elems {
|
|
q.elems[i] = nil // Help GC.
|
|
}
|
|
q.elems = elems
|
|
}
|