// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package mongorestore

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/mongodb/mongo-tools-common/db"
	"github.com/mongodb/mongo-tools-common/intents"
	"github.com/mongodb/mongo-tools-common/log"
	"github.com/mongodb/mongo-tools-common/progress"
	"github.com/mongodb/mongo-tools-common/txn"
	"github.com/mongodb/mongo-tools-common/util"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// oplogMaxCommandSize sets the maximum size for multiple buffered ops in the
// applyOps command. This is to prevent pathological cases where the array overhead
// of many small operations can overflow the maximum command size.
// Note that ops > 8MB will still be buffered, just as single elements.
const oplogMaxCommandSize = 1024 * 1024 * 8

type oplogContext struct {
	progressor *progress.CountProgressor // reports restore progress in bytes of oplog read
	session    *mongo.Client             // client used to run applyOps against the target
	totalOps   int                       // count of oplog entries applied so far
	txnBuffer  *txn.Buffer               // buffers transaction ops until commit or abort
}

// RestoreOplog attempts to restore a MongoDB oplog.
func (restore *MongoRestore) RestoreOplog() error {
	log.Logv(log.Always, "replaying oplog")
	intent := restore.manager.Oplog()
	if intent == nil {
		// this should not be reached
		log.Logv(log.Always, "no oplog file provided, skipping oplog application")
		return nil
	}
	if err := intent.BSONFile.Open(); err != nil {
		return err
	}
	if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
		fileNeedsIOBuffer.TakeIOBuffer(make([]byte, db.MaxBSONSize))
	}
	defer intent.BSONFile.Close()
	// NewBufferlessBSONSource reads each bson document into its own buffer
	// because bson.Unmarshal currently can't unmarshal binary types without
	// them referencing the source buffer.
	// We also increase the max BSON size by 16 KiB so that oplog entries
	// wrapping a maximum-size (16 MiB) document plus oplog metadata still fit.
	bsonSource := db.NewBufferlessBSONSource(intent.BSONFile)
	bsonSource.SetMaxBSONSize(db.MaxBSONSize + 16*1024)
	decodedBsonSource := db.NewDecodedBSONSource(bsonSource)
	defer decodedBsonSource.Close()

	session, err := restore.SessionProvider.GetSession()
	if err != nil {
		return fmt.Errorf("error establishing connection: %v", err)
	}

	oplogCtx := &oplogContext{
		progressor: progress.NewCounter(intent.BSONSize),
		txnBuffer:  txn.NewBuffer(),
		session:    session,
	}
	defer oplogCtx.txnBuffer.Stop()

	if restore.ProgressManager != nil {
		restore.ProgressManager.Attach("oplog", oplogCtx.progressor)
		defer restore.ProgressManager.Detach("oplog")
	}

	for {
		rawOplogEntry := decodedBsonSource.LoadNext()
		if rawOplogEntry == nil {
			break
		}
		oplogCtx.progressor.Inc(int64(len(rawOplogEntry)))

		entryAsOplog := db.Oplog{}
		err = bson.Unmarshal(rawOplogEntry, &entryAsOplog)
		if err != nil {
			return fmt.Errorf("error reading oplog: %v", err)
		}
		if entryAsOplog.Operation == "n" {
			// skip no-ops
			continue
		}
		if !restore.TimestampBeforeLimit(entryAsOplog.Timestamp) {
			log.Logvf(
				log.DebugLow,
				"timestamp %v is not below limit of %v; ending oplog restoration",
				entryAsOplog.Timestamp,
				restore.oplogLimit,
			)
			break
		}

		meta, err := txn.NewMeta(entryAsOplog)
		if err != nil {
			return fmt.Errorf("error getting op metadata: %v", err)
		}

		if meta.IsTxn() {
			err := restore.HandleTxnOp(oplogCtx, meta, entryAsOplog)
			if err != nil {
				return fmt.Errorf("error handling transaction oplog entry: %v", err)
			}
		} else {
			err := restore.HandleNonTxnOp(oplogCtx, entryAsOplog)
			if err != nil {
				return fmt.Errorf("error applying oplog: %v", err)
			}
		}

	}
	if fileNeedsIOBuffer, ok := intent.BSONFile.(intents.FileNeedsIOBuffer); ok {
		fileNeedsIOBuffer.ReleaseIOBuffer()
	}

	log.Logvf(log.Always, "applied %v oplog entries", oplogCtx.totalOps)
	if err := decodedBsonSource.Err(); err != nil {
		return fmt.Errorf("error reading oplog bson input: %v", err)
	}
	return nil

}

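// HandleNonTxnOp applies a single non-transaction oplog entry, first passing
// it through filterUUIDs to strip 'ui' fields unless UUIDs are preserved.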
func (restore *MongoRestore) HandleNonTxnOp(oplogCtx *oplogContext, op db.Oplog) error {
	oplogCtx.totalOps++

	op, err := restore.filterUUIDs(op)
	if err != nil {
		return fmt.Errorf("error filtering UUIDs from oplog: %v", err)
	}

	return restore.ApplyOps(oplogCtx.session, []interface{}{op})
}

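// HandleTxnOp buffers transaction oplog entries. An abort purges the buffered
// transaction; a commit applies each buffered op via HandleNonTxnOp and then
// purges the buffer; any other transaction entry is simply buffered for later.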
func (restore *MongoRestore) HandleTxnOp(oplogCtx *oplogContext, meta txn.Meta, op db.Oplog) error {

	err := oplogCtx.txnBuffer.AddOp(meta, op)
	if err != nil {
		return fmt.Errorf("error buffering transaction oplog entry: %v", err)
	}

	if meta.IsAbort() {
		err := oplogCtx.txnBuffer.PurgeTxn(meta)
		if err != nil {
			return fmt.Errorf("error cleaning up transaction buffer on abort: %v", err)
		}
		return nil
	}

	if !meta.IsCommit() {
		return nil
	}

	// From here, we're applying transaction entries
	ops, errs := oplogCtx.txnBuffer.GetTxnStream(meta)

Loop:
	for {
		select {
		case o, ok := <-ops:
			if !ok {
				break Loop
			}
			err = restore.HandleNonTxnOp(oplogCtx, o)
			if err != nil {
				return fmt.Errorf("error applying transaction op: %v", err)
			}
		case err := <-errs:
			if err != nil {
				return fmt.Errorf("error replaying transaction: %v", err)
			}
			break Loop
		}
	}

	err = oplogCtx.txnBuffer.PurgeTxn(meta)
	if err != nil {
		return fmt.Errorf("error cleaning up transaction buffer: %v", err)
	}

	return nil
}

// ApplyOps is a wrapper for the applyOps database command. We pass in a
// session to avoid opening a new connection for each small batch of ops.
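// As a rough sketch (namespace and document values are illustrative only),
// each call sends a command shaped like
//
//	{applyOps: [{op: "i", ns: "test.foo", o: {_id: 1}}, ...]}
//
// to the admin database of the target server.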
func (restore *MongoRestore) ApplyOps(session *mongo.Client, entries []interface{}) error {
	singleRes := session.Database("admin").RunCommand(nil, bson.D{{"applyOps", entries}})
	if err := singleRes.Err(); err != nil {
		return fmt.Errorf("applyOps: %v", err)
	}
	res := bson.M{}
	if err := singleRes.Decode(&res); err != nil {
		return fmt.Errorf("applyOps: error decoding result document: %v", err)
	}
	if util.IsFalsy(res["ok"]) {
		return fmt.Errorf("applyOps command: %v", res["errmsg"])
	}

	return nil
}

// TimestampBeforeLimit returns true if the given timestamp is allowed to be
// applied to mongorestore's target database.
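// For example (values illustrative only), with --oplogLimit 1234567890:2, an
// entry at {T: 1234567890, I: 1} is still applied, while an entry at
// {T: 1234567890, I: 2} or later ends oplog replay.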
func (restore *MongoRestore) TimestampBeforeLimit(ts primitive.Timestamp) bool {
	if restore.oplogLimit.T == 0 && restore.oplogLimit.I == 0 {
		// always valid if there is no --oplogLimit set
		return true
	}
	return util.TimestampGreaterThan(restore.oplogLimit, ts)
}

// ParseTimestampFlag takes in a string in the form <time_t>:<ordinal>, where
// <time_t> is the number of seconds since the UNIX epoch and <ordinal> is a
// counter of oplog operations within that second. It parses the string and
// returns the corresponding primitive.Timestamp.
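// For example (values illustrative only), "1234567890:3" parses to
// primitive.Timestamp{T: 1234567890, I: 3}, while "1234567890" and
// "1234567890:" both parse with an increment of 0.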
func ParseTimestampFlag(ts string) (primitive.Timestamp, error) {
	var seconds, increment int
	timestampFields := strings.Split(ts, ":")
	if len(timestampFields) > 2 {
		return primitive.Timestamp{}, fmt.Errorf("too many : characters")
	}

	seconds, err := strconv.Atoi(timestampFields[0])
	if err != nil {
		return primitive.Timestamp{}, fmt.Errorf("error parsing timestamp seconds: %v", err)
	}

	// parse the increment field if it exists
	if len(timestampFields) == 2 {
		if len(timestampFields[1]) > 0 {
			increment, err = strconv.Atoi(timestampFields[1])
			if err != nil {
				return primitive.Timestamp{}, fmt.Errorf("error parsing timestamp increment: %v", err)
			}
		} else {
			// handle the case where the user writes "<time_t>:" with no ordinal
			increment = 0
		}
	}

	return primitive.Timestamp{T: uint32(seconds), I: uint32(increment)}, nil
}

// Server versions 3.6.0-3.6.8 and 4.0.0-4.0.2 require a 'ui' field
// in the createIndexes command.
func (restore *MongoRestore) needsCreateIndexWorkaround() bool {
	sv := restore.serverVersion
	if (sv.GTE(db.Version{3, 6, 0}) && sv.LTE(db.Version{3, 6, 8})) ||
		(sv.GTE(db.Version{4, 0, 0}) && sv.LTE(db.Version{4, 0, 2})) {
		return true
	}
	return false
}

// filterUUIDs removes 'ui' entries from ops, including nested applyOps ops.
// It also modifies ops that rely on 'ui'.
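// As a rough sketch (namespace, UUID, and document values are illustrative
// only), an insert entry such as
//
//	{op: "i", ns: "test.foo", ui: UUID("..."), o: {_id: 1}}
//
// is returned without its "ui" field unless --preserveUUID was passed.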
func (restore *MongoRestore) filterUUIDs(op db.Oplog) (db.Oplog, error) {
	// Remove UUIDs from oplog entries
	if !restore.OutputOptions.PreserveUUID {
		op.UI = nil

		// The createIndexes oplog command requires 'ui' for some server versions, so
		// in that case we fall back to an old-style system.indexes insert.
		if op.Operation == "c" && op.Object[0].Key == "createIndexes" && restore.needsCreateIndexWorkaround() {
			return convertCreateIndexToIndexInsert(op)
		}
	}

	// Check for and filter nested applyOps ops
	if op.Operation == "c" && isApplyOpsCmd(op.Object) {
		filtered, err := restore.newFilteredApplyOps(op.Object)
		if err != nil {
			return db.Oplog{}, err
		}
		op.Object = filtered
	}

	return op, nil
}

// convertCreateIndexToIndexInsert converts a new-style createIndexes command
// into an old-style system.indexes index insert.
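// As a rough sketch (database, collection, and key values are illustrative
// only), a command entry like
//
//	{op: "c", ns: "test.$cmd", o: {createIndexes: "foo", v: 2, key: {a: 1}, name: "a_1"}}
//
// becomes an insert ("i") into "test.system.indexes" with object
//
//	{v: 2, key: {a: 1}, name: "a_1", ns: "test.foo"}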
func convertCreateIndexToIndexInsert(op db.Oplog) (db.Oplog, error) {
	dbName, _ := util.SplitNamespace(op.Namespace)

	cmdValue := op.Object[0].Value
	collName, ok := cmdValue.(string)
	if !ok {
		return db.Oplog{}, fmt.Errorf("unknown format for createIndexes")
	}

	indexSpec := op.Object[1:]
	if len(indexSpec) < 3 {
		return db.Oplog{}, fmt.Errorf("unknown format for createIndexes, index spec " +
			"must have at least \"v\", \"key\", and \"name\" fields")
	}
	// createIndexes does not include the "ns" field, but index inserts do.
	// Add it immediately after the "v", "key", and "name" fields.
	ns := bson.D{{"ns", fmt.Sprintf("%s.%s", dbName, collName)}}
	indexSpec = append(indexSpec[:3], append(ns, indexSpec[3:]...)...)
	op.Object = indexSpec
	op.Namespace = fmt.Sprintf("%s.system.indexes", dbName)
	op.Operation = "i"

	return op, nil
}

// isApplyOpsCmd returns true if a document seems to be an applyOps command.
func isApplyOpsCmd(cmd bson.D) bool {
	for _, v := range cmd {
		if v.Key == "applyOps" {
			return true
		}
	}
	return false
}

// newFilteredApplyOps iterates over nested ops in an applyOps document and
// returns a new applyOps document that omits the 'ui' field from nested ops.
func (restore *MongoRestore) newFilteredApplyOps(cmd bson.D) (bson.D, error) {
	ops, err := unwrapNestedApplyOps(cmd)
	if err != nil {
		return nil, err
	}

	filtered := make([]db.Oplog, len(ops))
	for i, v := range ops {
		filtered[i], err = restore.filterUUIDs(v)
		if err != nil {
			return nil, err
		}
	}

	doc, err := wrapNestedApplyOps(filtered)
	if err != nil {
		return nil, err
	}

	return doc, nil
}

// nestedApplyOps models an applyOps command document
type nestedApplyOps struct {
	ApplyOps []db.Oplog `bson:"applyOps"`
}

// unwrapNestedApplyOps converts a bson.D to a typed data structure.
// Unfortunately, we're forced to convert by marshaling to bytes and
// unmarshalling.
func unwrapNestedApplyOps(doc bson.D) ([]db.Oplog, error) {
	// Doc to bytes
	bs, err := bson.Marshal(doc)
	if err != nil {
		return nil, fmt.Errorf("cannot remarshal nested applyOps: %s", err)
	}

	// Bytes to typed data
	var cmd nestedApplyOps
	err = bson.Unmarshal(bs, &cmd)
	if err != nil {
		return nil, fmt.Errorf("cannot unwrap nested applyOps: %s", err)
	}

	return cmd.ApplyOps, nil
}

// wrapNestedApplyOps converts a typed data structure to a bson.D.
// Unfortunately, we're forced to convert by marshaling to bytes and
// unmarshalling.
func wrapNestedApplyOps(ops []db.Oplog) (bson.D, error) {
	cmd := &nestedApplyOps{ApplyOps: ops}

	// Typed data to bytes
	raw, err := bson.Marshal(cmd)
	if err != nil {
		return nil, fmt.Errorf("cannot rewrap nested applyOps op: %s", err)
	}

	// Bytes to doc
	var doc bson.D
	err = bson.Unmarshal(raw, &doc)
	if err != nil {
		return nil, fmt.Errorf("cannot reunmarshal nested applyOps op: %s", err)
	}

	return doc, nil
}