1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable
18
19import (
20	"bytes"
21	"fmt"
22
23	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
24)
25
26// A Row is returned by ReadRows. The map is keyed by column family (the prefix
27// of the column name before the colon). The values are the returned ReadItems
28// for that column family in the order returned by Read.
29type Row map[string][]ReadItem
30
31// Key returns the row's key, or "" if the row is empty.
32func (r Row) Key() string {
33	for _, items := range r {
34		if len(items) > 0 {
35			return items[0].Row
36		}
37	}
38	return ""
39}
40
41// A ReadItem is returned by Read. A ReadItem contains data from a specific row and column.
42type ReadItem struct {
43	Row, Column string
44	Timestamp   Timestamp
45	Value       []byte
46	Labels      []string
47}
48
// rrState is the current state of the read rows state machine.
type rrState int64

const (
	// newRow: no row is in progress; the next chunk must start a new row.
	newRow rrState = iota
	// rowInProgress: a row has been started and its most recent cell is complete.
	rowInProgress
	// cellInProgress: a cell value is split across chunks and more are expected.
	cellInProgress
)

// String returns the state's constant name so that states read clearly in
// error messages (fmt uses it for %v, %s and %q). Unknown values are rendered
// as "rrState(N)".
func (s rrState) String() string {
	switch s {
	case newRow:
		return "newRow"
	case rowInProgress:
		return "rowInProgress"
	case cellInProgress:
		return "cellInProgress"
	}
	// %d on an explicit int64 avoids recursing into String.
	return fmt.Sprintf("rrState(%d)", int64(s))
}
57
58// chunkReader handles cell chunks from the read rows response and combines
59// them into full Rows.
60type chunkReader struct {
61	state     rrState
62	curKey    []byte
63	curLabels []string
64	curFam    string
65	curQual   []byte
66	curTS     int64
67	curVal    []byte
68	curRow    Row
69	lastKey   string
70}
71
72// newChunkReader returns a new chunkReader for handling read rows responses.
73func newChunkReader() *chunkReader {
74	return &chunkReader{state: newRow}
75}
76
77// Process takes a cell chunk and returns a new Row if the given chunk
78// completes a Row, or nil otherwise.
79func (cr *chunkReader) Process(cc *btpb.ReadRowsResponse_CellChunk) (Row, error) {
80	var row Row
81	switch cr.state {
82	case newRow:
83		if err := cr.validateNewRow(cc); err != nil {
84			return nil, err
85		}
86
87		cr.curRow = make(Row)
88		cr.curKey = cc.RowKey
89		cr.curFam = cc.FamilyName.Value
90		cr.curQual = cc.Qualifier.Value
91		cr.curTS = cc.TimestampMicros
92		row = cr.handleCellValue(cc)
93
94	case rowInProgress:
95		if err := cr.validateRowInProgress(cc); err != nil {
96			return nil, err
97		}
98
99		if cc.GetResetRow() {
100			cr.resetToNewRow()
101			return nil, nil
102		}
103
104		if cc.FamilyName != nil {
105			cr.curFam = cc.FamilyName.Value
106		}
107		if cc.Qualifier != nil {
108			cr.curQual = cc.Qualifier.Value
109		}
110		cr.curTS = cc.TimestampMicros
111		row = cr.handleCellValue(cc)
112
113	case cellInProgress:
114		if err := cr.validateCellInProgress(cc); err != nil {
115			return nil, err
116		}
117		if cc.GetResetRow() {
118			cr.resetToNewRow()
119			return nil, nil
120		}
121		row = cr.handleCellValue(cc)
122	}
123
124	return row, nil
125}
126
127// Close must be called after all cell chunks from the response
128// have been processed. An error will be returned if the reader is
129// in an invalid state, in which case the error should be propagated to the caller.
130func (cr *chunkReader) Close() error {
131	if cr.state != newRow {
132		return fmt.Errorf("invalid state for end of stream %q", cr.state)
133	}
134	return nil
135}
136
137// handleCellValue returns a Row if the cell value includes a commit, otherwise nil.
138func (cr *chunkReader) handleCellValue(cc *btpb.ReadRowsResponse_CellChunk) Row {
139	if cc.ValueSize > 0 {
140		// ValueSize is specified so expect a split value of ValueSize bytes
141		if cr.curVal == nil {
142			cr.curVal = make([]byte, 0, cc.ValueSize)
143			cr.curLabels = cc.Labels
144		}
145		cr.curVal = append(cr.curVal, cc.Value...)
146		cr.state = cellInProgress
147	} else {
148		// This cell is either the complete value or the last chunk of a split
149		if cr.curVal == nil {
150			cr.curVal = cc.Value
151			cr.curLabels = cc.Labels
152		} else {
153			cr.curVal = append(cr.curVal, cc.Value...)
154		}
155		cr.finishCell()
156
157		if cc.GetCommitRow() {
158			return cr.commitRow()
159		}
160		cr.state = rowInProgress
161	}
162
163	return nil
164}
165
166func (cr *chunkReader) finishCell() {
167	ri := ReadItem{
168		Row:       string(cr.curKey),
169		Column:    string(cr.curFam) + ":" + string(cr.curQual),
170		Timestamp: Timestamp(cr.curTS),
171		Value:     cr.curVal,
172		Labels:    cr.curLabels,
173	}
174	cr.curRow[cr.curFam] = append(cr.curRow[cr.curFam], ri)
175	cr.curVal = nil
176	cr.curLabels = nil
177}
178
179func (cr *chunkReader) commitRow() Row {
180	row := cr.curRow
181	cr.lastKey = cr.curRow.Key()
182	cr.resetToNewRow()
183	return row
184}
185
186func (cr *chunkReader) resetToNewRow() {
187	cr.curKey = nil
188	cr.curFam = ""
189	cr.curQual = nil
190	cr.curVal = nil
191	cr.curRow = nil
192	cr.curTS = 0
193	cr.state = newRow
194}
195
196func (cr *chunkReader) validateNewRow(cc *btpb.ReadRowsResponse_CellChunk) error {
197	if cc.GetResetRow() {
198		return fmt.Errorf("reset_row not allowed between rows")
199	}
200	if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil {
201		return fmt.Errorf("missing key field for new row %v", cc)
202	}
203	if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) {
204		return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey))
205	}
206	return nil
207}
208
209func (cr *chunkReader) validateRowInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
210	if err := cr.validateRowStatus(cc); err != nil {
211		return err
212	}
213	if cc.RowKey != nil && !bytes.Equal(cc.RowKey, cr.curKey) {
214		return fmt.Errorf("received new row key %q during existing row %q", cc.RowKey, cr.curKey)
215	}
216	if cc.FamilyName != nil && cc.Qualifier == nil {
217		return fmt.Errorf("family name %q specified without a qualifier", cc.FamilyName)
218	}
219	return nil
220}
221
222func (cr *chunkReader) validateCellInProgress(cc *btpb.ReadRowsResponse_CellChunk) error {
223	if err := cr.validateRowStatus(cc); err != nil {
224		return err
225	}
226	if cr.curVal == nil {
227		return fmt.Errorf("no cached cell while CELL_IN_PROGRESS %v", cc)
228	}
229	if cc.GetResetRow() == false && cr.isAnyKeyPresent(cc) {
230		return fmt.Errorf("cell key components found while CELL_IN_PROGRESS %v", cc)
231	}
232	return nil
233}
234
235func (cr *chunkReader) isAnyKeyPresent(cc *btpb.ReadRowsResponse_CellChunk) bool {
236	return cc.RowKey != nil ||
237		cc.FamilyName != nil ||
238		cc.Qualifier != nil ||
239		cc.TimestampMicros != 0
240}
241
242// Validate a RowStatus, commit or reset, if present.
243func (cr *chunkReader) validateRowStatus(cc *btpb.ReadRowsResponse_CellChunk) error {
244	// Resets can't be specified with any other part of a cell
245	if cc.GetResetRow() && (cr.isAnyKeyPresent(cc) ||
246		cc.Value != nil ||
247		cc.ValueSize != 0 ||
248		cc.Labels != nil) {
249		return fmt.Errorf("reset must not be specified with other fields %v", cc)
250	}
251	if cc.GetCommitRow() && cc.ValueSize > 0 {
252		return fmt.Errorf("commit row found in between chunks in a cell")
253	}
254	return nil
255}
256