From 23d2d6a798007782835eaaac792a8671ca644478 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Thu, 14 Nov 2024 15:50:00 +0100 Subject: [PATCH 1/6] feat: limiting the number of documents to update --- collection_api.go | 90 ++++++++++++++++++++++++++++-------- docs_testing_strategy.go | 9 ++-- duration_testing_strategy.go | 4 +- main.go | 3 ++ mongo_bench_test.go | 7 ++- strategy.go | 3 +- 6 files changed, 89 insertions(+), 27 deletions(-) diff --git a/collection_api.go b/collection_api.go index 171fd80..9e50b21 100644 --- a/collection_api.go +++ b/collection_api.go @@ -16,6 +16,7 @@ type CollectionAPI interface { UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) DeleteOne(ctx context.Context, filter interface{}) (*mongo.DeleteResult, error) CountDocuments(ctx context.Context, filter interface{}) (int64, error) + Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) Drop(ctx context.Context) error Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) } @@ -49,31 +50,82 @@ func (c *MongoDBCollection) Find(ctx context.Context, filter interface{}, opts . return c.Collection.Find(ctx, filter, opts...) } -func fetchDocumentIDs(collection CollectionAPI) ([]primitive.ObjectID, error) { +func (c *MongoDBCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) { + return c.Collection.Aggregate(ctx, pipeline, opts...) +} + +func fetchDocumentIDs(collection CollectionAPI, limit int64, testType string) ([]primitive.ObjectID, error) { var docIDs []primitive.ObjectID + var cursor *mongo.Cursor + var err error - cursor, err := collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1})) - if err != nil { - return nil, fmt.Errorf("failed to fetch document IDs: %v", err) - } - defer cursor.Close(context.Background()) + switch testType { + case "insert", "upsert", "delete": + if limit > 0 { + cursor, err = collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1}).SetLimit(limit)) + } else { + cursor, err = collection.Find(context.Background(), bson.M{}, options.Find().SetProjection(bson.M{"_id": 1})) + } + if err != nil { + return nil, fmt.Errorf("failed to fetch document IDs: %v", err) + } + defer cursor.Close(context.Background()) - for cursor.Next(context.Background()) { - var result bson.M - if err := cursor.Decode(&result); err != nil { - log.Printf("Failed to decode document: %v", err) - continue + for cursor.Next(context.Background()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Printf("Failed to decode document: %v", err) + continue + } + // Check if `_id` is of type `ObjectId` and add to `docIDs` + if id, ok := result["_id"].(primitive.ObjectID); ok { + docIDs = append(docIDs, id) + } else { + log.Printf("Skipping document with unsupported _id type: %T", result["_id"]) + } } - // Check if `_id` is of type `ObjectId` and add to `docIDs` - if id, ok := result["_id"].(primitive.ObjectID); ok { - docIDs = append(docIDs, id) - } else { - log.Printf("Skipping document with unsupported _id type: %T", result["_id"]) + + if err := cursor.Err(); err != nil { + return nil, fmt.Errorf("cursor error: %v", err) } - } + case "update": + if limit > 0 { + pipeline := []bson.M{{"$sample": bson.M{"size": limit}}} + cursor, err = collection.Aggregate(context.Background(), pipeline) + + defer cursor.Close(context.Background()) + + for cursor.Next(context.Background()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Printf("Failed to decode document: %v", err) + continue + } + // Check if `_id` is of type `ObjectId` and add to `docIDs` + if id, ok := result["_id"].(primitive.ObjectID); ok { + docIDs = append(docIDs, id) + } else { + log.Printf("Skipping document with unsupported _id type: %T", result["_id"]) + } + } - if err := cursor.Err(); err != nil { - return nil, fmt.Errorf("cursor error: %v", err) + } else { + defer cursor.Close(context.Background()) + + for cursor.Next(context.Background()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Printf("Failed to decode document: %v", err) + continue + } + // Check if `_id` is of type `ObjectId` and add to `docIDs` + if id, ok := result["_id"].(primitive.ObjectID); ok { + docIDs = append(docIDs, id) + } else { + log.Printf("Skipping document with unsupported _id type: %T", result["_id"]) + } + } + } } return docIDs, nil diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index 84e6e94..e830422 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -24,7 +24,7 @@ func (t DocCountTestingStrategy) runTestSequence(collection CollectionAPI, confi } } -func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) { +func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) { if testType == "insert" || testType == "upsert" { if config.DropDb { if err := collection.Drop(context.Background()); err != nil { @@ -51,7 +51,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri switch testType { case "delete": // Fetch document IDs as ObjectId and partition them - docIDs, err := fetchDocIDs(collection) + docIDs, err := fetchDocIDs(collection, config.Limit, testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } @@ -67,7 +67,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri } case "update": - docIDs, err := fetchDocIDs(collection) + docIDs, err := fetchDocIDs(collection, config.Limit, testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } @@ -126,7 +126,8 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri } case "update": - filter := bson.M{"_id": docID} + randomDocID := partition[rand.Intn(len(partition))] + filter := bson.M{"_id": randomDocID} update := bson.M{"$set": bson.M{"updatedAt": time.Now().Unix(), "rnd": rand.Int63()}} _, err := collection.UpdateOne(context.Background(), filter, update) if err == nil { diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index e41a5f4..bf9a82d 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -24,7 +24,7 @@ func (t DurationTestingStrategy) runTestSequence(collection CollectionAPI, confi } } -func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) { +func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) { // Set up the timer for the duration of the test endTime := time.Now().Add(time.Duration(config.Duration) * time.Second) @@ -45,7 +45,7 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri log.Println("Collection stays. Dropping disabled.") } } else if testType == "update" { - docIDs, err := fetchDocIDs(collection) + docIDs, err := fetchDocIDs(collection, config.Limit, testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } diff --git a/main.go b/main.go index 2a7d9ea..05be7ae 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ func main() { var runAll bool var largeDocs bool var dropDb bool + var limit int64 flag.IntVar(&threads, "threads", 10, "Number of threads for inserting, updating, upserting, or deleting documents") flag.IntVar(&docCount, "docs", 1000, "Total number of documents to insert, update, upsert, or delete") @@ -26,6 +27,7 @@ func main() { flag.IntVar(&duration, "duration", 0, "Duration in seconds to run the test") flag.BoolVar(&largeDocs, "largeDocs", false, "Use large documents for testing") flag.BoolVar(&dropDb, "dropDb", true, "Drop the database before running the test") + flag.Int64("limit", 0, "Limit the number of documents to fetch") flag.Parse() var strategy TestingStrategy @@ -48,6 +50,7 @@ func main() { Duration: duration, LargeDocs: largeDocs, DropDb: dropDb, + Limit: limit, } if runAll { diff --git a/mongo_bench_test.go b/mongo_bench_test.go index 92550c0..0b35697 100644 --- a/mongo_bench_test.go +++ b/mongo_bench_test.go @@ -47,8 +47,13 @@ func (m *MockCollection) Find(ctx context.Context, filter interface{}, opts ...* return args.Get(0).(*mongo.Cursor), args.Error(1) } +func (m *MockCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (*mongo.Cursor, error) { + args := m.Called(ctx, pipeline, opts) + return args.Get(0).(*mongo.Cursor), args.Error(1) +} + // fetchDocumentIDsMock returns a slice of mock ObjectIDs for testing -func fetchDocumentIDsMock(_ CollectionAPI) ([]primitive.ObjectID, error) { +func fetchDocumentIDsMock(_ CollectionAPI, _ int64, _ string) ([]primitive.ObjectID, error) { return []primitive.ObjectID{ primitive.NewObjectID(), primitive.NewObjectID(), diff --git a/strategy.go b/strategy.go index bf5bc96..eb80aa6 100644 --- a/strategy.go +++ b/strategy.go @@ -8,9 +8,10 @@ type TestingConfig struct { Duration int LargeDocs bool DropDb bool + Limit int64 } type TestingStrategy interface { runTestSequence(collection CollectionAPI, config TestingConfig) - runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI) ([]primitive.ObjectID, error)) + runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) } From 7239e190b8bcaadc71ab5d626943b47e4c0a2f75 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Thu, 14 Nov 2024 16:01:30 +0100 Subject: [PATCH 2/6] feat: limiting the number of documents to update --- docs_testing_strategy.go | 4 ++-- duration_testing_strategy.go | 2 +- main.go | 3 --- strategy.go | 1 - 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index e830422..e983bac 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -51,7 +51,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri switch testType { case "delete": // Fetch document IDs as ObjectId and partition them - docIDs, err := fetchDocIDs(collection, config.Limit, testType) + docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } @@ -67,7 +67,7 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri } case "update": - docIDs, err := fetchDocIDs(collection, config.Limit, testType) + docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index bf9a82d..adfd880 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -45,7 +45,7 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri log.Println("Collection stays. Dropping disabled.") } } else if testType == "update" { - docIDs, err := fetchDocIDs(collection, config.Limit, testType) + docIDs, err := fetchDocIDs(collection, int64(config.DocCount), testType) if err != nil { log.Fatalf("Failed to fetch document IDs: %v", err) } diff --git a/main.go b/main.go index 05be7ae..2a7d9ea 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,6 @@ func main() { var runAll bool var largeDocs bool var dropDb bool - var limit int64 flag.IntVar(&threads, "threads", 10, "Number of threads for inserting, updating, upserting, or deleting documents") flag.IntVar(&docCount, "docs", 1000, "Total number of documents to insert, update, upsert, or delete") @@ -27,7 +26,6 @@ func main() { flag.IntVar(&duration, "duration", 0, "Duration in seconds to run the test") flag.BoolVar(&largeDocs, "largeDocs", false, "Use large documents for testing") flag.BoolVar(&dropDb, "dropDb", true, "Drop the database before running the test") - flag.Int64("limit", 0, "Limit the number of documents to fetch") flag.Parse() var strategy TestingStrategy @@ -50,7 +48,6 @@ func main() { Duration: duration, LargeDocs: largeDocs, DropDb: dropDb, - Limit: limit, } if runAll { diff --git a/strategy.go b/strategy.go index eb80aa6..eee2f07 100644 --- a/strategy.go +++ b/strategy.go @@ -8,7 +8,6 @@ type TestingConfig struct { Duration int LargeDocs bool DropDb bool - Limit int64 } type TestingStrategy interface { From 46b154f0faab0b0769c1160b7c4f17c1c3c20ab5 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Thu, 14 Nov 2024 16:11:20 +0100 Subject: [PATCH 3/6] feat: limiting the number of documents to update --- docs_testing_strategy.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index e983bac..dce8c28 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -38,10 +38,6 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri log.Printf("Starting %s test...\n", testType) } - insertRate := metrics.NewMeter() - var records [][]string - records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"}) - var partitions [][]primitive.ObjectID var threads = config.Threads @@ -80,6 +76,10 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri } // Start the ticker just before starting the main workload goroutines + insertRate := metrics.NewMeter() + var records [][]string + records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"}) + secondTicker := time.NewTicker(1 * time.Second) defer secondTicker.Stop() go func() { From eadb63b65c5ed324e73e2a52c0e602f49c927c2e Mon Sep 17 00:00:00 2001 From: Kasiske Date: Thu, 14 Nov 2024 16:14:33 +0100 Subject: [PATCH 4/6] feat: limiting the number of documents to update --- duration_testing_strategy.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/duration_testing_strategy.go b/duration_testing_strategy.go index adfd880..b1ec088 100644 --- a/duration_testing_strategy.go +++ b/duration_testing_strategy.go @@ -25,15 +25,6 @@ func (t DurationTestingStrategy) runTestSequence(collection CollectionAPI, confi } func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType string, config TestingConfig, fetchDocIDs func(CollectionAPI, int64, string) ([]primitive.ObjectID, error)) { - // Set up the timer for the duration of the test - endTime := time.Now().Add(time.Duration(config.Duration) * time.Second) - - // Set up the ticker to record metrics every second - insertRate := metrics.NewMeter() - records := [][]string{ - {"timestamp", "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate"}, - } - var partitions [][]primitive.ObjectID if testType == "insert" { if config.DropDb { @@ -67,6 +58,9 @@ func (t DurationTestingStrategy) runTest(collection CollectionAPI, testType stri data[i] = byte(rand.Intn(256)) } + endTime := time.Now().Add(time.Duration(config.Duration) * time.Second) + insertRate := metrics.NewMeter() + records := [][]string{{"timestamp", "count", "mean_rate", "m1_rate", "m5_rate", "m15_rate"}} secondTicker := time.NewTicker(1 * time.Second) defer secondTicker.Stop() go func() { From 7166e639431548cd5af1bd3f0fca874fb177379b Mon Sep 17 00:00:00 2001 From: Kasiske Date: Thu, 14 Nov 2024 17:17:34 +0100 Subject: [PATCH 5/6] feat: limiting the number of documents to update --- docs_testing_strategy.go | 14 +++++++++++--- main.go | 6 ++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/docs_testing_strategy.go b/docs_testing_strategy.go index dce8c28..4ca937b 100644 --- a/docs_testing_strategy.go +++ b/docs_testing_strategy.go @@ -80,6 +80,12 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri var records [][]string records = append(records, []string{"t", "count", "mean", "m1_rate", "m5_rate", "m15_rate", "mean_rate"}) + var doc interface{} + var data = make([]byte, 1024*2) + for i := 0; i < len(data); i++ { + data[i] = byte(rand.Intn(256)) + } + secondTicker := time.NewTicker(1 * time.Second) defer secondTicker.Stop() go func() { @@ -116,15 +122,17 @@ func (t DocCountTestingStrategy) runTest(collection CollectionAPI, testType stri for _, docID := range partition { switch testType { case "insert": - // Let MongoDB generate the _id automatically - doc := bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1} + if config.LargeDocs { + doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1, "data": data} + } else { + doc = bson.M{"threadRunCount": i, "rnd": rand.Int63(), "v": 1} + } _, err := collection.InsertOne(context.Background(), doc) if err == nil { insertRate.Mark(1) } else { log.Printf("Insert failed: %v", err) } - case "update": randomDocID := partition[rand.Intn(len(partition))] filter := bson.M{"_id": randomDocID} diff --git a/main.go b/main.go index 2a7d9ea..c17281c 100644 --- a/main.go +++ b/main.go @@ -58,8 +58,10 @@ func main() { } else { strategy = DocCountTestingStrategy{} config = TestingConfig{ - Threads: threads, - DocCount: docCount, + Threads: threads, + DocCount: docCount, + LargeDocs: largeDocs, + DropDb: dropDb, } if runAll { strategy.runTestSequence(mongoCollection, config) From e72c2518f68e2840aafeaabe40de700d96ad8b00 Mon Sep 17 00:00:00 2001 From: Kasiske Date: Wed, 20 Nov 2024 14:43:06 +0100 Subject: [PATCH 6/6] fix: changed max pool size to the number of threads. --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index c17281c..ef831e8 100644 --- a/main.go +++ b/main.go @@ -31,7 +31,7 @@ func main() { var strategy TestingStrategy var config TestingConfig - client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri)) + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(uri).SetMaxPoolSize(uint64(threads))) if err != nil { log.Fatalf("Failed to connect to MongoDB: %v", err) }