1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package integration 8 9import ( 10 "fmt" 11 "sync" 12 13 "go.mongodb.org/mongo-driver/bson" 14 "go.mongodb.org/mongo-driver/internal/testutil/assert" 15 "go.mongodb.org/mongo-driver/mongo/integration/mtest" 16) 17 18// Helper functions for the operations in the unified spec test runner that require creating and synchronizing 19// background goroutines. 20 21// backgroundRoutine represents a background goroutine that can execute operations. The goroutine reads operations from 22// a channel and executes them in order. It exits once that channel has been closed, which the stop() function does. If any 23// of the operations error, all future operations passed to the routine are skipped. The first error is reported by the 24// stop() function. 
25type backgroundRoutine struct { 26 operations chan *operation 27 mt *mtest.T 28 testCase *testCase 29 wg sync.WaitGroup 30 err error 31} 32 33func newBackgroundRoutine(mt *mtest.T, testCase *testCase) *backgroundRoutine { 34 routine := &backgroundRoutine{ 35 operations: make(chan *operation, 10), 36 mt: mt, 37 testCase: testCase, 38 } 39 40 return routine 41} 42 43func (b *backgroundRoutine) start() { 44 b.wg.Add(1) 45 46 go func() { 47 defer b.wg.Done() 48 49 for op := range b.operations { 50 if b.err != nil { 51 continue 52 } 53 54 if err := runOperation(b.mt, b.testCase, op, nil, nil); err != nil { 55 b.err = fmt.Errorf("error running operation %s: %v", op.Name, err) 56 } 57 } 58 }() 59} 60 61func (b *backgroundRoutine) stop() error { 62 close(b.operations) 63 b.wg.Wait() 64 return b.err 65} 66 67func (b *backgroundRoutine) addOperation(op *operation) bool { 68 select { 69 case b.operations <- op: 70 return true 71 default: 72 return false 73 } 74} 75 76func startThread(mt *mtest.T, testCase *testCase, op *operation) { 77 routine := newBackgroundRoutine(mt, testCase) 78 testCase.routinesMap.Store(getThreadName(op), routine) 79 routine.start() 80} 81 82func runOnThread(mt *mtest.T, testCase *testCase, op *operation) { 83 routineName := getThreadName(op) 84 routineVal, ok := testCase.routinesMap.Load(routineName) 85 assert.True(mt, ok, "no background routine found with name %s", routineName) 86 routine := routineVal.(*backgroundRoutine) 87 88 var routineOperation operation 89 operationDoc := op.Arguments.Lookup("operation") 90 err := bson.UnmarshalWithRegistry(specTestRegistry, operationDoc.Document(), &routineOperation) 91 assert.Nil(mt, err, "error creating operation for runOnThread: %v", err) 92 93 ok = routine.addOperation(&routineOperation) 94 assert.True(mt, ok, "failed to add operation %s to routine %s", routineOperation.Name, routineName) 95} 96 97func waitForThread(mt *mtest.T, testCase *testCase, op *operation) { 98 name := getThreadName(op) 99 
routineVal, ok := testCase.routinesMap.Load(name) 100 assert.True(mt, ok, "no background routine found with name %s", name) 101 102 err := routineVal.(*backgroundRoutine).stop() 103 testCase.routinesMap.Delete(name) 104 assert.Nil(mt, err, "error on background routine %s: %v", name, err) 105} 106 107func getThreadName(op *operation) string { 108 return op.Arguments.Lookup("name").StringValue() 109} 110