1 /*
2  *  Copyright (c) 2010, 2021, Oracle and/or its affiliates.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License, version 2.0,
6  *  as published by the Free Software Foundation.
7  *
8  *  This program is also distributed with certain software (including
9  *  but not limited to OpenSSL) that is licensed under separate terms,
10  *  as designated in a particular file or component or in included license
11  *  documentation.  The authors of MySQL hereby grant you an additional
12  *  permission to link the program and your derivative works with the
13  *  separately licensed software that they have included with MySQL.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License, version 2.0, for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 package com.mysql.clusterj.tie;
26 
27 import java.util.List;
28 
29 import com.mysql.clusterj.core.store.Column;
30 
31 import com.mysql.clusterj.core.util.I18NHelper;
32 import com.mysql.clusterj.core.util.Logger;
33 import com.mysql.clusterj.core.util.LoggerFactoryService;
34 import com.mysql.clusterj.tie.DbImpl.BufferManager;
35 
36 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
37 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode;
38 
39 /**
40  *
41  */
42 class ScanResultDataImpl extends ResultDataImpl {
43 
44     /** My message translator */
45     static final I18NHelper local = I18NHelper
46             .getInstance(ScanResultDataImpl.class);
47 
48     /** My logger */
49     static final Logger logger = LoggerFactoryService.getFactory()
50             .getInstance(ScanResultDataImpl.class);
51 
52     private NdbScanOperation ndbScanOperation = null;
53     private ClusterTransactionImpl clusterTransaction = null;
54     /** The number to skip */
55     protected long skip = 0;
56 
57     /** The limit */
58     protected long limit = Long.MAX_VALUE;
59 
60     /** The record counter during the scan */
61     protected long recordCounter = 0;
62 
63     /** True if any records have been locked while scanning the cache */
64     boolean recordsLocked = false;
65 
66     /** Flags for iterating a scan */
67     protected final int RESULT_READY = 0;
68     protected final int SCAN_FINISHED = 1;
69     protected final int CACHE_EMPTY = 2;
70 
ScanResultDataImpl(ClusterTransactionImpl clusterTransaction, NdbScanOperation ndbScanOperation, List<Column> storeColumns, int maximumColumnId, int bufferSize, int[] offsets, int[] lengths, int maximumColumnLength, BufferManager bufferManager, long skip, long limit)71     public ScanResultDataImpl(ClusterTransactionImpl clusterTransaction,
72             NdbScanOperation ndbScanOperation, List<Column> storeColumns,
73             int maximumColumnId, int bufferSize, int[] offsets, int[] lengths, int maximumColumnLength,
74             BufferManager bufferManager, long skip, long limit) {
75         super(ndbScanOperation, storeColumns, maximumColumnId, bufferSize, offsets, lengths,
76                 bufferManager, false);
77         this.clusterTransaction = clusterTransaction;
78         this.ndbScanOperation = ndbScanOperation;
79         this.skip = skip;
80         this.limit = limit;
81     }
82 
83     /** If any locks were taken over, execute the takeover operations
84      */
executeIfRecordsLocked()85     private void executeIfRecordsLocked() {
86         if (recordsLocked) {
87             clusterTransaction.executeNoCommit(true, true);
88             recordsLocked = false;
89         }
90     }
91 
92     @Override
next()93     public boolean next() {
94         if (recordCounter >= limit) {
95             // the next record is past the limit; we have delivered all the rows
96             executeIfRecordsLocked();
97             ndbScanOperation.close(true, true);
98             return false;
99         }
100         // NdbScanOperation may have many results.
101         boolean done = false;
102         boolean fetch = false;
103         boolean force = true; // always true for scans
104         while (!done) {
105             int result = ndbScanOperation.nextResult(fetch, force);
106             switch (result) {
107                 case RESULT_READY:
108                     if (++recordCounter > skip) {
109                         // this record is past the skip
110                         // if scanning with locks, grab the lock for the current transaction
111                         if (ndbScanOperation.getLockMode() != LockMode.LM_CommittedRead) {
112                             // TODO: remember the operations and check them at SCAN_FINISHED and CACHE_EMPTY
113                             // check for result code 499: scan moved on and you forgot to execute the takeover op
114                             ndbScanOperation.lockCurrentTuple();
115                             recordsLocked = true;
116                         }
117                         return true;
118                     } else {
119                         // skip this record
120                         break;
121                     }
122                 case SCAN_FINISHED:
123                     executeIfRecordsLocked();
124                     ndbScanOperation.close(true, true);
125                     return false;
126                 case CACHE_EMPTY:
127                     executeIfRecordsLocked();
128                     fetch = true;
129                     break;
130                 default:
131                     Utility.throwError(result, ndbScanOperation.getNdbError());
132             }
133         }
134         return true; // this statement is needed to make the compiler happy but it's never executed
135     }
136 
137 }
138