1 /*
2  Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License, version 2.0,
6  as published by the Free Software Foundation.
7 
8  This program is also distributed with certain software (including
9  but not limited to OpenSSL) that is licensed under separate terms,
10  as designated in a particular file or component or in included license
11  documentation.  The authors of MySQL hereby grant you an additional
12  permission to link the program and your derivative works with the
13  separately licensed software that they have included with MySQL.
14 
15  This program is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  GNU General Public License, version 2.0, for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with this program; if not, write to the Free Software
22  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NdbApi.hpp>
26 
27 #include "adapter_global.h"
28 #include "unified_debug.h"
29 #include "AsyncNdbContext.h"
30 #include "KeyOperation.h"
31 #include "SessionImpl.h"
32 #include "TransactionImpl.h"
33 
34 //////////
35 /////////////////
36 ///////////////////////// CachedTransactionsAccountant
37 /////////////////
38 //////////
39 
CachedTransactionsAccountant(Ndb_cluster_connection * conn,int maxTransactions)40 CachedTransactionsAccountant::CachedTransactionsAccountant(Ndb_cluster_connection *conn,
41                                                            int maxTransactions):
42   tc_bitmap(0),
43   nDataNodes(static_cast<unsigned short>(conn->no_db_nodes())),
44   concurrency(0),
45   cacheConcurrency(0),
46   maxConcurrency(static_cast<unsigned short>(maxTransactions))
47 {
48   assert(nDataNodes > 0);
49 }
50 
~CachedTransactionsAccountant()51 inline CachedTransactionsAccountant::~CachedTransactionsAccountant() {
52 }
53 
tallySetNodeId(int nodeId)54 inline void CachedTransactionsAccountant::tallySetNodeId(int nodeId) {
55   tc_bitmap ^= (1ULL << nodeId);
56 }
57 
tallySetMaskedNodeIds(int64_t mask)58 inline void CachedTransactionsAccountant::tallySetMaskedNodeIds(int64_t mask) {
59   tc_bitmap ^= mask;
60 }
61 
tallyClear()62 inline void CachedTransactionsAccountant::tallyClear() {
63   tc_bitmap = 0;
64 }
65 
tallyCountSetNodeIds()66 int CachedTransactionsAccountant::tallyCountSetNodeIds() {
67   // "Brian Kernighan's algorithm"; iterates once for each set bit
68   uint64_t v = tc_bitmap;
69   int c;
70   for(c = 0 ; v ; c++) {
71     v &= v-1;
72   }
73   return c;
74 }
75 
76 
77 /*  Returns a token that the user will supply to registerTxOpen().
78     If token is -1, user is allowed to call immediate startTransaction()
79     knowing it will not block (because the needed transaction record is
80     already cached).  Otherwise transaction should be started in an async
81     worker thread.
82 */
registerIntentToOpen()83 int64_t CachedTransactionsAccountant::registerIntentToOpen() {
84   concurrency++;
85   assert(concurrency <= maxConcurrency);
86 
87   // Is it already established that we can handle this many transactions?
88   if(concurrency < cacheConcurrency) {
89     return -1;
90   }
91 
92   // Do we have enough cached transactions to establish that fact now?
93   if(tallyCountSetNodeIds() == nDataNodes) {
94     cacheConcurrency++;
95     DEBUG_PRINT("Concurrency now: %d", cacheConcurrency);
96     tallyClear();
97     return -1;
98   }
99 
100   // Clear all tallies; return a token indicating which ones were cleared
101   int64_t token = static_cast<int64_t>(tc_bitmap);
102   tallyClear();
103   return token;
104 }
105 
registerTxClosed(int64_t token,int nodeId)106 void CachedTransactionsAccountant::registerTxClosed(int64_t token, int nodeId) {
107   concurrency--;
108   if(token >= 0) {
109     tallySetMaskedNodeIds(token);
110     tallySetNodeId(nodeId);
111   }
112 }
113 
114 
115 
116 
117 //////////
118 /////////////////
119 ///////////////////////// SessionImpl
120 /////////////////
121 //////////
122 
SessionImpl(Ndb_cluster_connection * conn,AsyncNdbContext * asyncNdbContext,const char * defaultDatabase,int maxTransactions)123 SessionImpl::SessionImpl(Ndb_cluster_connection *conn,
124                              AsyncNdbContext * asyncNdbContext,
125                              const char *defaultDatabase,
126                              int maxTransactions) :
127   CachedTransactionsAccountant(conn, maxTransactions),
128   maxNdbTransactions(maxTransactions),
129   nContexts(0),
130   asyncContext(asyncNdbContext),
131   freeList(0)
132 {
133   ndb = new Ndb(conn, defaultDatabase);
134   ndb->init(maxTransactions * 2);
135 }
136 
137 
~SessionImpl()138 SessionImpl::~SessionImpl() {
139   DEBUG_MARKER(UDEB_DETAIL);
140   delete ndb;
141 }
142 
143 
seizeTransaction()144 TransactionImpl * SessionImpl::seizeTransaction() {
145   TransactionImpl * ctx;
146   DEBUG_PRINT("FreeList: %p, nContexts: %d, maxNdbTransactions: %d",
147               freeList, nContexts, maxNdbTransactions);
148 
149   /* Is there a context on the freelist? */
150   if(freeList) {
151     ctx = freeList;
152     freeList = ctx->next;
153     return ctx;
154   }
155 
156   /* Can we produce a new context? */
157   if(nContexts < maxNdbTransactions) {
158     ctx = new TransactionImpl(this);
159     nContexts++;
160     return ctx;
161   }
162 
163   return 0;
164 }
165 
166 
releaseTransaction(TransactionImpl * ctx)167 bool SessionImpl::releaseTransaction(TransactionImpl *ctx) {
168   assert(ctx->parentSessionImpl == this);
169   bool status = ctx->isClosed();
170   DEBUG_PRINT("releaseTransaction status: %s", status ? "closed" : "open");
171   if(status) {
172     ctx->next = freeList;
173     freeList = ctx;
174   }
175   return status;
176 }
177 
178 
freeTransactions()179 void SessionImpl::freeTransactions() {
180   while(freeList) {
181     TransactionImpl * ctx = freeList;
182     freeList = ctx->next;
183     delete ctx;
184   }
185 }
186 
getNdbError() const187 const NdbError & SessionImpl::getNdbError() const {
188   return ndb->getNdbError();
189 }
190 
191