1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/repl/oplogreader.h"
36 
37 #include <string>
38 
39 #include "mongo/client/dbclientinterface.h"
40 #include "mongo/db/auth/authorization_manager.h"
41 #include "mongo/db/auth/authorization_manager_global.h"
42 #include "mongo/db/auth/authorization_session.h"
43 #include "mongo/db/auth/internal_user_auth.h"
44 #include "mongo/executor/network_interface.h"
45 #include "mongo/util/log.h"
46 
47 namespace mongo {
48 
49 using std::shared_ptr;
50 using std::endl;
51 using std::string;
52 
53 namespace repl {
54 
replAuthenticate(DBClientBase * conn)55 bool replAuthenticate(DBClientBase* conn) {
56     if (isInternalAuthSet())
57         return conn->authenticateInternalUser();
58     if (getGlobalAuthorizationManager()->isAuthEnabled())
59         return false;
60     return true;
61 }
62 
63 const Seconds OplogReader::kSocketTimeout(30);
64 
OplogReader()65 OplogReader::OplogReader() {
66     _tailingQueryOptions = QueryOption_SlaveOk;
67     _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
68 
69     /* TODO: slaveOk maybe shouldn't use? */
70     _tailingQueryOptions |= QueryOption_AwaitData;
71 
72     // Currently find command doesn't do the cursor tracking that master-slave relies on.
73     _tailingQueryOptions |= DBClientCursor::QueryOptionLocal_forceOpQuery;
74 }
75 
connect(const HostAndPort & host)76 bool OplogReader::connect(const HostAndPort& host) {
77     if (conn() == NULL || _host != host) {
78         resetConnection();
79         _conn = shared_ptr<DBClientConnection>(
80             new DBClientConnection(false, durationCount<Seconds>(kSocketTimeout)));
81         string errmsg;
82         if (!_conn->connect(host, StringData(), errmsg) || !replAuthenticate(_conn.get())) {
83             resetConnection();
84             error() << errmsg << endl;
85             return false;
86         }
87         _conn->port().setTag(_conn->port().getTag() |
88                              executor::NetworkInterface::kMessagingPortKeepOpen);
89         _host = host;
90     }
91     return true;
92 }
93 
tailCheck()94 void OplogReader::tailCheck() {
95     if (cursor.get() && cursor->isDead()) {
96         log() << "old cursor isDead, will initiate a new one" << std::endl;
97         resetCursor();
98     }
99 }
100 
tailingQuery(const char * ns,const BSONObj & query)101 void OplogReader::tailingQuery(const char* ns, const BSONObj& query) {
102     verify(!haveCursor());
103     LOG(2) << ns << ".find(" << redact(query) << ')' << endl;
104     cursor.reset(_conn->query(ns, query, 0, 0, nullptr, _tailingQueryOptions).release());
105 }
106 
107 }  // namespace repl
108 }  // namespace mongo
109