1package loghttp 2 3import ( 4 "errors" 5 "fmt" 6 "net/http" 7 "time" 8 "unsafe" 9 10 "github.com/buger/jsonparser" 11 json "github.com/json-iterator/go" 12 "github.com/prometheus/common/model" 13 14 "github.com/grafana/loki/pkg/logproto" 15 "github.com/grafana/loki/pkg/logqlmodel/stats" 16) 17 18var ( 19 errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time") 20 errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") 21 errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") 22 errNegativeInterval = errors.New("interval must be >= 0") 23) 24 25// QueryStatus holds the status of a query 26type QueryStatus string 27 28// QueryStatus values 29const ( 30 QueryStatusSuccess = "success" 31 QueryStatusFail = "fail" 32) 33 34// QueryResponse represents the http json response to a Loki range and instant query 35type QueryResponse struct { 36 Status string `json:"status"` 37 Data QueryResponseData `json:"data"` 38} 39 40func (q *QueryResponse) UnmarshalJSON(data []byte) error { 41 return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, offset int) error { 42 switch string(key) { 43 case "status": 44 q.Status = string(value) 45 case "data": 46 var responseData QueryResponseData 47 if err := responseData.UnmarshalJSON(value); err != nil { 48 return err 49 } 50 q.Data = responseData 51 } 52 return nil 53 }) 54} 55 56// PushRequest models a log stream push 57type PushRequest struct { 58 Streams []*Stream `json:"streams"` 59} 60 61// ResultType holds the type of the result 62type ResultType string 63 64// ResultType values 65const ( 66 ResultTypeStream = "streams" 67 ResultTypeScalar = "scalar" 68 ResultTypeVector = "vector" 69 ResultTypeMatrix = "matrix" 70) 71 72// ResultValue interface mimics the promql.Value interface 73type ResultValue interface { 74 Type() ResultType 75} 76 77// QueryResponseData represents the http json response to a label query 78type QueryResponseData struct { 79 ResultType ResultType `json:"resultType"` 80 Result ResultValue `json:"result"` 81 Statistics stats.Result `json:"stats"` 82} 83 84// Type implements the promql.Value interface 85func (Streams) Type() ResultType { return ResultTypeStream } 86 87// Type implements the promql.Value interface 88func (Scalar) Type() ResultType { return ResultTypeScalar } 89 90// Type implements the promql.Value interface 91func (Vector) Type() ResultType { return ResultTypeVector } 92 93// Type implements the promql.Value interface 94func (Matrix) Type() ResultType { return ResultTypeMatrix } 95 96// Streams is a slice of Stream 97type Streams []Stream 98 99func (ss *Streams) UnmarshalJSON(data []byte) error { 100 var parseError error 101 _, err := jsonparser.ArrayEach(data, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { 102 var stream Stream 103 if err := stream.UnmarshalJSON(value); err != nil { 104 parseError = err 105 return 106 } 107 *ss = append(*ss, stream) 108 }) 109 if parseError != nil { 110 return parseError 111 } 112 return err 113} 114 115func (s Streams) ToProto() []logproto.Stream { 116 if len(s) == 0 { 117 return nil 118 } 119 result := make([]logproto.Stream, 0, len(s)) 120 for _, s := range s { 121 entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)) 122 result = append(result, logproto.Stream{Labels: s.Labels.String(), Entries: entries}) 123 } 124 return result 125} 126 127// Stream represents a log stream. It includes a set of log entries and their labels. 128type Stream struct { 129 Labels LabelSet `json:"stream"` 130 Entries []Entry `json:"values"` 131} 132 133func (s *Stream) UnmarshalJSON(data []byte) error { 134 if s.Labels == nil { 135 s.Labels = LabelSet{} 136 } 137 if len(s.Entries) > 0 { 138 s.Entries = s.Entries[:0] 139 } 140 return jsonparser.ObjectEach(data, func(key, value []byte, ty jsonparser.ValueType, _ int) error { 141 switch string(key) { 142 case "stream": 143 if err := s.Labels.UnmarshalJSON(value); err != nil { 144 return err 145 } 146 case "values": 147 if ty == jsonparser.Null { 148 return nil 149 } 150 var parseError error 151 _, err := jsonparser.ArrayEach(value, func(value []byte, ty jsonparser.ValueType, _ int, _ error) { 152 if ty == jsonparser.Null { 153 return 154 } 155 var entry Entry 156 if err := entry.UnmarshalJSON(value); err != nil { 157 parseError = err 158 return 159 } 160 s.Entries = append(s.Entries, entry) 161 }) 162 if parseError != nil { 163 return parseError 164 } 165 return err 166 } 167 return nil 168 }) 169} 170 171// UnmarshalJSON implements the json.Unmarshaler interface. 172func (q *QueryResponseData) UnmarshalJSON(data []byte) error { 173 resultType, err := jsonparser.GetString(data, "resultType") 174 if err != nil { 175 return err 176 } 177 q.ResultType = ResultType(resultType) 178 179 return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, _ int) error { 180 switch string(key) { 181 case "result": 182 switch q.ResultType { 183 case ResultTypeStream: 184 ss := Streams{} 185 if err := ss.UnmarshalJSON(value); err != nil { 186 return err 187 } 188 q.Result = ss 189 case ResultTypeMatrix: 190 var m Matrix 191 if err = json.Unmarshal(value, &m); err != nil { 192 return err 193 } 194 q.Result = m 195 case ResultTypeVector: 196 var v Vector 197 if err = json.Unmarshal(value, &v); err != nil { 198 return err 199 } 200 q.Result = v 201 case ResultTypeScalar: 202 var v Scalar 203 if err = json.Unmarshal(value, &v); err != nil { 204 return err 205 } 206 q.Result = v 207 default: 208 return fmt.Errorf("unknown type: %s", q.ResultType) 209 } 210 case "stats": 211 if err := json.Unmarshal(value, &q.Statistics); err != nil { 212 return err 213 } 214 } 215 return nil 216 }) 217} 218 219// Scalar is a single timestamp/float with no labels 220type Scalar model.Scalar 221 222func (s Scalar) MarshalJSON() ([]byte, error) { 223 return model.Scalar(s).MarshalJSON() 224} 225 226func (s *Scalar) UnmarshalJSON(b []byte) error { 227 var v model.Scalar 228 if err := v.UnmarshalJSON(b); err != nil { 229 return err 230 } 231 *s = Scalar(v) 232 return nil 233} 234 235// Vector is a slice of Samples 236type Vector []model.Sample 237 238// Matrix is a slice of SampleStreams 239type Matrix []model.SampleStream 240 241// InstantQuery defines a log instant query. 242type InstantQuery struct { 243 Query string 244 Ts time.Time 245 Limit uint32 246 Direction logproto.Direction 247 Shards []string 248} 249 250// ParseInstantQuery parses an InstantQuery request from an http request. 251func ParseInstantQuery(r *http.Request) (*InstantQuery, error) { 252 var err error 253 request := &InstantQuery{ 254 Query: query(r), 255 } 256 request.Limit, err = limit(r) 257 if err != nil { 258 return nil, err 259 } 260 261 request.Ts, err = ts(r) 262 if err != nil { 263 return nil, err 264 } 265 request.Shards = shards(r) 266 267 request.Direction, err = direction(r) 268 if err != nil { 269 return nil, err 270 } 271 272 return request, nil 273} 274 275// RangeQuery defines a log range query. 276type RangeQuery struct { 277 Start time.Time 278 End time.Time 279 Step time.Duration 280 Interval time.Duration 281 Query string 282 Direction logproto.Direction 283 Limit uint32 284 Shards []string 285} 286 287// ParseRangeQuery parses a RangeQuery request from an http request. 288func ParseRangeQuery(r *http.Request) (*RangeQuery, error) { 289 var result RangeQuery 290 var err error 291 292 result.Query = query(r) 293 result.Start, result.End, err = bounds(r) 294 if err != nil { 295 return nil, err 296 } 297 298 if result.End.Before(result.Start) { 299 return nil, errEndBeforeStart 300 } 301 302 result.Limit, err = limit(r) 303 if err != nil { 304 return nil, err 305 } 306 307 result.Direction, err = direction(r) 308 if err != nil { 309 return nil, err 310 } 311 312 result.Step, err = step(r, result.Start, result.End) 313 if err != nil { 314 return nil, err 315 } 316 317 if result.Step <= 0 { 318 return nil, errNegativeStep 319 } 320 321 result.Shards = shards(r) 322 323 // For safety, limit the number of returned points per timeseries. 324 // This is sufficient for 60s resolution for a week or 1h resolution for a year. 325 if (result.End.Sub(result.Start) / result.Step) > 11000 { 326 return nil, errStepTooSmall 327 } 328 329 result.Interval, err = interval(r) 330 if err != nil { 331 return nil, err 332 } 333 334 if result.Interval < 0 { 335 return nil, errNegativeInterval 336 } 337 338 return &result, nil 339} 340