1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"bytes"
21	"fmt"
22
23	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
24)
25
// A Row is returned by ReadRows. The map is keyed by column family (the prefix
// of the column name before the colon). Each value holds the ReadItems for
// that column family, in the order they were returned by the read.
type Row map[string][]ReadItem
30
31// Key returns the row's key, or "" if the row is empty.
32func (r Row) Key() string {
33	for _, items := range r {
34		if len(items) > 0 {
35			return items[0].Row
36		}
37	}
38	return ""
39}
40
// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column.
//
// Row is the row key; Column is the full column name in "family:qualifier"
// form; Timestamp and Value are the cell's timestamp and contents.
type ReadItem struct {
	Row, Column string
	Timestamp   Timestamp
	Value       []byte
}
47
// rrState is the current state of the read rows state machine driven by
// chunkReader.Process.
type rrState int64

const (
	// newRow: waiting for the first chunk of a new row. This is the only
	// state in which chunkReader.Close reports success.
	newRow rrState = iota
	// rowInProgress: at least one cell of the current row is complete, but
	// the row has not been committed yet.
	rowInProgress
	// cellInProgress: the current cell's value is split across chunks and
	// more chunks for it are expected.
	cellInProgress
)
56
// chunkReader handles cell chunks from the read rows response and combines
// them into full Rows. The cur* fields hold the partially assembled row and
// cell; they are cleared by resetToNewRow.
type chunkReader struct {
	state   rrState // position in the state machine
	curKey  []byte  // key of the row being assembled
	curFam  string  // column family of the current cell
	curQual []byte  // qualifier of the current cell
	curTS   int64   // timestamp (micros) of the current cell
	curVal  []byte  // value accumulated so far for the current cell
	curRow  Row     // cells completed so far for the current row, by family
	lastKey string  // key of the last committed row, used to enforce ordering
}
69
70// newChunkReader returns a new chunkReader for handling read rows responses.
71func newChunkReader() *chunkReader {
72	return &chunkReader{state: newRow}
73}
74
75// Process takes a cell chunk and returns a new Row if the given chunk
76// completes a Row, or nil otherwise.
77func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) {
78	var row Row
79	switch cr.state {
80	case newRow:
81		if err := cr.validateNewRow(cc); err != nil {
82			return nil, err
83		}
84
85		cr.curRow = make(Row)
86		cr.curKey = cc.RowKey
87		cr.curFam = cc.FamilyName.Value
88		cr.curQual = cc.Qualifier.Value
89		cr.curTS = cc.TimestampMicros
90		row = cr.handleCellValue(cc)
91
92	case rowInProgress:
93		if err := cr.validateRowInProgress(cc); err != nil {
94			return nil, err
95		}
96
97		if cc.GetResetRow() {
98			cr.resetToNewRow()
99			return nil, nil
100		}
101
102		if cc.FamilyName != nil {
103			cr.curFam = cc.FamilyName.Value
104		}
105		if cc.Qualifier != nil {
106			cr.curQual = cc.Qualifier.Value
107		}
108		cr.curTS = cc.TimestampMicros
109		row = cr.handleCellValue(cc)
110
111	case cellInProgress:
112		if err := cr.validateCellInProgress(cc); err != nil {
113			return nil, err
114		}
115		if cc.GetResetRow() {
116			cr.resetToNewRow()
117			return nil, nil
118		}
119		row = cr.handleCellValue(cc)
120	}
121
122	return row, nil
123}
124
125// Close must be called after all cell chunks from the response
126// have been processed. An error will be returned if the reader is
127// in an invalid state, in which case the error should be propagated to the caller.
128func (cr *chunkReader) Close() error {
129	if cr.state != newRow {
130		return fmt.Errorf("invalid state for end of stream %q", cr.state)
131	}
132	return nil
133}
134
135// handleCellValue returns a Row if the cell value includes a commit, otherwise nil.
136func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row {
137	if cc.ValueSize > 0 {
138		// ValueSize is specified so expect a split value of ValueSize bytes
139		if cr.curVal == nil {
140			cr.curVal = make([]byte, 0, cc.ValueSize)
141		}
142		cr.curVal = append(cr.curVal, cc.Value...)
143		cr.state = cellInProgress
144	} else {
145		// This cell is either the complete value or the last chunk of a split
146		if cr.curVal == nil {
147			cr.curVal = cc.Value
148		} else {
149			cr.curVal = append(cr.curVal, cc.Value...)
150		}
151		cr.finishCell()
152
153		if cc.GetCommitRow() {
154			return cr.commitRow()
155		}
156		cr.state = rowInProgress
157	}
158
159	return nil
160}
161
162func (cr *chunkReader) finishCell() {
163	ri := ReadItem{
164		Row:       string(cr.curKey),
165		Column:    string(cr.curFam) + ":" + string(cr.curQual),
166		Timestamp: Timestamp(cr.curTS),
167		Value:     cr.curVal,
168	}
169	cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri)
170	cr.curVal = nil
171}
172
173func (cr *chunkReader) commitRow() Row {
174	row := cr.curRow
175	cr.lastKey = cr.curRow.Key()
176	cr.resetToNewRow()
177	return row
178}
179
180func (cr *chunkReader) resetToNewRow() {
181	cr.curKey = nil
182	cr.curFam = ""
183	cr.curQual = nil
184	cr.curVal = nil
185	cr.curRow = nil
186	cr.curTS = 0
187	cr.state = newRow
188}
189
190func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error {
191	if cc.GetResetRow() {
192		return fmt.Errorf("reset_row not allowed between rows")
193	}
194	if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil {
195		return fmt.Errorf("missing key field for new row %v", cc)
196	}
197	if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) {
198		return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey))
199	}
200	return nil
201}
202
203func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
204	if err := cr.validateRowStatus(cc); err != nil {
205		return err
206	}
207	if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) {
208		return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey)
209	}
210	if cc.FamilyName != nil && cc.Qualifier == nil {
211		return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName)
212	}
213	return nil
214}
215
216func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
217	if err := cr.validateRowStatus(cc); err != nil {
218		return err
219	}
220	if cr.curVal == nil {
221		return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc)
222	}
223	if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) {
224		return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc)
225	}
226	return nil
227}
228
229func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool {
230	return cc.RowKey != nil ||
231		cc.FamilyName != nil ||
232		cc.Qualifier != nil ||
233		cc.TimestampMicros != 0
234}
235
236// Validate a RowStatus, commit or reset, if present.
237func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error {
238	// Resets can't be specified with any other part of a cell
239	if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) ||
240		cc.Value != nil ||
241		cc.ValueSize != 0 ||
242		cc.Labels != nil) {
243		return fmt.Errorf("reset must not be specified with other fields %v", cc)
244	}
245	if cc.GetCommitRow() && cc.ValueSize > 0 {
246		return fmt.Errorf("commit row found in between chunks in a cell")
247	}
248	return nil
249}
250