1 /*
2 Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <NdbApi.hpp>
26
27 #include "adapter_global.h"
28 #include "unified_debug.h"
29 #include "AsyncNdbContext.h"
30 #include "KeyOperation.h"
31 #include "SessionImpl.h"
32 #include "TransactionImpl.h"
33
34 //////////
35 /////////////////
36 ///////////////////////// CachedTransactionsAccountant
37 /////////////////
38 //////////
39
CachedTransactionsAccountant(Ndb_cluster_connection * conn,int maxTransactions)40 CachedTransactionsAccountant::CachedTransactionsAccountant(Ndb_cluster_connection *conn,
41 int maxTransactions):
42 tc_bitmap(0),
43 nDataNodes(static_cast<unsigned short>(conn->no_db_nodes())),
44 concurrency(0),
45 cacheConcurrency(0),
46 maxConcurrency(static_cast<unsigned short>(maxTransactions))
47 {
48 assert(nDataNodes > 0);
49 }
50
~CachedTransactionsAccountant()51 inline CachedTransactionsAccountant::~CachedTransactionsAccountant() {
52 }
53
tallySetNodeId(int nodeId)54 inline void CachedTransactionsAccountant::tallySetNodeId(int nodeId) {
55 tc_bitmap ^= (1ULL << nodeId);
56 }
57
tallySetMaskedNodeIds(int64_t mask)58 inline void CachedTransactionsAccountant::tallySetMaskedNodeIds(int64_t mask) {
59 tc_bitmap ^= mask;
60 }
61
tallyClear()62 inline void CachedTransactionsAccountant::tallyClear() {
63 tc_bitmap = 0;
64 }
65
tallyCountSetNodeIds()66 int CachedTransactionsAccountant::tallyCountSetNodeIds() {
67 // "Brian Kernighan's algorithm"; iterates once for each set bit
68 uint64_t v = tc_bitmap;
69 int c;
70 for(c = 0 ; v ; c++) {
71 v &= v-1;
72 }
73 return c;
74 }
75
76
77 /* Returns a token that the user will supply to registerTxOpen().
78 If token is -1, user is allowed to call immediate startTransaction()
79 knowing it will not block (because the needed transaction record is
80 already cached). Otherwise transaction should be started in an async
81 worker thread.
82 */
registerIntentToOpen()83 int64_t CachedTransactionsAccountant::registerIntentToOpen() {
84 concurrency++;
85 assert(concurrency <= maxConcurrency);
86
87 // Is it already established that we can handle this many transactions?
88 if(concurrency < cacheConcurrency) {
89 return -1;
90 }
91
92 // Do we have enough cached transactions to establish that fact now?
93 if(tallyCountSetNodeIds() == nDataNodes) {
94 cacheConcurrency++;
95 DEBUG_PRINT("Concurrency now: %d", cacheConcurrency);
96 tallyClear();
97 return -1;
98 }
99
100 // Clear all tallies; return a token indicating which ones were cleared
101 int64_t token = static_cast<int64_t>(tc_bitmap);
102 tallyClear();
103 return token;
104 }
105
registerTxClosed(int64_t token,int nodeId)106 void CachedTransactionsAccountant::registerTxClosed(int64_t token, int nodeId) {
107 concurrency--;
108 if(token >= 0) {
109 tallySetMaskedNodeIds(token);
110 tallySetNodeId(nodeId);
111 }
112 }
113
114
115
116
117 //////////
118 /////////////////
119 ///////////////////////// SessionImpl
120 /////////////////
121 //////////
122
SessionImpl(Ndb_cluster_connection * conn,AsyncNdbContext * asyncNdbContext,const char * defaultDatabase,int maxTransactions)123 SessionImpl::SessionImpl(Ndb_cluster_connection *conn,
124 AsyncNdbContext * asyncNdbContext,
125 const char *defaultDatabase,
126 int maxTransactions) :
127 CachedTransactionsAccountant(conn, maxTransactions),
128 maxNdbTransactions(maxTransactions),
129 nContexts(0),
130 asyncContext(asyncNdbContext),
131 freeList(0)
132 {
133 ndb = new Ndb(conn, defaultDatabase);
134 ndb->init(maxTransactions * 2);
135 }
136
137
~SessionImpl()138 SessionImpl::~SessionImpl() {
139 DEBUG_MARKER(UDEB_DETAIL);
140 delete ndb;
141 }
142
143
seizeTransaction()144 TransactionImpl * SessionImpl::seizeTransaction() {
145 TransactionImpl * ctx;
146 DEBUG_PRINT("FreeList: %p, nContexts: %d, maxNdbTransactions: %d",
147 freeList, nContexts, maxNdbTransactions);
148
149 /* Is there a context on the freelist? */
150 if(freeList) {
151 ctx = freeList;
152 freeList = ctx->next;
153 return ctx;
154 }
155
156 /* Can we produce a new context? */
157 if(nContexts < maxNdbTransactions) {
158 ctx = new TransactionImpl(this);
159 nContexts++;
160 return ctx;
161 }
162
163 return 0;
164 }
165
166
releaseTransaction(TransactionImpl * ctx)167 bool SessionImpl::releaseTransaction(TransactionImpl *ctx) {
168 assert(ctx->parentSessionImpl == this);
169 bool status = ctx->isClosed();
170 DEBUG_PRINT("releaseTransaction status: %s", status ? "closed" : "open");
171 if(status) {
172 ctx->next = freeList;
173 freeList = ctx;
174 }
175 return status;
176 }
177
178
freeTransactions()179 void SessionImpl::freeTransactions() {
180 while(freeList) {
181 TransactionImpl * ctx = freeList;
182 freeList = ctx->next;
183 delete ctx;
184 }
185 }
186
getNdbError() const187 const NdbError & SessionImpl::getNdbError() const {
188 return ndb->getNdbError();
189 }
190
191