1package loghttp
2
3import (
4	"errors"
5	"fmt"
6	"net/http"
7	"time"
8	"unsafe"
9
10	"github.com/buger/jsonparser"
11	json "github.com/json-iterator/go"
12	"github.com/prometheus/common/model"
13
14	"github.com/grafana/loki/pkg/logproto"
15	"github.com/grafana/loki/pkg/logqlmodel/stats"
16)
17
// Sentinel errors returned while validating range-query parameters.
var (
	// errEndBeforeStart is returned when the end bound precedes the start bound.
	errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")

	// errNegativeStep is returned when the resolved step is zero or negative.
	errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")

	// errStepTooSmall is returned when the range divided by the step would
	// yield more than 11,000 points.
	errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")

	// errNegativeInterval is returned when the interval is below zero.
	errNegativeInterval = errors.New("interval must be >= 0")
)
24
// QueryStatus is the string type describing the outcome of a query.
type QueryStatus string

// Known query status values. They are untyped string constants, so they can
// be assigned both to QueryStatus and to plain string fields.
const (
	// QueryStatusSuccess marks a query that completed successfully.
	QueryStatusSuccess = "success"
	// QueryStatusFail marks a query that failed.
	QueryStatusFail = "fail"
)
33
34// QueryResponse represents the http json response to a Loki range and instant query
35type QueryResponse struct {
36	Status string            `json:"status"`
37	Data   QueryResponseData `json:"data"`
38}
39
40func (q *QueryResponse) UnmarshalJSON(data []byte) error {
41	return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
42		switch string(key) {
43		case "status":
44			q.Status = string(value)
45		case "data":
46			var responseData QueryResponseData
47			if err := responseData.UnmarshalJSON(value); err != nil {
48				return err
49			}
50			q.Data = responseData
51		}
52		return nil
53	})
54}
55
56// PushRequest models a log stream push
57type PushRequest struct {
58	Streams []*Stream `json:"streams"`
59}
60
// ResultType names the kind of value carried by a query result.
type ResultType string

// Known result type names. They are untyped string constants and convert
// implicitly to ResultType where one is expected.
const (
	// ResultTypeStream identifies a Streams result.
	ResultTypeStream = "streams"
	// ResultTypeScalar identifies a Scalar result.
	ResultTypeScalar = "scalar"
	// ResultTypeVector identifies a Vector result.
	ResultTypeVector = "vector"
	// ResultTypeMatrix identifies a Matrix result.
	ResultTypeMatrix = "matrix"
)
71
72// ResultValue interface mimics the promql.Value interface
73type ResultValue interface {
74	Type() ResultType
75}
76
77// QueryResponseData represents the http json response to a label query
78type QueryResponseData struct {
79	ResultType ResultType   `json:"resultType"`
80	Result     ResultValue  `json:"result"`
81	Statistics stats.Result `json:"stats"`
82}
83
84// Type implements the promql.Value interface
85func (Streams) Type() ResultType { return ResultTypeStream }
86
87// Type implements the promql.Value interface
88func (Scalar) Type() ResultType { return ResultTypeScalar }
89
90// Type implements the promql.Value interface
91func (Vector) Type() ResultType { return ResultTypeVector }
92
93// Type implements the promql.Value interface
94func (Matrix) Type() ResultType { return ResultTypeMatrix }
95
96// Streams is a slice of Stream
97type Streams []Stream
98
99func (ss *Streams) UnmarshalJSON(data []byte) error {
100	var parseError error
101	_, err := jsonparser.ArrayEach(data, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
102		var stream Stream
103		if err := stream.UnmarshalJSON(value); err != nil {
104			parseError = err
105			return
106		}
107		*ss = append(*ss, stream)
108	})
109	if parseError != nil {
110		return parseError
111	}
112	return err
113}
114
115func (s Streams) ToProto() []logproto.Stream {
116	if len(s) == 0 {
117		return nil
118	}
119	result := make([]logproto.Stream, 0, len(s))
120	for _, s := range s {
121		entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries))
122		result = append(result, logproto.Stream{Labels: s.Labels.String(), Entries: entries})
123	}
124	return result
125}
126
127// Stream represents a log stream.  It includes a set of log entries and their labels.
128type Stream struct {
129	Labels  LabelSet `json:"stream"`
130	Entries []Entry  `json:"values"`
131}
132
133func (s *Stream) UnmarshalJSON(data []byte) error {
134	if s.Labels == nil {
135		s.Labels = LabelSet{}
136	}
137	if len(s.Entries) > 0 {
138		s.Entries = s.Entries[:0]
139	}
140	return jsonparser.ObjectEach(data, func(key, value []byte, ty jsonparser.ValueType, _ int) error {
141		switch string(key) {
142		case "stream":
143			if err := s.Labels.UnmarshalJSON(value); err != nil {
144				return err
145			}
146		case "values":
147			if ty == jsonparser.Null {
148				return nil
149			}
150			var parseError error
151			_, err := jsonparser.ArrayEach(value, func(value []byte, ty jsonparser.ValueType, _ int, _ error) {
152				if ty == jsonparser.Null {
153					return
154				}
155				var entry Entry
156				if err := entry.UnmarshalJSON(value); err != nil {
157					parseError = err
158					return
159				}
160				s.Entries = append(s.Entries, entry)
161			})
162			if parseError != nil {
163				return parseError
164			}
165			return err
166		}
167		return nil
168	})
169}
170
171// UnmarshalJSON implements the json.Unmarshaler interface.
172func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
173	resultType, err := jsonparser.GetString(data, "resultType")
174	if err != nil {
175		return err
176	}
177	q.ResultType = ResultType(resultType)
178
179	return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, _ int) error {
180		switch string(key) {
181		case "result":
182			switch q.ResultType {
183			case ResultTypeStream:
184				ss := Streams{}
185				if err := ss.UnmarshalJSON(value); err != nil {
186					return err
187				}
188				q.Result = ss
189			case ResultTypeMatrix:
190				var m Matrix
191				if err = json.Unmarshal(value, &m); err != nil {
192					return err
193				}
194				q.Result = m
195			case ResultTypeVector:
196				var v Vector
197				if err = json.Unmarshal(value, &v); err != nil {
198					return err
199				}
200				q.Result = v
201			case ResultTypeScalar:
202				var v Scalar
203				if err = json.Unmarshal(value, &v); err != nil {
204					return err
205				}
206				q.Result = v
207			default:
208				return fmt.Errorf("unknown type: %s", q.ResultType)
209			}
210		case "stats":
211			if err := json.Unmarshal(value, &q.Statistics); err != nil {
212				return err
213			}
214		}
215		return nil
216	})
217}
218
219// Scalar is a single timestamp/float with no labels
220type Scalar model.Scalar
221
222func (s Scalar) MarshalJSON() ([]byte, error) {
223	return model.Scalar(s).MarshalJSON()
224}
225
226func (s *Scalar) UnmarshalJSON(b []byte) error {
227	var v model.Scalar
228	if err := v.UnmarshalJSON(b); err != nil {
229		return err
230	}
231	*s = Scalar(v)
232	return nil
233}
234
235// Vector is a slice of Samples
236type Vector []model.Sample
237
238// Matrix is a slice of SampleStreams
239type Matrix []model.SampleStream
240
241// InstantQuery defines a log instant query.
242type InstantQuery struct {
243	Query     string
244	Ts        time.Time
245	Limit     uint32
246	Direction logproto.Direction
247	Shards    []string
248}
249
250// ParseInstantQuery parses an InstantQuery request from an http request.
251func ParseInstantQuery(r *http.Request) (*InstantQuery, error) {
252	var err error
253	request := &InstantQuery{
254		Query: query(r),
255	}
256	request.Limit, err = limit(r)
257	if err != nil {
258		return nil, err
259	}
260
261	request.Ts, err = ts(r)
262	if err != nil {
263		return nil, err
264	}
265	request.Shards = shards(r)
266
267	request.Direction, err = direction(r)
268	if err != nil {
269		return nil, err
270	}
271
272	return request, nil
273}
274
275// RangeQuery defines a log range query.
276type RangeQuery struct {
277	Start     time.Time
278	End       time.Time
279	Step      time.Duration
280	Interval  time.Duration
281	Query     string
282	Direction logproto.Direction
283	Limit     uint32
284	Shards    []string
285}
286
287// ParseRangeQuery parses a RangeQuery request from an http request.
288func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
289	var result RangeQuery
290	var err error
291
292	result.Query = query(r)
293	result.Start, result.End, err = bounds(r)
294	if err != nil {
295		return nil, err
296	}
297
298	if result.End.Before(result.Start) {
299		return nil, errEndBeforeStart
300	}
301
302	result.Limit, err = limit(r)
303	if err != nil {
304		return nil, err
305	}
306
307	result.Direction, err = direction(r)
308	if err != nil {
309		return nil, err
310	}
311
312	result.Step, err = step(r, result.Start, result.End)
313	if err != nil {
314		return nil, err
315	}
316
317	if result.Step <= 0 {
318		return nil, errNegativeStep
319	}
320
321	result.Shards = shards(r)
322
323	// For safety, limit the number of returned points per timeseries.
324	// This is sufficient for 60s resolution for a week or 1h resolution for a year.
325	if (result.End.Sub(result.Start) / result.Step) > 11000 {
326		return nil, errStepTooSmall
327	}
328
329	result.Interval, err = interval(r)
330	if err != nil {
331		return nil, err
332	}
333
334	if result.Interval < 0 {
335		return nil, errNegativeInterval
336	}
337
338	return &result, nil
339}
340