1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * (C) 2018-2020 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21	"bytes"
22	"context"
23	"encoding/binary"
24	"encoding/xml"
25	"errors"
26	"fmt"
27	"hash"
28	"hash/crc32"
29	"io"
30	"net/http"
31	"net/url"
32	"strings"
33
34	"github.com/minio/minio-go/v7/pkg/encrypt"
35	"github.com/minio/minio-go/v7/pkg/s3utils"
36)
37
// CSVFileHeaderInfo - is the parameter for whether to utilize headers.
type CSVFileHeaderInfo string

// Constants for file header info.
//
// Every constant carries the CSVFileHeaderInfo type explicitly. In a Go const
// block a line of the form `Name = "value"` does not inherit the type of the
// previous line; it declares an untyped string constant, which would defeat
// the purpose of the dedicated type.
const (
	CSVFileHeaderInfoNone   CSVFileHeaderInfo = "NONE"
	CSVFileHeaderInfoIgnore CSVFileHeaderInfo = "IGNORE"
	CSVFileHeaderInfoUse    CSVFileHeaderInfo = "USE"
)
47
// SelectCompressionType - is the parameter for what type of compression is
// present
type SelectCompressionType string

// Constants for compression types under select API.
//
// Each constant is declared with the SelectCompressionType type explicitly;
// without it, every line after the first would be an untyped string constant.
const (
	SelectCompressionNONE SelectCompressionType = "NONE"
	SelectCompressionGZIP SelectCompressionType = "GZIP"
	SelectCompressionBZIP SelectCompressionType = "BZIP2"

	// Non-standard compression schemes, supported by MinIO hosts:

	SelectCompressionZSTD   SelectCompressionType = "ZSTD"   // Zstandard compression.
	SelectCompressionLZ4    SelectCompressionType = "LZ4"    // LZ4 Stream
	SelectCompressionS2     SelectCompressionType = "S2"     // S2 Stream
	SelectCompressionSNAPPY SelectCompressionType = "SNAPPY" // Snappy stream
)
65
// CSVQuoteFields - is the parameter for how CSV fields are quoted.
type CSVQuoteFields string

// Constants for csv quote styles.
//
// Both constants are explicitly typed; an omitted type in a const block
// produces an untyped string constant rather than a CSVQuoteFields value.
const (
	CSVQuoteFieldsAlways   CSVQuoteFields = "Always"
	CSVQuoteFieldsAsNeeded CSVQuoteFields = "AsNeeded"
)
74
// QueryExpressionType names the syntax the query expression is written in.
// SQL is the only value this file defines.
type QueryExpressionType string

// QueryExpressionTypeSQL marks the expression as SQL.
const QueryExpressionTypeSQL QueryExpressionType = "SQL"
83
// JSONType determines json input serialization type.
type JSONType string

// Constants for JSONTypes.
//
// Both constants are explicitly typed; leaving the type off the second line
// would make it an untyped string constant instead of a JSONType.
const (
	JSONDocumentType JSONType = "DOCUMENT"
	JSONLinesType    JSONType = "LINES"
)
92
// ParquetInputOptions holds the Parquet-specific input options. The struct
// has no fields; SelectObjectInputSerialization refers to it through a
// pointer tagged `xml:"Parquet,omitempty"`.
type ParquetInputOptions struct{}
95
96// CSVInputOptions csv input specific options
97type CSVInputOptions struct {
98	FileHeaderInfo    CSVFileHeaderInfo
99	fileHeaderInfoSet bool
100
101	RecordDelimiter    string
102	recordDelimiterSet bool
103
104	FieldDelimiter    string
105	fieldDelimiterSet bool
106
107	QuoteCharacter    string
108	quoteCharacterSet bool
109
110	QuoteEscapeCharacter    string
111	quoteEscapeCharacterSet bool
112
113	Comments    string
114	commentsSet bool
115}
116
117// SetFileHeaderInfo sets the file header info in the CSV input options
118func (c *CSVInputOptions) SetFileHeaderInfo(val CSVFileHeaderInfo) {
119	c.FileHeaderInfo = val
120	c.fileHeaderInfoSet = true
121}
122
123// SetRecordDelimiter sets the record delimiter in the CSV input options
124func (c *CSVInputOptions) SetRecordDelimiter(val string) {
125	c.RecordDelimiter = val
126	c.recordDelimiterSet = true
127}
128
129// SetFieldDelimiter sets the field delimiter in the CSV input options
130func (c *CSVInputOptions) SetFieldDelimiter(val string) {
131	c.FieldDelimiter = val
132	c.fieldDelimiterSet = true
133}
134
135// SetQuoteCharacter sets the quote character in the CSV input options
136func (c *CSVInputOptions) SetQuoteCharacter(val string) {
137	c.QuoteCharacter = val
138	c.quoteCharacterSet = true
139}
140
141// SetQuoteEscapeCharacter sets the quote escape character in the CSV input options
142func (c *CSVInputOptions) SetQuoteEscapeCharacter(val string) {
143	c.QuoteEscapeCharacter = val
144	c.quoteEscapeCharacterSet = true
145}
146
147// SetComments sets the comments character in the CSV input options
148func (c *CSVInputOptions) SetComments(val string) {
149	c.Comments = val
150	c.commentsSet = true
151}
152
153// MarshalXML - produces the xml representation of the CSV input options struct
154func (c CSVInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
155	if err := e.EncodeToken(start); err != nil {
156		return err
157	}
158	if c.FileHeaderInfo != "" || c.fileHeaderInfoSet {
159		if err := e.EncodeElement(c.FileHeaderInfo, xml.StartElement{Name: xml.Name{Local: "FileHeaderInfo"}}); err != nil {
160			return err
161		}
162	}
163
164	if c.RecordDelimiter != "" || c.recordDelimiterSet {
165		if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil {
166			return err
167		}
168	}
169
170	if c.FieldDelimiter != "" || c.fieldDelimiterSet {
171		if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil {
172			return err
173		}
174	}
175
176	if c.QuoteCharacter != "" || c.quoteCharacterSet {
177		if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil {
178			return err
179		}
180	}
181
182	if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet {
183		if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil {
184			return err
185		}
186	}
187
188	if c.Comments != "" || c.commentsSet {
189		if err := e.EncodeElement(c.Comments, xml.StartElement{Name: xml.Name{Local: "Comments"}}); err != nil {
190			return err
191		}
192	}
193
194	return e.EncodeToken(xml.EndElement{Name: start.Name})
195}
196
197// CSVOutputOptions csv output specific options
198type CSVOutputOptions struct {
199	QuoteFields    CSVQuoteFields
200	quoteFieldsSet bool
201
202	RecordDelimiter    string
203	recordDelimiterSet bool
204
205	FieldDelimiter    string
206	fieldDelimiterSet bool
207
208	QuoteCharacter    string
209	quoteCharacterSet bool
210
211	QuoteEscapeCharacter    string
212	quoteEscapeCharacterSet bool
213}
214
215// SetQuoteFields sets the quote field parameter in the CSV output options
216func (c *CSVOutputOptions) SetQuoteFields(val CSVQuoteFields) {
217	c.QuoteFields = val
218	c.quoteFieldsSet = true
219}
220
221// SetRecordDelimiter sets the record delimiter character in the CSV output options
222func (c *CSVOutputOptions) SetRecordDelimiter(val string) {
223	c.RecordDelimiter = val
224	c.recordDelimiterSet = true
225}
226
227// SetFieldDelimiter sets the field delimiter character in the CSV output options
228func (c *CSVOutputOptions) SetFieldDelimiter(val string) {
229	c.FieldDelimiter = val
230	c.fieldDelimiterSet = true
231}
232
233// SetQuoteCharacter sets the quote character in the CSV output options
234func (c *CSVOutputOptions) SetQuoteCharacter(val string) {
235	c.QuoteCharacter = val
236	c.quoteCharacterSet = true
237}
238
239// SetQuoteEscapeCharacter sets the quote escape character in the CSV output options
240func (c *CSVOutputOptions) SetQuoteEscapeCharacter(val string) {
241	c.QuoteEscapeCharacter = val
242	c.quoteEscapeCharacterSet = true
243}
244
245// MarshalXML - produces the xml representation of the CSVOutputOptions struct
246func (c CSVOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
247	if err := e.EncodeToken(start); err != nil {
248		return err
249	}
250
251	if c.QuoteFields != "" || c.quoteFieldsSet {
252		if err := e.EncodeElement(c.QuoteFields, xml.StartElement{Name: xml.Name{Local: "QuoteFields"}}); err != nil {
253			return err
254		}
255	}
256
257	if c.RecordDelimiter != "" || c.recordDelimiterSet {
258		if err := e.EncodeElement(c.RecordDelimiter, xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}); err != nil {
259			return err
260		}
261	}
262
263	if c.FieldDelimiter != "" || c.fieldDelimiterSet {
264		if err := e.EncodeElement(c.FieldDelimiter, xml.StartElement{Name: xml.Name{Local: "FieldDelimiter"}}); err != nil {
265			return err
266		}
267	}
268
269	if c.QuoteCharacter != "" || c.quoteCharacterSet {
270		if err := e.EncodeElement(c.QuoteCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteCharacter"}}); err != nil {
271			return err
272		}
273	}
274
275	if c.QuoteEscapeCharacter != "" || c.quoteEscapeCharacterSet {
276		if err := e.EncodeElement(c.QuoteEscapeCharacter, xml.StartElement{Name: xml.Name{Local: "QuoteEscapeCharacter"}}); err != nil {
277			return err
278		}
279	}
280
281	return e.EncodeToken(xml.EndElement{Name: start.Name})
282}
283
284// JSONInputOptions json input specific options
285type JSONInputOptions struct {
286	Type    JSONType
287	typeSet bool
288}
289
290// SetType sets the JSON type in the JSON input options
291func (j *JSONInputOptions) SetType(typ JSONType) {
292	j.Type = typ
293	j.typeSet = true
294}
295
296// MarshalXML - produces the xml representation of the JSONInputOptions struct
297func (j JSONInputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
298	if err := e.EncodeToken(start); err != nil {
299		return err
300	}
301
302	if j.Type != "" || j.typeSet {
303		if err := e.EncodeElement(j.Type, xml.StartElement{Name: xml.Name{Local: "Type"}}); err != nil {
304			return err
305		}
306	}
307
308	return e.EncodeToken(xml.EndElement{Name: start.Name})
309}
310
// JSONOutputOptions carries the JSON-specific output settings of a select
// request. recordDelimiterSet records that SetRecordDelimiter was called, so
// the element is emitted even when the delimiter is empty.
type JSONOutputOptions struct {
	RecordDelimiter    string
	recordDelimiterSet bool
}

// SetRecordDelimiter stores val as the record delimiter and marks it as
// explicitly set.
func (j *JSONOutputOptions) SetRecordDelimiter(val string) {
	j.RecordDelimiter, j.recordDelimiterSet = val, true
}

// MarshalXML writes the options as XML; the RecordDelimiter child is written
// when it is non-empty or was set through SetRecordDelimiter.
func (j JSONOutputOptions) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
	if err := e.EncodeToken(start); err != nil {
		return err
	}

	if j.recordDelimiterSet || j.RecordDelimiter != "" {
		child := xml.StartElement{Name: xml.Name{Local: "RecordDelimiter"}}
		if err := e.EncodeElement(j.RecordDelimiter, child); err != nil {
			return err
		}
	}

	closing := xml.EndElement{Name: start.Name}
	return e.EncodeToken(closing)
}
337
// SelectObjectInputSerialization - input serialization parameters.
//
// It combines the compression type of the stored object with one pointer per
// supported input format (Parquet, CSV, JSON). Every field is tagged
// `omitempty`, so an empty CompressionType and nil format pointers are left
// out of the marshalled request.
type SelectObjectInputSerialization struct {
	CompressionType SelectCompressionType `xml:"CompressionType,omitempty"`
	Parquet         *ParquetInputOptions  `xml:"Parquet,omitempty"`
	CSV             *CSVInputOptions      `xml:"CSV,omitempty"`
	JSON            *JSONInputOptions     `xml:"JSON,omitempty"`
}
345
// SelectObjectOutputSerialization - output serialization parameters.
//
// It holds one pointer per supported output format (CSV, JSON). Both fields
// are tagged `omitempty`, so a nil pointer is left out of the marshalled
// request.
type SelectObjectOutputSerialization struct {
	CSV  *CSVOutputOptions  `xml:"CSV,omitempty"`
	JSON *JSONOutputOptions `xml:"JSON,omitempty"`
}
351
// SelectObjectOptions - represents the input select body.
//
// The struct is marshalled to XML with the root element
// SelectObjectContentRequest (see the XMLName tag). ServerSideEncryption is
// tagged `xml:"-"` and therefore never appears in the body; it is only read
// by Header. The remaining fields are marshalled under their Go field names:
// the query expression and its syntax, the input and output serialization
// settings, and RequestProgress.Enabled.
type SelectObjectOptions struct {
	XMLName              xml.Name           `xml:"SelectObjectContentRequest" json:"-"`
	ServerSideEncryption encrypt.ServerSide `xml:"-"`
	Expression           string
	ExpressionType       QueryExpressionType
	InputSerialization   SelectObjectInputSerialization
	OutputSerialization  SelectObjectOutputSerialization
	RequestProgress      struct {
		Enabled bool
	}
}
364
365// Header returns the http.Header representation of the SelectObject options.
366func (o SelectObjectOptions) Header() http.Header {
367	headers := make(http.Header)
368	if o.ServerSideEncryption != nil && o.ServerSideEncryption.Type() == encrypt.SSEC {
369		o.ServerSideEncryption.Marshal(headers)
370	}
371	return headers
372}
373
// SelectObjectType - is the parameter which defines what type of object the
// operation is being performed on.
type SelectObjectType string

// Constants for input data types.
//
// Each constant is explicitly typed; leaving the type off would make every
// line after the first an untyped string constant.
const (
	SelectObjectTypeCSV     SelectObjectType = "CSV"
	SelectObjectTypeJSON    SelectObjectType = "JSON"
	SelectObjectTypeParquet SelectObjectType = "Parquet"
)
384
// preludeInfo is used for keeping track of necessary information from the
// prelude.
//
// processPrelude fills totalLen from the first four bytes of a message and
// headerLen from the next four; PayloadLen derives the payload size from the
// two.
type preludeInfo struct {
	totalLen  uint32
	headerLen uint32
}
391
// SelectResults is used for the streaming responses from the server.
//
// pipeReader is the read side of the pipe that the decoding goroutine (see
// start) writes record payloads into, resp is the HTTP response being
// decoded, and stats / progress are the targets that Stats and Progress
// events are XML-decoded into.
type SelectResults struct {
	pipeReader *io.PipeReader
	resp       *http.Response
	stats      *StatsMessage
	progress   *ProgressMessage
}
399
// ProgressMessage is a struct for progress xml message. Its XML element name
// is Progress, and it embeds StatsMessage for the byte counters.
type ProgressMessage struct {
	XMLName xml.Name `xml:"Progress" json:"-"`
	StatsMessage
}
405
// StatsMessage is a struct for stat xml message. Its XML element name is
// Stats, and it carries three int64 counters: BytesScanned, BytesProcessed
// and BytesReturned.
type StatsMessage struct {
	XMLName        xml.Name `xml:"Stats" json:"-"`
	BytesScanned   int64
	BytesProcessed int64
	BytesReturned  int64
}
413
// messageType represents the type of message, as carried in the
// "message-type" header of an event-stream message.
type messageType string

// Known message types. Both constants are explicitly typed; an omitted type
// would make commonMsg an untyped string constant.
const (
	errorMsg  messageType = "error"
	commonMsg messageType = "event"
)
421
// eventType represents the type of event, as carried in the "event-type"
// header of an event-stream message.
type eventType string

// list of event-types returned by Select API. Each constant is explicitly
// typed; an omitted type would make every line after the first an untyped
// string constant.
const (
	endEvent      eventType = "End"
	recordsEvent  eventType = "Records"
	progressEvent eventType = "Progress"
	statsEvent    eventType = "Stats"
)
432
// contentType is the content type of an event payload, as carried in the
// "content-type" header of an event-stream message.
type contentType string

// xmlContent is the content type of XML payloads.
const xmlContent contentType = "text/xml"
439
440// SelectObjectContent is a implementation of http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html AWS S3 API.
441func (c *Client) SelectObjectContent(ctx context.Context, bucketName, objectName string, opts SelectObjectOptions) (*SelectResults, error) {
442	// Input validation.
443	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
444		return nil, err
445	}
446	if err := s3utils.CheckValidObjectName(objectName); err != nil {
447		return nil, err
448	}
449
450	selectReqBytes, err := xml.Marshal(opts)
451	if err != nil {
452		return nil, err
453	}
454
455	urlValues := make(url.Values)
456	urlValues.Set("select", "")
457	urlValues.Set("select-type", "2")
458
459	// Execute POST on bucket/object.
460	resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{
461		bucketName:       bucketName,
462		objectName:       objectName,
463		queryValues:      urlValues,
464		customHeader:     opts.Header(),
465		contentMD5Base64: sumMD5Base64(selectReqBytes),
466		contentSHA256Hex: sum256Hex(selectReqBytes),
467		contentBody:      bytes.NewReader(selectReqBytes),
468		contentLength:    int64(len(selectReqBytes)),
469	})
470	if err != nil {
471		return nil, err
472	}
473
474	return NewSelectResults(resp, bucketName)
475}
476
477// NewSelectResults creates a Select Result parser that parses the response
478// and returns a Reader that will return parsed and assembled select output.
479func NewSelectResults(resp *http.Response, bucketName string) (*SelectResults, error) {
480	if resp.StatusCode != http.StatusOK {
481		return nil, httpRespToErrorResponse(resp, bucketName, "")
482	}
483
484	pipeReader, pipeWriter := io.Pipe()
485	streamer := &SelectResults{
486		resp:       resp,
487		stats:      &StatsMessage{},
488		progress:   &ProgressMessage{},
489		pipeReader: pipeReader,
490	}
491	streamer.start(pipeWriter)
492	return streamer, nil
493}
494
495// Close - closes the underlying response body and the stream reader.
496func (s *SelectResults) Close() error {
497	defer closeResponse(s.resp)
498	return s.pipeReader.Close()
499}
500
// Read - is a reader compatible implementation for SelectObjectContent records.
// It delegates to the pipe reader, so it yields the record payloads that the
// decoding goroutine writes into the pipe and reports whatever error that
// goroutine closed the pipe with.
func (s *SelectResults) Read(b []byte) (n int, err error) {
	return s.pipeReader.Read(b)
}
505
// Stats - information about a request's stats when processing is complete.
// The returned pointer is the same StatsMessage that Stats events are decoded
// into by the goroutine launched in start.
// NOTE(review): no synchronization between that goroutine and callers is
// visible in this file — confirm callers only read after the stream ended.
func (s *SelectResults) Stats() *StatsMessage {
	return s.stats
}
510
// Progress - information about the progress of a request.
// The returned pointer is the same ProgressMessage that Progress events are
// decoded into by the goroutine launched in start.
// NOTE(review): no synchronization between that goroutine and callers is
// visible in this file — confirm how concurrent reads are expected to behave.
func (s *SelectResults) Progress() *ProgressMessage {
	return s.progress
}
515
516// start is the main function that decodes the large byte array into
517// several events that are sent through the eventstream.
518func (s *SelectResults) start(pipeWriter *io.PipeWriter) {
519	go func() {
520		for {
521			var prelude preludeInfo
522			var headers = make(http.Header)
523			var err error
524
525			// Create CRC code
526			crc := crc32.New(crc32.IEEETable)
527			crcReader := io.TeeReader(s.resp.Body, crc)
528
529			// Extract the prelude(12 bytes) into a struct to extract relevant information.
530			prelude, err = processPrelude(crcReader, crc)
531			if err != nil {
532				pipeWriter.CloseWithError(err)
533				closeResponse(s.resp)
534				return
535			}
536
537			// Extract the headers(variable bytes) into a struct to extract relevant information
538			if prelude.headerLen > 0 {
539				if err = extractHeader(io.LimitReader(crcReader, int64(prelude.headerLen)), headers); err != nil {
540					pipeWriter.CloseWithError(err)
541					closeResponse(s.resp)
542					return
543				}
544			}
545
546			// Get the actual payload length so that the appropriate amount of
547			// bytes can be read or parsed.
548			payloadLen := prelude.PayloadLen()
549
550			m := messageType(headers.Get("message-type"))
551
552			switch m {
553			case errorMsg:
554				pipeWriter.CloseWithError(errors.New(headers.Get("error-code") + ":\"" + headers.Get("error-message") + "\""))
555				closeResponse(s.resp)
556				return
557			case commonMsg:
558				// Get content-type of the payload.
559				c := contentType(headers.Get("content-type"))
560
561				// Get event type of the payload.
562				e := eventType(headers.Get("event-type"))
563
564				// Handle all supported events.
565				switch e {
566				case endEvent:
567					pipeWriter.Close()
568					closeResponse(s.resp)
569					return
570				case recordsEvent:
571					if _, err = io.Copy(pipeWriter, io.LimitReader(crcReader, payloadLen)); err != nil {
572						pipeWriter.CloseWithError(err)
573						closeResponse(s.resp)
574						return
575					}
576				case progressEvent:
577					switch c {
578					case xmlContent:
579						if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.progress); err != nil {
580							pipeWriter.CloseWithError(err)
581							closeResponse(s.resp)
582							return
583						}
584					default:
585						pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, progressEvent))
586						closeResponse(s.resp)
587						return
588					}
589				case statsEvent:
590					switch c {
591					case xmlContent:
592						if err = xmlDecoder(io.LimitReader(crcReader, payloadLen), s.stats); err != nil {
593							pipeWriter.CloseWithError(err)
594							closeResponse(s.resp)
595							return
596						}
597					default:
598						pipeWriter.CloseWithError(fmt.Errorf("Unexpected content-type %s sent for event-type %s", c, statsEvent))
599						closeResponse(s.resp)
600						return
601					}
602				}
603			}
604
605			// Ensures that the full message's CRC is correct and
606			// that the message is not corrupted
607			if err := checkCRC(s.resp.Body, crc.Sum32()); err != nil {
608				pipeWriter.CloseWithError(err)
609				closeResponse(s.resp)
610				return
611			}
612
613		}
614	}()
615}
616
617// PayloadLen is a function that calculates the length of the payload.
618func (p preludeInfo) PayloadLen() int64 {
619	return int64(p.totalLen - p.headerLen - 16)
620}
621
622// processPrelude is the function that reads the 12 bytes of the prelude and
623// ensures the CRC is correct while also extracting relevant information into
624// the struct,
625func processPrelude(prelude io.Reader, crc hash.Hash32) (preludeInfo, error) {
626	var err error
627	var pInfo = preludeInfo{}
628
629	// reads total length of the message (first 4 bytes)
630	pInfo.totalLen, err = extractUint32(prelude)
631	if err != nil {
632		return pInfo, err
633	}
634
635	// reads total header length of the message (2nd 4 bytes)
636	pInfo.headerLen, err = extractUint32(prelude)
637	if err != nil {
638		return pInfo, err
639	}
640
641	// checks that the CRC is correct (3rd 4 bytes)
642	preCRC := crc.Sum32()
643	if err := checkCRC(prelude, preCRC); err != nil {
644		return pInfo, err
645	}
646
647	return pInfo, nil
648}
649
650// extracts the relevant information from the Headers.
651func extractHeader(body io.Reader, myHeaders http.Header) error {
652	for {
653		// extracts the first part of the header,
654		headerTypeName, err := extractHeaderType(body)
655		if err != nil {
656			// Since end of file, we have read all of our headers
657			if err == io.EOF {
658				break
659			}
660			return err
661		}
662
663		// reads the 7 present in the header and ignores it.
664		extractUint8(body)
665
666		headerValueName, err := extractHeaderValue(body)
667		if err != nil {
668			return err
669		}
670
671		myHeaders.Set(headerTypeName, headerValueName)
672
673	}
674	return nil
675}
676
677// extractHeaderType extracts the first half of the header message, the header type.
678func extractHeaderType(body io.Reader) (string, error) {
679	// extracts 2 bit integer
680	headerNameLen, err := extractUint8(body)
681	if err != nil {
682		return "", err
683	}
684	// extracts the string with the appropriate number of bytes
685	headerName, err := extractString(body, int(headerNameLen))
686	if err != nil {
687		return "", err
688	}
689	return strings.TrimPrefix(headerName, ":"), nil
690}
691
692// extractsHeaderValue extracts the second half of the header message, the
693// header value
694func extractHeaderValue(body io.Reader) (string, error) {
695	bodyLen, err := extractUint16(body)
696	if err != nil {
697		return "", err
698	}
699	bodyName, err := extractString(body, int(bodyLen))
700	if err != nil {
701		return "", err
702	}
703	return bodyName, nil
704}
705
// extractString reads exactly lenBytes bytes from source and returns them as
// a string.
//
// The bytes are read with io.ReadFull because a single Read call is allowed
// to return fewer bytes than requested; accepting such a short read would
// leave zero bytes at the end of the string and misalign every later read.
// A zero-length string is returned without touching source's end-of-stream
// state. If source ends before lenBytes bytes are available the error is
// io.ErrUnexpectedEOF, never io.EOF, so that callers cannot mistake a
// truncated string for a clean end of input.
func extractString(source io.Reader, lenBytes int) (string, error) {
	myVal := make([]byte, lenBytes)
	if _, err := io.ReadFull(source, myVal); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return "", err
	}
	return string(myVal), nil
}
715
716// extractUint32 extracts a 4 byte integer from the byte array.
717func extractUint32(r io.Reader) (uint32, error) {
718	buf := make([]byte, 4)
719	_, err := readFull(r, buf)
720	if err != nil {
721		return 0, err
722	}
723	return binary.BigEndian.Uint32(buf), nil
724}
725
726// extractUint16 extracts a 2 byte integer from the byte array.
727func extractUint16(r io.Reader) (uint16, error) {
728	buf := make([]byte, 2)
729	_, err := readFull(r, buf)
730	if err != nil {
731		return 0, err
732	}
733	return binary.BigEndian.Uint16(buf), nil
734}
735
736// extractUint8 extracts a 1 byte integer from the byte array.
737func extractUint8(r io.Reader) (uint8, error) {
738	buf := make([]byte, 1)
739	_, err := readFull(r, buf)
740	if err != nil {
741		return 0, err
742	}
743	return buf[0], nil
744}
745
746// checkCRC ensures that the CRC matches with the one from the reader.
747func checkCRC(r io.Reader, expect uint32) error {
748	msgCRC, err := extractUint32(r)
749	if err != nil {
750		return err
751	}
752
753	if msgCRC != expect {
754		return fmt.Errorf("Checksum Mismatch, MessageCRC of 0x%X does not equal expected CRC of 0x%X", msgCRC, expect)
755
756	}
757	return nil
758}
759