1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "bytes" 21 "fmt" 22 23 btpb "google.golang.org/genproto/googleapis/bigtable/v2" 24) 25 26// A Row is returned by ReadRows. The map is keyed by column family (the prefix 27// of the column name before the colon). The values are the returned ReadItems 28// for that column family in the order returned by Read. 29type Row map[string][]ReadItem 30 31// Key returns the row's key, or "" if the row is empty. 32func (r Row) Key() string { 33 for _, items := range r { 34 if len(items) > 0 { 35 return items[0].Row 36 } 37 } 38 return "" 39} 40 41// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column. 42type ReadItem struct { 43 Row, Column string 44 Timestamp Timestamp 45 Value []byte 46 Labels []string 47} 48 49// The current state of the read rows state machine. 50type rrState int64 51 52const ( 53 newRow rrState = iota 54 rowInProgress 55 cellInProgress 56) 57 58// chunkReader handles cell chunks from the read rows response and combines 59// them into full Rows. 60type chunkReader struct { 61 state rrState 62 curKey []byte 63 curLabels []string 64 curFam string 65 curQual []byte 66 curTS int64 67 curVal []byte 68 curRow Row 69 lastKey string 70} 71 72// newChunkReader returns a new chunkReader for handling read rows responses. 73func newChunkReader() *chunkReader { 74 return &chunkReader{state: newRow} 75} 76 77// Process takes a cell chunk and returns a new Row if the given chunk 78// completes a Row, or nil otherwise. 79func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) { 80 var row Row 81 switch cr.state { 82 case newRow: 83 if err := cr.validateNewRow(cc); err != nil { 84 return nil, err 85 } 86 87 cr.curRow = make(Row) 88 cr.curKey = cc.RowKey 89 cr.curFam = cc.FamilyName.Value 90 cr.curQual = cc.Qualifier.Value 91 cr.curTS = cc.TimestampMicros 92 row = cr.handleCellValue(cc) 93 94 case rowInProgress: 95 if err := cr.validateRowInProgress(cc); err != nil { 96 return nil, err 97 } 98 99 if cc.GetResetRow() { 100 cr.resetToNewRow() 101 return nil, nil 102 } 103 104 if cc.FamilyName != nil { 105 cr.curFam = cc.FamilyName.Value 106 } 107 if cc.Qualifier != nil { 108 cr.curQual = cc.Qualifier.Value 109 } 110 cr.curTS = cc.TimestampMicros 111 row = cr.handleCellValue(cc) 112 113 case cellInProgress: 114 if err := cr.validateCellInProgress(cc); err != nil { 115 return nil, err 116 } 117 if cc.GetResetRow() { 118 cr.resetToNewRow() 119 return nil, nil 120 } 121 row = cr.handleCellValue(cc) 122 } 123 124 return row, nil 125} 126 127// Close must be called after all cell chunks from the response 128// have been processed. An error will be returned if the reader is 129// in an invalid state, in which case the error should be propagated to the caller. 130func (cr *chunkReader) Close() error { 131 if cr.state != newRow { 132 return fmt.Errorf("invalid state for end of stream %q", cr.state) 133 } 134 return nil 135} 136 137// handleCellValue returns a Row if the cell value includes a commit, otherwise nil. 138func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row { 139 if cc.ValueSize > 0 { 140 // ValueSize is specified so expect a split value of ValueSize bytes 141 if cr.curVal == nil { 142 cr.curVal = make([]byte, 0, cc.ValueSize) 143 cr.curLabels = cc.Labels 144 } 145 cr.curVal = append(cr.curVal, cc.Value...) 146 cr.state = cellInProgress 147 } else { 148 // This cell is either the complete value or the last chunk of a split 149 if cr.curVal == nil { 150 cr.curVal = cc.Value 151 cr.curLabels = cc.Labels 152 } else { 153 cr.curVal = append(cr.curVal, cc.Value...) 154 } 155 cr.finishCell() 156 157 if cc.GetCommitRow() { 158 return cr.commitRow() 159 } 160 cr.state = rowInProgress 161 } 162 163 return nil 164} 165 166func (cr *chunkReader) finishCell() { 167 ri := ReadItem{ 168 Row: string(cr.curKey), 169 Column: string(cr.curFam) + ":" + string(cr.curQual), 170 Timestamp: Timestamp(cr.curTS), 171 Value: cr.curVal, 172 Labels: cr.curLabels, 173 } 174 cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri) 175 cr.curVal = nil 176 cr.curLabels = nil 177} 178 179func (cr *chunkReader) commitRow() Row { 180 row := cr.curRow 181 cr.lastKey = cr.curRow.Key() 182 cr.resetToNewRow() 183 return row 184} 185 186func (cr *chunkReader) resetToNewRow() { 187 cr.curKey = nil 188 cr.curFam = "" 189 cr.curQual = nil 190 cr.curVal = nil 191 cr.curRow = nil 192 cr.curTS = 0 193 cr.state = newRow 194} 195 196func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error { 197 if cc.GetResetRow() { 198 return fmt.Errorf("reset_row not allowed between rows") 199 } 200 if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil { 201 return fmt.Errorf("missing key field for new row %v", cc) 202 } 203 if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) { 204 return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey)) 205 } 206 return nil 207} 208 209func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { 210 if err := cr.validateRowStatus(cc); err != nil { 211 return err 212 } 213 if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) { 214 return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey) 215 } 216 if cc.FamilyName != nil && cc.Qualifier == nil { 217 return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName) 218 } 219 return nil 220} 221 222func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { 223 if err := cr.validateRowStatus(cc); err != nil { 224 return err 225 } 226 if cr.curVal == nil { 227 return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc) 228 } 229 if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) { 230 return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc) 231 } 232 return nil 233} 234 235func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool { 236 return cc.RowKey != nil || 237 cc.FamilyName != nil || 238 cc.Qualifier != nil || 239 cc.TimestampMicros != 0 240} 241 242// Validate a RowStatus, commit or reset, if present. 243func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error { 244 // Resets can't be specified with any other part of a cell 245 if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) || 246 cc.Value != nil || 247 cc.ValueSize != 0 || 248 cc.Labels != nil) { 249 return fmt.Errorf("reset must not be specified with other fields %v", cc) 250 } 251 if cc.GetCommitRow() && cc.ValueSize > 0 { 252 return fmt.Errorf("commit row found in between chunks in a cell") 253 } 254 return nil 255} 256