.. currentmodule:: motor.motor_tornado

Bulk Write Operations
=====================

.. testsetup::

  client = MotorClient()
  db = client.test_database
  IOLoop.current().run_sync(db.test.drop)

This tutorial explains how to take advantage of Motor's bulk
write operation features. Executing write operations in batches
reduces the number of network round trips, increasing write
throughput.

This example describes using Motor with Tornado. Beginning in
version 0.5 Motor can also integrate with asyncio instead of Tornado.

Bulk Insert
-----------

A batch of documents can be inserted by passing a list or generator
to the :meth:`~MotorCollection.insert_many` method. Motor
will automatically split the batch into smaller sub-batches based on
the maximum message size accepted by MongoDB, supporting very large
bulk insert operations.

.. doctest::

  >>> @gen.coroutine
  ... def f():
  ...     yield db.test.insert_many(({'i': i} for i in range(10000)))
  ...     count = yield db.test.count()
  ...     print("Final count: %d" % count)
  >>>
  >>> IOLoop.current().run_sync(f)
  Final count: 10000

Mixed Bulk Write Operations
---------------------------

.. versionadded:: 0.2

Motor also supports executing mixed bulk write operations. A batch
of insert, update, and delete operations can be executed together using
the Bulk API.

.. _ordered_bulk:

Ordered Bulk Write Operations
.............................

Ordered bulk write operations are batched and sent to the server in the
order provided for serial execution. The return value is a document
describing the type and count of operations performed.

.. doctest::

  >>> from pprint import pprint
  >>>
  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     # Remove all documents from the previous example.
  ...     bulk.find({}).remove()
  ...     bulk.insert({'_id': 1})
  ...     bulk.insert({'_id': 2})
  ...     bulk.insert({'_id': 3})
  ...     bulk.find({'_id': 1}).update({'$set': {'foo': 'bar'}})
  ...     bulk.find({'_id': 4}).upsert().update({'$inc': {'j': 1}})
  ...     bulk.find({'j': 1}).replace_one({'j': 2})
  ...     result = yield bulk.execute()
  ...     pprint(result)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 3,
   'nMatched': 2,
   'nModified': 2,
   'nRemoved': 10000,
   'nUpserted': 1,
   'upserted': [{'_id': 4, 'index': 5}],
   'writeConcernErrors': [],
   'writeErrors': []}

The first write failure that occurs (e.g. duplicate key error) aborts the
remaining operations, and Motor raises :class:`~pymongo.errors.BulkWriteError`.
The :attr:`details` attribute of the exception instance provides the execution
results up until the failure occurred and details about the failure, including
the operation that caused the failure.

.. doctest::

  >>> from pymongo.errors import BulkWriteError
  >>>
  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     bulk.find({'j': 2}).replace_one({'i': 5})
  ...     # Violates the unique key constraint on _id.
  ...     bulk.insert({'_id': 4})
  ...     bulk.find({'i': 5}).remove_one()
  ...     try:
  ...         yield bulk.execute()
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 1,
                    'op': {'_id': 4}}]}

.. _unordered_bulk:

Unordered Bulk Write Operations
...............................

Unordered bulk write operations are batched and sent to the server in
**arbitrary order** where they may be executed in parallel. Any errors
that occur are reported after all operations are attempted.
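
The difference between the two execution modes can be sketched without a
server. The snippet below is a toy model only, not Motor or MongoDB code:
``run_bulk`` and its in-memory ``store`` are hypothetical names introduced
for illustration, and the loop visits operations in list order, whereas a real
unordered batch may be applied in any order. It shows only the error-handling
contrast: ordered execution stops at the first failure, while unordered
execution attempts every operation and collects the errors.

```python
# Toy model only: this is NOT Motor or MongoDB code. It mimics the two
# error-handling modes on an in-memory dict keyed by _id.

def run_bulk(store, inserts, ordered):
    """Insert documents into ``store`` and return (inserted, errors).

    ``store`` maps _id -> document. A duplicate _id counts as a failure.
    """
    inserted = 0
    errors = []
    for index, doc in enumerate(inserts):
        if doc['_id'] in store:
            errors.append({'index': index, 'op': doc})
            if ordered:
                # Ordered mode: the first failure aborts the rest.
                break
            # Unordered mode: record the error and keep going.
            continue
        store[doc['_id']] = doc
        inserted += 1
    return inserted, errors


docs = [{'_id': 1}, {'_id': 2}, {'_id': 3}]

# Each run starts from a store that already holds _id 1.
ordered_result = run_bulk({1: {'_id': 1}}, docs, ordered=True)
unordered_result = run_bulk({1: {'_id': 1}}, docs, ordered=False)

print(ordered_result)    # (0, [{'index': 0, 'op': {'_id': 1}}])
print(unordered_result)  # (2, [{'index': 0, 'op': {'_id': 1}}])
```

In the ordered run nothing is inserted because the duplicate comes first; in
the unordered run the two remaining documents are still written.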

In the following example the first and third operations fail due to the unique
constraint on ``_id``, because documents with those ``_id`` values already
exist from the previous examples. Since we are doing unordered execution, the
second and fourth operations succeed.

.. doctest::

  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_unordered_bulk_op()
  ...     bulk.insert({'_id': 1})
  ...     bulk.find({'_id': 2}).remove_one()
  ...     bulk.insert({'_id': 3})
  ...     bulk.find({'_id': 4}).replace_one({'i': 1})
  ...     try:
  ...         yield bulk.execute()
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 1,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 0,
                    'op': {'_id': 1}},
                   {'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 2,
                    'op': {'_id': 3}}]}

Write Concern
.............

By default bulk operations are executed with the
:meth:`~MotorCollection.write_concern` of the collection they are
executed against, typically the default write concern ``{w: 1}``. A custom
write concern can be passed to the
:meth:`~MotorBulkOperationBuilder.execute` method. Write concern
errors (e.g. wtimeout) will be reported after all operations are attempted,
regardless of execution order.

.. doctest::
  :options: +SKIP

  .. Standalone MongoDB raises "can't use w>1" with this example, so skip it.

  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     bulk.insert({'a': 0})
  ...     bulk.insert({'a': 1})
  ...     bulk.insert({'a': 2})
  ...     bulk.insert({'a': 3})
  ...     try:
  ...         # Times out if the replica set has fewer than four members.
  ...         yield bulk.execute({'w': 4, 'wtimeout': 1})
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 4,
   'nMatched': 0,
   'nModified': 0,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [{'code': 64,
                           'errInfo': {'wtimeout': True},
                           'errmsg': 'waiting for replication timed out'}],
   'writeErrors': []}
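
Because write errors and write concern errors arrive in separate lists of the
same ``details`` document, an application may want to report them together.
The following is a minimal sketch of one way to do that. The helper
``summarize_bulk_details`` is hypothetical, not part of Motor or PyMongo, and
it assumes only the keys that appear in the example output above.

```python
# Hypothetical helper, not part of Motor or PyMongo. It reads only the
# keys shown in the ``err.details`` documents in this tutorial.

def summarize_bulk_details(details):
    """Return a list of human-readable problems from a details dict."""
    problems = []
    for write_error in details.get('writeErrors', []):
        problems.append('operation %d failed with code %d' % (
            write_error['index'], write_error['code']))
    for concern_error in details.get('writeConcernErrors', []):
        problems.append('write concern error %d: %s' % (
            concern_error['code'], concern_error['errmsg']))
    return problems


# Sample input copied from the write concern example output.
details = {
    'nInserted': 4,
    'writeConcernErrors': [{'code': 64,
                            'errInfo': {'wtimeout': True},
                            'errmsg': 'waiting for replication timed out'}],
    'writeErrors': [],
}

for line in summarize_bulk_details(details):
    print(line)
```

Inside a coroutine, such a helper could be called on ``err.details`` in the
``except BulkWriteError`` branch in place of ``pprint``.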