1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "bytes" 21 "fmt" 22 23 btpb "google.golang.org/genproto/googleapis/bigtable/v2" 24) 25 26// A Row is returned by ReadRows. The map is keyed by column family (the prefix 27// of the column name before the colon). The values are the returned ReadItems 28// for that column family in the order returned by Read. 29type Row map[string][]ReadItem 30 31// Key returns the row's key, or "" if the row is empty. 32func (r Row) Key() string { 33 for _, items := range r { 34 if len(items) > 0 { 35 return items[0].Row 36 } 37 } 38 return "" 39} 40 41// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column. 42type ReadItem struct { 43 Row, Column string 44 Timestamp Timestamp 45 Value []byte 46} 47 48// The current state of the read rows state machine. 49type rrState int64 50 51const ( 52 newRow rrState = iota 53 rowInProgress 54 cellInProgress 55) 56 57// chunkReader handles cell chunks from the read rows response and combines 58// them into full Rows. 59type chunkReader struct { 60 state rrState 61 curKey []byte 62 curFam string 63 curQual []byte 64 curTS int64 65 curVal []byte 66 curRow Row 67 lastKey string 68} 69 70// newChunkReader returns a new chunkReader for handling read rows responses. 71func newChunkReader() *chunkReader { 72 return &chunkReader{state: newRow} 73} 74 75// Process takes a cell chunk and returns a new Row if the given chunk 76// completes a Row, or nil otherwise. 77func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) { 78 var row Row 79 switch cr.state { 80 case newRow: 81 if err := cr.validateNewRow(cc); err != nil { 82 return nil, err 83 } 84 85 cr.curRow = make(Row) 86 cr.curKey = cc.RowKey 87 cr.curFam = cc.FamilyName.Value 88 cr.curQual = cc.Qualifier.Value 89 cr.curTS = cc.TimestampMicros 90 row = cr.handleCellValue(cc) 91 92 case rowInProgress: 93 if err := cr.validateRowInProgress(cc); err != nil { 94 return nil, err 95 } 96 97 if cc.GetResetRow() { 98 cr.resetToNewRow() 99 return nil, nil 100 } 101 102 if cc.FamilyName != nil { 103 cr.curFam = cc.FamilyName.Value 104 } 105 if cc.Qualifier != nil { 106 cr.curQual = cc.Qualifier.Value 107 } 108 cr.curTS = cc.TimestampMicros 109 row = cr.handleCellValue(cc) 110 111 case cellInProgress: 112 if err := cr.validateCellInProgress(cc); err != nil { 113 return nil, err 114 } 115 if cc.GetResetRow() { 116 cr.resetToNewRow() 117 return nil, nil 118 } 119 row = cr.handleCellValue(cc) 120 } 121 122 return row, nil 123} 124 125// Close must be called after all cell chunks from the response 126// have been processed. An error will be returned if the reader is 127// in an invalid state, in which case the error should be propagated to the caller. 128func (cr *chunkReader) Close() error { 129 if cr.state != newRow { 130 return fmt.Errorf("invalid state for end of stream %q", cr.state) 131 } 132 return nil 133} 134 135// handleCellValue returns a Row if the cell value includes a commit, otherwise nil. 136func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row { 137 if cc.ValueSize > 0 { 138 // ValueSize is specified so expect a split value of ValueSize bytes 139 if cr.curVal == nil { 140 cr.curVal = make([]byte, 0, cc.ValueSize) 141 } 142 cr.curVal = append(cr.curVal, cc.Value...) 143 cr.state = cellInProgress 144 } else { 145 // This cell is either the complete value or the last chunk of a split 146 if cr.curVal == nil { 147 cr.curVal = cc.Value 148 } else { 149 cr.curVal = append(cr.curVal, cc.Value...) 150 } 151 cr.finishCell() 152 153 if cc.GetCommitRow() { 154 return cr.commitRow() 155 } 156 cr.state = rowInProgress 157 } 158 159 return nil 160} 161 162func (cr *chunkReader) finishCell() { 163 ri := ReadItem{ 164 Row: string(cr.curKey), 165 Column: string(cr.curFam) + ":" + string(cr.curQual), 166 Timestamp: Timestamp(cr.curTS), 167 Value: cr.curVal, 168 } 169 cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri) 170 cr.curVal = nil 171} 172 173func (cr *chunkReader) commitRow() Row { 174 row := cr.curRow 175 cr.lastKey = cr.curRow.Key() 176 cr.resetToNewRow() 177 return row 178} 179 180func (cr *chunkReader) resetToNewRow() { 181 cr.curKey = nil 182 cr.curFam = "" 183 cr.curQual = nil 184 cr.curVal = nil 185 cr.curRow = nil 186 cr.curTS = 0 187 cr.state = newRow 188} 189 190func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error { 191 if cc.GetResetRow() { 192 return fmt.Errorf("reset_row not allowed between rows") 193 } 194 if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil { 195 return fmt.Errorf("missing key field for new row %v", cc) 196 } 197 if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) { 198 return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey)) 199 } 200 return nil 201} 202 203func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { 204 if err := cr.validateRowStatus(cc); err != nil { 205 return err 206 } 207 if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) { 208 return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey) 209 } 210 if cc.FamilyName != nil && cc.Qualifier == nil { 211 return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName) 212 } 213 return nil 214} 215 216func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error { 217 if err := cr.validateRowStatus(cc); err != nil { 218 return err 219 } 220 if cr.curVal == nil { 221 return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc) 222 } 223 if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) { 224 return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc) 225 } 226 return nil 227} 228 229func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool { 230 return cc.RowKey != nil || 231 cc.FamilyName != nil || 232 cc.Qualifier != nil || 233 cc.TimestampMicros != 0 234} 235 236// Validate a RowStatus, commit or reset, if present. 237func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error { 238 // Resets can't be specified with any other part of a cell 239 if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) || 240 cc.Value != nil || 241 cc.ValueSize != 0 || 242 cc.Labels != nil) { 243 return fmt.Errorf("reset must not be specified with other fields %v", cc) 244 } 245 if cc.GetCommitRow() && cc.ValueSize > 0 { 246 return fmt.Errorf("commit row found in between chunks in a cell") 247 } 248 return nil 249} 250