Skip to content

Commit

Permalink
Merge pull request #17 from idealo/limiting_updates
Browse files Browse the repository at this point in the history
Limiting updates
  • Loading branch information
manuelkasiske4idealo authored Nov 20, 2024
2 parents 646b78f + e72c251 commit 6e64625
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 46 deletions.
90 changes: 71 additions & 19 deletions collection_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type CollectionAPI interface {
UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)
DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error)
CountDocuments(ctx context.Context, filter interface{}) (int64, error)
Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error)
Drop(ctx context.Context) error
Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error)
}
Expand Down Expand Up @@ -49,31 +50,82 @@ func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts .
return c.Collection.Find(ctx, filter, opts...)
}

func fetchDocumentIDs(collection CollectionAPI) ([]primitive.ObjectID, error) {
func (c *MongoDBCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) {
return c.Collection.Aggregate(ctx, pipeline, opts...)
}

func fetchDocumentIDs(collection CollectionAPI, limit int64, testType string) ([]primitive.ObjectID, error) {
var docIDs []primitive.ObjectID
var cursor *mongo.Cursor
var err error

cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}))
if err != nil {
return nil, fmt.Errorf("failed to fetch document IDs: %v", err)
}
defer cursor.Close(context.Background())
switch testType {
case "insert", "upsert", "delete":
if limit > 0 {
cursor, err = collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}).SetLimit(limit))
} else {
cursor, err = collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}))
}
if err != nil {
return nil, fmt.Errorf("failed to fetch document IDs: %v", err)
}
defer cursor.Close(context.Background())

for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])
}
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])

if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}
}
case "update":
if limit > 0 {
pipeline := []bson.M{{"$sample": bson.M{"size": limit}}}
cursor, err = collection.Aggregate(context.Background(), pipeline)

defer cursor.Close(context.Background())

for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])
}
}

if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
} else {
defer cursor.Close(context.Background())

for cursor.Next(context.Background()) {
var result bson.M
if err := cursor.Decode(&result); err != nil {
log.Printf("Failed to decode document: %v", err)
continue
}
// Check if `_id` is of type `ObjectId` and add to `docIDs`
if id, ok := result["_id"].(primitive.ObjectID); ok {
docIDs = append(docIDs, id)
} else {
log.Printf("Skipping document with unsupported _id type: %T", result["_id"])
}
}
}
}

return docIDs, nil
Expand Down
31 changes: 20 additions & 11 deletions docs_testing_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, confi
}
}

func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) {
func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) {
if testType == "insert" || testType == "upsert" {
if config.DropDb {
if err := collection.Drop(context.Background()); err != nil {
Expand All @@ -38,10 +38,6 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
log.Printf("Starting %s test...\n", testType)
}

insertRate := metrics.NewMeter()
var records [][]string
records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"})

var partitions [][]primitive.ObjectID

var threads = config.Threads
Expand All @@ -51,7 +47,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
switch testType {
case "delete":
// Fetch document IDs as ObjectId and partition them
docIDs, err := fetchDocIDs(collection)
docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}
Expand All @@ -67,7 +63,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
}

case "update":
docIDs, err := fetchDocIDs(collection)
docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}
Expand All @@ -80,6 +76,16 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
}

// Start the ticker just before starting the main workload goroutines
insertRate := metrics.NewMeter()
var records [][]string
records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"})

var doc interface{}
var data = make([]byte, 1024*2)
for i := 0; i < len(data); i++ {
data[i] = byte(rand.Intn(256))
}

secondTicker := time.NewTicker(1 * time.Second)
defer secondTicker.Stop()
go func() {
Expand Down Expand Up @@ -116,17 +122,20 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri
for _, docID := range partition {
switch testType {
case "insert":
// Let MongoDB generate the _id automatically
doc := bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
if config.LargeDocs {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data}
} else {
doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1}
}
_, err := collection.InsertOne(context.Background(), doc)
if err == nil {
insertRate.Mark(1)
} else {
log.Printf("Insert failed: %v", err)
}

case "update":
filter := bson.M{"_id": docID}
randomDocID := partition[rand.Intn(len(partition))]
filter := bson.M{"_id": randomDocID}
update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}}
_, err := collection.UpdateOne(context.Background(), filter, update)
if err == nil {
Expand Down
16 changes: 5 additions & 11 deletions duration_testing_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,7 @@ func (t DurationTestingStrategy) runTestSequence(collection CollectionAPI, confi
}
}

func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) {
// Set up the timer for the duration of the test
endTime := time.Now().Add(time.Duration(config.Duration) * time.Second)

// Set up the ticker to record metrics every second
insertRate := metrics.NewMeter()
records := [][]string{
{"timestamp", "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate"},
}

func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) {
var partitions [][]primitive.ObjectID
if testType == "insert" {
if config.DropDb {
Expand All @@ -45,7 +36,7 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
log.Println("Collection stays. Dropping disabled.")
}
} else if testType == "update" {
docIDs, err := fetchDocIDs(collection)
docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType)
if err != nil {
log.Fatalf("Failed to fetch document IDs: %v", err)
}
Expand All @@ -67,6 +58,9 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri
data[i] = byte(rand.Intn(256))
}

endTime := time.Now().Add(time.Duration(config.Duration) * time.Second)
insertRate := metrics.NewMeter()
records := [][]string{{"timestamp", "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate"}}
secondTicker := time.NewTicker(1 * time.Second)
defer secondTicker.Stop()
go func() {
Expand Down
8 changes: 5 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
var strategy TestingStrategy
var config TestingConfig

client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri))
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri).SetMaxPoolSize(uint64(threads)))
if err != nil {
log.Fatalf("Failed to connect to MongoDB: %v", err)
}
Expand All @@ -58,8 +58,10 @@ func main() {
} else {
strategy = DocCountTestingStrategy{}
config = TestingConfig{
Threads: threads,
DocCount: docCount,
Threads: threads,
DocCount: docCount,
LargeDocs: largeDocs,
DropDb: dropDb,
}
if runAll {
strategy.runTestSequence(mongoCollection, config)
Expand Down
7 changes: 6 additions & 1 deletion mongo_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ func (m *MockCollection) Find(ctx context.Context, filter interface{}, opts ...*
return args.Get(0).(*mongo.Cursor), args.Error(1)
}

func (m *MockCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) {
args := m.Called(ctx, pipeline, opts)
return args.Get(0).(*mongo.Cursor), args.Error(1)
}

// fetchDocumentIDsMock returns a slice of mock ObjectIDs for testing
func fetchDocumentIDsMock(_ CollectionAPI) ([]primitive.ObjectID, error) {
func fetchDocumentIDsMock(_ CollectionAPI, _ int64, _ string) ([]primitive.ObjectID, error) {
return []primitive.ObjectID{
primitive.NewObjectID(),
primitive.NewObjectID(),
Expand Down
2 changes: 1 addition & 1 deletion strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ type TestingConfig struct {

type TestingStrategy interface {
runTestSequence(collection CollectionAPI, config TestingConfig)
runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error))
runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error))
}

0 comments on commit 6e64625

Please sign in to comment.