Skip to content

Commit

Permalink
GT-379 Retriable batch reads in AQL cursors - V1 (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwierzbo authored May 12, 2023
1 parent 84832af commit 0b8aa69
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 80 deletions.
25 changes: 12 additions & 13 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,22 @@ go:
- 1.20.3

env:
global:
- GOIMAGE=gcr.io/gcr-for-testing/golang:1.20.3
- ALPINE_IMAGE=gcr.io/gcr-for-testing/alpine:3.17
- STARTER=gcr.io/gcr-for-testing/arangodb/arangodb-starter:0.15.7
jobs:
- TEST_SUITE=run-unit-tests ALWAYS=1

- TEST_SUITE=run-tests-single ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb:3.9.6
- TEST_SUITE=run-tests-single ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb-preview:3.10.5 TEST_DISALLOW_UNKNOWN_FIELDS=false ALWAYS=1
- TEST_SUITE=run-tests-resilientsingle ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb-preview:3.10.5
- TEST_SUITE=run-tests-cluster ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb-preview:3.10.5
- TEST_SUITE=run-tests-cluster ARANGODB=gcr.io/gcr-for-testing/arangodb/enterprise:3.10.5
- TEST_SUITE=run-tests-single
- TEST_SUITE=run-tests-cluster
# - TEST_SUITE=run-tests-resilientsingle

- TEST_SUITE=run-v2-tests-single ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb:3.9.6
- TEST_SUITE=run-v2-tests-single ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb-preview:3.10.5 ALWAYS=1
- TEST_SUITE=run-v2-tests-cluster ARANGODB=gcr.io/gcr-for-testing/arangodb/arangodb-preview:3.10.5
- TEST_SUITE=run-v2-tests-cluster ARANGODB=gcr.io/gcr-for-testing/arangodb/enterprise:3.10.5
- TEST_SUITE=run-v2-tests-single
- TEST_SUITE=run-v2-tests-cluster
# - TEST_SUITE=run-v2-tests-resilientsingle
global:
- ARANGODB=gcr.io/gcr-for-testing/arangodb/enterprise-preview:3.11.0-beta.1
- GOIMAGE=gcr.io/gcr-for-testing/golang:1.20.3
- ALPINE_IMAGE=gcr.io/gcr-for-testing/alpine:3.17
- STARTER=gcr.io/gcr-for-testing/arangodb/arangodb-starter:0.15.7
- TEST_DISALLOW_UNKNOWN_FIELDS=false

before_script:
- |
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Use Go 1.20.3 for testing. Add govulncheck to pipeline
- Fix test for extended names
- Fix potential bug with DB name escaping for URL when requesting replication-related API
- Retriable batch reads in AQL cursors

## [1.5.2](https://github.com/arangodb/go-driver/tree/v1.5.2) (2023-03-01)
- Bump `DRIVER_VERSION`
Expand Down
38 changes: 23 additions & 15 deletions cursor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2017 ArangoDB GmbH, Cologne, Germany
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,6 @@
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package driver

Expand All @@ -36,28 +33,34 @@ type QueryExtra interface {
// GetProfileRaw returns raw profile information in json
GetProfileRaw() ([]byte, bool, error)

// PlanRaw returns raw plan
// GetPlanRaw returns raw plan
GetPlanRaw() ([]byte, bool, error)
}

// Statistics returned with the query cursor
// QueryStatistics Statistics returned with the query cursor
type QueryStatistics interface {
// the total number of data-modification operations successfully executed.
// WritesExecuted the total number of data-modification operations successfully executed.
WritesExecuted() int64
// The total number of data-modification operations that were unsuccessful

// WritesIgnored The total number of data-modification operations that were unsuccessful
WritesIgnored() int64
// The total number of documents iterated over when scanning a collection without an index.

// ScannedFull The total number of documents iterated over when scanning a collection without an index.
ScannedFull() int64
// The total number of documents iterated over when scanning a collection using an index.

// ScannedIndex The total number of documents iterated over when scanning a collection using an index.
ScannedIndex() int64
// the total number of documents that were removed after executing a filter condition in a FilterNode

// Filtered the total number of documents that were removed after executing a filter condition in a FilterNode
Filtered() int64
// Returns the numer of results before the last LIMIT in the query was applied.

// FullCount Returns the number of results before the last LIMIT in the query was applied.
// A valid return value is only available when the has been created with a context that was
// prepared with `WithFullCount`. Additionally this will also not return a valid value if
// prepared with `WithFullCount`. Additionally, this will also not return a valid value if
// the context was prepared with `WithStream`.
FullCount() int64
// Execution time of the query (wall-clock time). value will be set from the outside

// ExecutionTime of the query (wall-clock time). value will be set from the outside
ExecutionTime() time.Duration
}

Expand All @@ -70,12 +73,17 @@ type Cursor interface {
HasMore() bool

// ReadDocument reads the next document from the cursor.
// The document data is stored into result, the document meta data is returned.
// The document data is stored into result, the document metadata is returned.
// If the cursor has no more documents, a NoMoreDocuments error is returned.
// Note: If the query (resulting in this cursor) does not return documents,
// then the returned DocumentMeta will be empty.
ReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error)

// RetryReadDocument reads the last document from the cursor once more time
// It can be used e.g., in case of network error during ReadDocument
// It requires 'driver.WithQueryAllowRetry' to be set to true on the Context during Cursor creation.
RetryReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error)

// Count returns the total number of result documents available.
// A valid return value is only available when the cursor has been created with a context that was
// prepared with `WithQueryCount` and not with `WithQueryStream`.
Expand Down
117 changes: 82 additions & 35 deletions cursor_impl.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2017 ArangoDB GmbH, Cologne, Germany
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,6 @@
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package driver

Expand All @@ -37,19 +35,30 @@ func newCursor(data cursorData, endpoint string, db *database, allowDirtyReads b
if db == nil {
return nil, WithStack(InvalidArgumentError{Message: "db is nil"})
}
return &cursor{

c := &cursor{
cursorData: data,
endpoint: endpoint,
db: db,
conn: db.conn,
allowDirtyReads: allowDirtyReads,
}, nil
}

if data.NextBatchID != "" {
c.retryData = &retryData{
cursorID: data.ID,
currentBatchID: "1",
}
}

return c, nil
}

type cursor struct {
cursorData
endpoint string
resultIndex int
retryData *retryData
db *database
conn Connection
closed int32
Expand All @@ -58,6 +67,11 @@ type cursor struct {
lastReadWasDirty bool
}

type retryData struct {
cursorID string
currentBatchID string
}

// CursorStats TODO: all these int64 should be changed into uint64
type cursorStats struct {
// The total number of data-modification operations successfully executed.
Expand Down Expand Up @@ -86,11 +100,11 @@ type cursorStats struct {
CursorsRearmed uint64 `json:"cursorsRearmed,omitempty"`
// CacheHits the total number of index entries read from in-memory caches for indexes of type edge or persistent.
// This value will only be non-zero when reading from indexes that have an in-memory cache enabled,
// and when the query allows using the in-memory cache (i.e. using equality lookups on all index attributes).
// and when the query allows using the in-memory cache (i.e., using equality lookups on all index attributes).
CacheHits uint64 `json:"cacheHits,omitempty"`
// CacheMisses the total number of cache read attempts for index entries that could not be served from in-memory caches for indexes of type edge or persistent.
// This value will only be non-zero when reading from indexes that have an in-memory cache enabled,
// the query allows using the in-memory cache (i.e. using equality lookups on all index attributes) and the looked up values are not present in the cache.
// the query allows using the in-memory cache (i.e., using equality lookups on all index attributes), and the looked-up values are not present in the cache.
CacheMisses uint64 `json:"cacheMisses,omitempty"`
}

Expand Down Expand Up @@ -163,13 +177,14 @@ type cursorPlanNodes map[string]interface{}
type cursorProfile map[string]interface{}

type cursorData struct {
Key string `json:"_key,omitempty"`
Count int64 `json:"count,omitempty"` // the total number of result documents available (only available if the query was executed with the count attribute set)
ID string `json:"id"` // id of temporary cursor created on the server (optional, see above)
Result []*RawObject `json:"result,omitempty"` // an array of result documents (might be empty if query has no results)
HasMore bool `json:"hasMore,omitempty"` // A boolean indicator whether there are more results available for the cursor on the server
Extra cursorExtra `json:"extra"`
Cached bool `json:"cached,omitempty"`
Key string `json:"_key,omitempty"`
Count int64 `json:"count,omitempty"` // the total number of result documents available (only available if the query was executed with the count attribute set)
ID string `json:"id"` // id of temporary cursor created on the server (optional, see above)
Result []*RawObject `json:"result,omitempty"` // an array of result documents (might be empty if the query has no results)
HasMore bool `json:"hasMore,omitempty"` // A boolean indicator whether there are more results available for the cursor on the server
Extra cursorExtra `json:"extra"`
Cached bool `json:"cached,omitempty"`
NextBatchID string `json:"nextBatchId,omitempty"`
ArangoError
}

Expand All @@ -178,22 +193,22 @@ func (c *cursor) relPath() string {
return path.Join(c.db.relPath(), "_api", "cursor")
}

// Name returns the name of the collection.
// HasMore Name returns the name of the collection.
func (c *cursor) HasMore() bool {
return c.resultIndex < len(c.Result) || c.cursorData.HasMore
}

// Count returns the total number of result documents available.
// A valid return value is only available when the cursor has been created with a context that was
// prepare with `WithQueryCount`.
// prepared with `WithQueryCount`.
func (c *cursor) Count() int64 {
return c.cursorData.Count
}

// Close deletes the cursor and frees the resources associated with it.
func (c *cursor) Close() error {
if c == nil {
// Avoid panics in the case that someone defer's a close before checking that the cursor is not nil.
// Avoid panics in the case that someone defers a close before checking that the cursor is not nil.
return nil
}
if c := atomic.LoadInt32(&c.closed); c != 0 {
Expand Down Expand Up @@ -224,28 +239,60 @@ func (c *cursor) Close() error {
}

// ReadDocument reads the next document from the cursor.
// The document data is stored into result, the document meta data is returned.
// The document data is stored into the result, the document metadata is returned.
// If the cursor has no more documents, a NoMoreDocuments error is returned.
func (c *cursor) ReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
return c.readDocument(ctx, result, "")
}

// RetryReadDocument reads the last document from the cursor once more time
// It can be used e.g., in case of network error during ReadDocument
// It requires 'driver.WithQueryAllowRetry' to be set to true on the Context during Cursor creation.
func (c *cursor) RetryReadDocument(ctx context.Context, result interface{}) (DocumentMeta, error) {
if c.resultIndex > 0 {
c.resultIndex--
}
return c.readDocument(ctx, result, c.retryData.currentBatchID)
}

func (c *cursor) readDocument(ctx context.Context, result interface{}, retryBatchID string) (DocumentMeta, error) {
// Force use of initial endpoint
ctx = WithEndpoint(ctx, c.endpoint)

if c.resultIndex >= len(c.Result) && c.cursorData.HasMore {
// This is required since we are interested if this was a dirty read
if c.resultIndex >= len(c.Result) && (c.cursorData.HasMore || retryBatchID != "") {
// This is required since we are interested if this was a dirty read,
// but we do not want to trash the users bool reference.
var wasDirtyRead bool
fetchctx := ctx
fetchCtx := ctx
if c.allowDirtyReads {
fetchctx = WithAllowDirtyReads(ctx, &wasDirtyRead)
fetchCtx = WithAllowDirtyReads(ctx, &wasDirtyRead)
}

p := path.Join(c.relPath(), c.cursorData.ID)

// If we have a NextBatchID, use it
if c.NextBatchID != "" {
p = path.Join(c.relPath(), c.cursorData.ID, c.NextBatchID)
}

// Fetch next batch
req, err := c.conn.NewRequest("PUT", path.Join(c.relPath(), c.cursorData.ID))
// We have to retry the batch instead of fetching the next one
if retryBatchID != "" {
p = path.Join(c.relPath(), c.retryData.cursorID, retryBatchID)
}

// Update currentBatchID before fetching the next batch (no retry case)
if c.NextBatchID != "" && retryBatchID == "" {
c.retryData.currentBatchID = c.NextBatchID
}

// Fetch the next batch
req, err := c.conn.NewRequest("POST", p)
if err != nil {
return DocumentMeta{}, WithStack(err)
}
cs := applyContextSettings(fetchctx, req)
resp, err := c.conn.Do(fetchctx, req)

cs := applyContextSettings(fetchCtx, req)
resp, err := c.conn.Do(fetchCtx, req)
if err != nil {
return DocumentMeta{}, WithStack(err)
}
Expand Down Expand Up @@ -295,7 +342,7 @@ func (c *cursor) ReadDocument(ctx context.Context, result interface{}) (Document
return meta, nil
}

// Return execution statistics for this cursor. This might not
// Statistics Return execution statistics for this cursor. This might not
// be valid if the cursor has been created with a context that was
// prepared with `WithStream`
func (c *cursor) Statistics() QueryStatistics {
Expand All @@ -306,40 +353,40 @@ func (c *cursor) Extra() QueryExtra {
return c.cursorData.Extra
}

// the total number of data-modification operations successfully executed.
// WritesExecuted the total number of data-modification operations successfully executed.
func (cs cursorStats) WritesExecuted() int64 {
return cs.WritesExecutedInt
}

// The total number of data-modification operations that were unsuccessful
// WritesIgnored The total number of data-modification operations that were unsuccessful
func (cs cursorStats) WritesIgnored() int64 {
return cs.WritesIgnoredInt
}

// The total number of documents iterated over when scanning a collection without an index.
// ScannedFull The total number of documents iterated over when scanning a collection without an index.
func (cs cursorStats) ScannedFull() int64 {
return cs.ScannedFullInt
}

// The total number of documents iterated over when scanning a collection using an index.
// ScannedIndex The total number of documents iterated over when scanning a collection using an index.
func (cs cursorStats) ScannedIndex() int64 {
return cs.ScannedIndexInt
}

// the total number of documents that were removed after executing a filter condition in a FilterNode
// Filtered the total number of documents that were removed after executing a filter condition in a FilterNode
func (cs cursorStats) Filtered() int64 {
return cs.FilteredInt
}

// Returns the numer of results before the last LIMIT in the query was applied.
// FullCount Returns the number of results before the last LIMIT in the query was applied.
// A valid return value is only available when the has been created with a context that was
// prepared with `WithFullCount`. Additionally this will also not return a valid value if
// prepared with `WithFullCount`. Additionally, this will also not return a valid value if
// the context was prepared with `WithStream`.
func (cs cursorStats) FullCount() int64 {
return cs.FullCountInt
}

// query execution time (wall-clock time). value will be set from the outside
// ExecutionTime query execution time (wall-clock time). value will be set from the outside
func (cs cursorStats) ExecutionTime() time.Duration {
return time.Duration(cs.ExecutionTimeInt * float64(time.Second))
}
Loading

0 comments on commit 0b8aa69

Please sign in to comment.