.. currentmodule:: motor.motor_tornado

Bulk Write Operations
=====================

.. testsetup::

  client = MotorClient()
  db = client.test_database
  IOLoop.current().run_sync(db.test.drop)

This tutorial explains how to take advantage of Motor's bulk
write operation features. Executing write operations in batches
reduces the number of network round trips, increasing write
throughput.

This example describes using Motor with Tornado. Beginning in
version 0.5 Motor can also integrate with asyncio instead of Tornado.

Bulk Insert
-----------

A batch of documents can be inserted by passing a list or generator
to the :meth:`~MotorCollection.insert_many` method. Motor
will automatically split the batch into smaller sub-batches based on
the maximum message size accepted by MongoDB, supporting very large
bulk insert operations.

.. doctest::

  >>> @gen.coroutine
  ... def f():
  ...     yield db.test.insert_many(({'i': i} for i in range(10000)))
  ...     count = yield db.test.count()
  ...     print("Final count: %d" % count)
  >>>
  >>> IOLoop.current().run_sync(f)
  Final count: 10000
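
The sub-batch splitting described above can be pictured with a small
standalone sketch. This is not Motor's implementation: the function name
``split_into_batches`` and the ``max_batch_bytes`` parameter are hypothetical,
and document size is approximated with JSON, whereas the real wire format is
BSON.

.. code-block:: python

  import json


  def split_into_batches(documents, max_batch_bytes):
      """Yield lists of documents whose combined size stays within the limit.

      Hypothetical helper for illustration only.
      """
      batch, batch_bytes = [], 0
      for doc in documents:
          # Approximate the encoded size with JSON (assumption: BSON in reality).
          doc_bytes = len(json.dumps(doc))
          # Start a new batch when adding this document would exceed the limit.
          if batch and batch_bytes + doc_bytes > max_batch_bytes:
              yield batch
              batch, batch_bytes = [], 0
          batch.append(doc)
          batch_bytes += doc_bytes
      if batch:
          yield batch


  # A generator works as input, just as with insert_many above.
  batches = list(split_into_batches(({'i': i} for i in range(10)),
                                    max_batch_bytes=30))
  batch_sizes = [len(batch) for batch in batches]

Because the input is consumed lazily, only one sub-batch is held in memory at
a time, which is one reason a generator is a convenient argument for very
large inserts.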

Mixed Bulk Write Operations
---------------------------

.. versionadded:: 0.2

Motor also supports executing mixed bulk write operations. A batch
of insert, update, and delete operations can be executed together using
the Bulk API.

.. _ordered_bulk:

Ordered Bulk Write Operations
.............................

Ordered bulk write operations are batched and sent to the server in the
order provided for serial execution. The return value is a document
describing the type and count of operations performed.

.. doctest::

  >>> from pprint import pprint
  >>>
  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     # Remove all documents from the previous example.
  ...     bulk.find({}).remove()
  ...     bulk.insert({'_id': 1})
  ...     bulk.insert({'_id': 2})
  ...     bulk.insert({'_id': 3})
  ...     bulk.find({'_id': 1}).update({'$set': {'foo': 'bar'}})
  ...     bulk.find({'_id': 4}).upsert().update({'$inc': {'j': 1}})
  ...     bulk.find({'j': 1}).replace_one({'j': 2})
  ...     result = yield bulk.execute()
  ...     pprint(result)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 3,
   'nMatched': 2,
   'nModified': 2,
   'nRemoved': 10000,
   'nUpserted': 1,
   'upserted': [{'_id': 4, 'index': 5}],
   'writeConcernErrors': [],
   'writeErrors': []}

The first write failure that occurs (e.g. duplicate key error) aborts the
remaining operations, and Motor raises :class:`~pymongo.errors.BulkWriteError`.
The :attr:`details` attribute of the exception instance provides the execution
results up until the failure occurred and details about the failure, including
the operation that caused the failure.

.. doctest::

  >>> from pymongo.errors import BulkWriteError
  >>>
  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     bulk.find({'j': 2}).replace_one({'i': 5})
  ...     # Violates the unique key constraint on _id.
  ...     bulk.insert({'_id': 4})
  ...     bulk.find({'i': 5}).remove_one()
  ...     try:
  ...         yield bulk.execute()
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 1,
                    'op': {'_id': 4}}]}
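
Since ``details`` is a plain dictionary, the failed operations can be pulled
out with ordinary Python. The following sketch works on sample data shaped
like the output above, so it runs without a server; the helper name
``summarize_write_errors`` is hypothetical and not part of Motor or PyMongo.

.. code-block:: python

  def summarize_write_errors(details):
      """Return one (index, code, op) tuple per failed operation.

      Hypothetical helper for illustration only.
      """
      return [(err['index'], err['code'], err['op'])
              for err in details.get('writeErrors', [])]


  # Sample data copied from the shape of the ``details`` document above.
  details = {
      'nInserted': 0, 'nMatched': 1, 'nModified': 1, 'nRemoved': 0,
      'nUpserted': 0, 'upserted': [], 'writeConcernErrors': [],
      'writeErrors': [{'code': 11000,
                       'errmsg': '... duplicate key error ...',
                       'index': 1,
                       'op': {'_id': 4}}],
  }

  failures = summarize_write_errors(details)
  for index, code, op in failures:
      print("operation %d failed with code %d: %r" % (index, code, op))

The ``index`` value is the zero-based position of the failing operation in the
order it was added to the bulk builder, which makes it possible to map an
error back to the call that queued it.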

.. _unordered_bulk:

Unordered Bulk Write Operations
...............................

Unordered bulk write operations are batched and sent to the server in
**arbitrary order** where they may be executed in parallel. Any errors
that occur are reported after all operations are attempted.

In the next example the first and third operations fail due to the unique
constraint on ``_id``. Since we are doing unordered execution, the second
and fourth operations succeed.

.. doctest::

  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_unordered_bulk_op()
  ...     bulk.insert({'_id': 1})
  ...     bulk.find({'_id': 2}).remove_one()
  ...     bulk.insert({'_id': 3})
  ...     bulk.find({'_id': 4}).replace_one({'i': 1})
  ...     try:
  ...         yield bulk.execute()
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 0,
   'nMatched': 1,
   'nModified': 1,
   'nRemoved': 1,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [],
   'writeErrors': [{'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 0,
                    'op': {'_id': 1}},
                   {'code': 11000,
                    'errmsg': '... duplicate key error ...',
                    'index': 2,
                    'op': {'_id': 3}}]}
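
The difference in error handling between the two modes can be illustrated
with a toy model that needs no server. It is only a sketch: ``run_inserts`` is
a hypothetical function, it handles inserts only, and it processes operations
sequentially, whereas a real server may apply unordered operations in any
order.

.. code-block:: python

  def run_inserts(existing_ids, new_ids, ordered):
      """Toy model of bulk inserts against a set of existing ``_id`` values.

      Returns (number inserted, indexes of operations that failed).
      """
      ids = set(existing_ids)
      inserted, failed_indexes = 0, []
      for index, _id in enumerate(new_ids):
          if _id in ids:
              # Duplicate key: record which operation failed.
              failed_indexes.append(index)
              if ordered:
                  break  # Ordered mode aborts the remaining operations.
              continue  # Unordered mode attempts every operation.
          ids.add(_id)
          inserted += 1
      return inserted, failed_indexes


  existing = {1, 3}
  ordered_result = run_inserts(existing, [1, 2, 3, 4], ordered=True)
  unordered_result = run_inserts(existing, [1, 2, 3, 4], ordered=False)

In this model the ordered run stops at the first duplicate and inserts
nothing, while the unordered run reports both duplicates and still inserts the
two new values.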

Write Concern
.............

By default bulk operations are executed with the
:attr:`~MotorCollection.write_concern` of the collection they are
executed against, typically the default write concern ``{w: 1}``. A custom
write concern can be passed to the
:meth:`~MotorBulkOperationBuilder.execute` method. Write concern
errors (e.g. wtimeout) will be reported after all operations are attempted,
regardless of execution order.

.. doctest::
  :options: +SKIP

  .. Standalone MongoDB raises "can't use w>1" with this example, so skip it.

  >>> @gen.coroutine
  ... def f():
  ...     bulk = db.test.initialize_ordered_bulk_op()
  ...     bulk.insert({'a': 0})
  ...     bulk.insert({'a': 1})
  ...     bulk.insert({'a': 2})
  ...     bulk.insert({'a': 3})
  ...     try:
  ...         # Times out if the replica set has fewer than four members.
  ...         yield bulk.execute({'w': 4, 'wtimeout': 1})
  ...     except BulkWriteError as err:
  ...         pprint(err.details)
  ...
  >>> IOLoop.current().run_sync(f)
  {'nInserted': 4,
   'nMatched': 0,
   'nModified': 0,
   'nRemoved': 0,
   'nUpserted': 0,
   'upserted': [],
   'writeConcernErrors': [{'code': 64,
                           'errInfo': {'wtimeout': True},
                           'errmsg': 'waiting for replication timed out'}],
   'writeErrors': []}
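
In the output above ``nInserted`` is 4 even though an exception was raised:
the inserts were applied, but the requested write concern was not satisfied
in time. A caller may want to tell this case apart from a genuine write
error. The sketch below does so on sample data; ``describe_bulk_failure`` is a
hypothetical helper, not part of Motor or PyMongo.

.. code-block:: python

  def describe_bulk_failure(details):
      """Classify a ``details`` document from a BulkWriteError.

      Hypothetical helper for illustration only.
      """
      if details.get('writeErrors'):
          return 'write error'
      if details.get('writeConcernErrors'):
          return 'write concern error only'
      return 'no error'


  # Sample data shaped like the ``details`` document above.
  details = {
      'nInserted': 4,
      'writeConcernErrors': [{'code': 64,
                              'errInfo': {'wtimeout': True},
                              'errmsg': 'waiting for replication timed out'}],
      'writeErrors': [],
  }

  kind = describe_bulk_failure(details)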