package query

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/fatih/color"
	json "github.com/json-iterator/go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/logcli/client"
	"github.com/grafana/loki/pkg/logcli/output"
	"github.com/grafana/loki/pkg/loghttp"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/loki"
	"github.com/grafana/loki/pkg/storage"
	chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
	"github.com/grafana/loki/pkg/storage/stores/shipper"
	"github.com/grafana/loki/pkg/util/cfg"
	"github.com/grafana/loki/pkg/util/marshal"
	"github.com/grafana/loki/pkg/validation"
)

// streamEntryPair couples a single log entry with the label set of the stream
// it came from, so entries from multiple streams can be flattened into one
// slice and sorted by timestamp while retaining their labels for printing.
type streamEntryPair struct {
	entry  loghttp.Entry
	labels loghttp.LabelSet
}

// Query contains all necessary fields to execute instant and range queries and print the results.
43type Query struct { 44 QueryString string 45 Start time.Time 46 End time.Time 47 Limit int 48 BatchSize int 49 Forward bool 50 Step time.Duration 51 Interval time.Duration 52 Quiet bool 53 NoLabels bool 54 IgnoreLabelsKey []string 55 ShowLabelsKey []string 56 FixedLabelsLen int 57 ColoredOutput bool 58 LocalConfig string 59} 60 61// DoQuery executes the query and prints out the results 62func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) { 63 if q.LocalConfig != "" { 64 if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil { 65 log.Fatalf("Query failed: %+v", err) 66 } 67 return 68 } 69 70 d := q.resultsDirection() 71 72 var resp *loghttp.QueryResponse 73 var err error 74 75 if q.isInstant() { 76 resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet) 77 if err != nil { 78 log.Fatalf("Query failed: %+v", err) 79 } 80 if statistics { 81 q.printStats(resp.Data.Statistics) 82 } 83 _, _ = q.printResult(resp.Data.Result, out, nil) 84 } else { 85 if q.Limit < q.BatchSize { 86 q.BatchSize = q.Limit 87 } 88 resultLength := 0 89 total := 0 90 start := q.Start 91 end := q.End 92 var lastEntry []*loghttp.Entry 93 for total < q.Limit { 94 bs := q.BatchSize 95 // We want to truncate the batch size if the remaining number 96 // of items needed to reach the limit is less than the batch size 97 if q.Limit-total < q.BatchSize { 98 // Truncated batchsize is q.Limit - total, however we add to this 99 // the length of the overlap from the last query to make sure we get the 100 // correct amount of new logs knowing there will be some overlapping logs returned. 
101 bs = q.Limit - total + len(lastEntry) 102 } 103 resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet) 104 if err != nil { 105 log.Fatalf("Query failed: %+v", err) 106 } 107 108 if statistics { 109 q.printStats(resp.Data.Statistics) 110 } 111 112 resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry) 113 // Was not a log stream query, or no results, no more batching 114 if resultLength <= 0 { 115 break 116 } 117 // Also no result, wouldn't expect to hit this. 118 if len(lastEntry) == 0 { 119 break 120 } 121 // Can only happen if all the results return in one request 122 if resultLength == q.Limit { 123 break 124 } 125 if len(lastEntry) >= q.BatchSize { 126 log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+ 127 "(there will always be 1 overlapping entry but Loki allows multiple entries to have "+ 128 "the same timestamp, so when a batch ends in this scenario the next query will include "+ 129 "all the overlapping entries again). Please increase your batch size to at least %v to account "+ 130 "for overlapping entryes\n", q.BatchSize, len(lastEntry), len(lastEntry)+1) 131 } 132 133 // Batching works by taking the timestamp of the last query and using it in the next query, 134 // because Loki supports multiple entries with the same timestamp it's possible for a batch to have 135 // fallen in the middle of a list of entries for the same time, so to make sure we get all entries 136 // we start the query on the same time as the last entry from the last batch, and then we keep this last 137 // entry and remove the duplicate when printing the results. 138 // Because of this duplicate entry, we have to subtract it here from the total for each batch 139 // to get the desired limit. 140 total += resultLength 141 // Based on the query direction we either set the start or end for the next query. 
142 // If there are multiple entries in `lastEntry` they have to have the same timestamp so we can pick just the first 143 if q.Forward { 144 start = lastEntry[0].Timestamp 145 } else { 146 // The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result 147 // fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query 148 end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond) 149 } 150 151 } 152 } 153} 154 155func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { 156 length := -1 157 var entry []*loghttp.Entry 158 switch value.Type() { 159 case logqlmodel.ValueTypeStreams: 160 length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry) 161 case loghttp.ResultTypeScalar: 162 q.printScalar(value.(loghttp.Scalar)) 163 case loghttp.ResultTypeMatrix: 164 q.printMatrix(value.(loghttp.Matrix)) 165 case loghttp.ResultTypeVector: 166 q.printVector(value.(loghttp.Vector)) 167 default: 168 log.Fatalf("Unable to print unsupported type: %v", value.Type()) 169 } 170 return length, entry 171} 172 173// DoLocalQuery executes the query against the local store using a Loki configuration file. 
174func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error { 175 var conf loki.Config 176 conf.RegisterFlags(flag.CommandLine) 177 if q.LocalConfig == "" { 178 return errors.New("no supplied config file") 179 } 180 if err := cfg.YAML(q.LocalConfig, false)(&conf); err != nil { 181 return err 182 } 183 184 if err := conf.Validate(); err != nil { 185 return err 186 } 187 188 limits, err := validation.NewOverrides(conf.LimitsConfig, nil) 189 if err != nil { 190 return err 191 } 192 storage.RegisterCustomIndexClients(&conf.StorageConfig, prometheus.DefaultRegisterer) 193 conf.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly 194 chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger) 195 if err != nil { 196 return err 197 } 198 199 querier, err := storage.NewStore(conf.StorageConfig, conf.SchemaConfig, chunkStore, prometheus.DefaultRegisterer) 200 if err != nil { 201 return err 202 } 203 204 eng := logql.NewEngine(conf.Querier.Engine, querier, limits) 205 var query logql.Query 206 207 if q.isInstant() { 208 query = eng.Query(logql.NewLiteralParams( 209 q.QueryString, 210 q.Start, 211 q.Start, 212 0, 213 0, 214 q.resultsDirection(), 215 uint32(q.Limit), 216 nil, 217 )) 218 } else { 219 query = eng.Query(logql.NewLiteralParams( 220 q.QueryString, 221 q.Start, 222 q.End, 223 q.Step, 224 q.Interval, 225 q.resultsDirection(), 226 uint32(q.Limit), 227 nil, 228 )) 229 } 230 231 // execute the query 232 ctx := user.InjectOrgID(context.Background(), orgID) 233 result, err := query.Exec(ctx) 234 if err != nil { 235 return err 236 } 237 238 if statistics { 239 q.printStats(result.Statistics) 240 } 241 242 value, err := marshal.NewResultValue(result.Data) 243 if err != nil { 244 return err 245 } 246 247 q.printResult(value, out, nil) 248 return nil 249} 250 251// SetInstant makes the Query an instant 
type 252func (q *Query) SetInstant(time time.Time) { 253 q.Start = time 254 q.End = time 255} 256 257func (q *Query) isInstant() bool { 258 return q.Start == q.End && q.Step == 0 259} 260 261func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { 262 common := commonLabels(streams) 263 264 // Remove the labels we want to show from common 265 if len(q.ShowLabelsKey) > 0 { 266 common = matchLabels(false, common, q.ShowLabelsKey) 267 } 268 269 if len(common) > 0 && !q.Quiet { 270 log.Println("Common labels:", color.RedString(common.String())) 271 } 272 273 if len(q.IgnoreLabelsKey) > 0 && !q.Quiet { 274 log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ","))) 275 } 276 277 if len(q.ShowLabelsKey) > 0 && !q.Quiet { 278 log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ","))) 279 } 280 281 // Remove ignored and common labels from the cached labels and 282 // calculate the max labels length 283 maxLabelsLen := q.FixedLabelsLen 284 for i, s := range streams { 285 // Remove common labels 286 ls := subtract(s.Labels, common) 287 288 if len(q.ShowLabelsKey) > 0 { 289 ls = matchLabels(true, ls, q.ShowLabelsKey) 290 } 291 292 // Remove ignored labels 293 if len(q.IgnoreLabelsKey) > 0 { 294 ls = matchLabels(false, ls, q.IgnoreLabelsKey) 295 } 296 297 // Overwrite existing Labels 298 streams[i].Labels = ls 299 300 // Update max labels length 301 len := len(ls.String()) 302 if maxLabelsLen < len { 303 maxLabelsLen = len 304 } 305 } 306 307 // sort and display entries 308 allEntries := make([]streamEntryPair, 0) 309 310 for _, s := range streams { 311 for _, e := range s.Entries { 312 allEntries = append(allEntries, streamEntryPair{ 313 entry: e, 314 labels: s.Labels, 315 }) 316 } 317 } 318 319 if len(allEntries) == 0 { 320 return 0, nil 321 } 322 323 if q.Forward { 324 sort.Slice(allEntries, func(i, j int) bool { return 
allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) }) 325 } else { 326 sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) }) 327 } 328 329 printed := 0 330 for _, e := range allEntries { 331 // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch 332 if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp { 333 skip := false 334 // Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp 335 // shared by multiple entries we have to check all that were stored to see if we've already 336 // printed them. 337 for _, le := range lastEntry { 338 if e.entry.Line == le.Line { 339 skip = true 340 } 341 } 342 if skip { 343 continue 344 } 345 } 346 out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line) 347 printed++ 348 } 349 350 // Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends 351 // with an entry that shared multiple timestamps, so we need to keep a list of all these entries 352 // because the next query is going to contain them too and we want to not duplicate anything already 353 // printed. 354 lel := []*loghttp.Entry{} 355 // Start with the timestamp of the last entry 356 le := allEntries[len(allEntries)-1].entry 357 for i, e := range allEntries { 358 // Save any entry which has this timestamp (most of the time this will only be the single last entry) 359 if e.entry.Timestamp.Equal(le.Timestamp) { 360 lel = append(lel, &allEntries[i].entry) 361 } 362 } 363 364 return printed, lel 365} 366 367func (q *Query) printMatrix(matrix loghttp.Matrix) { 368 // yes we are effectively unmarshalling and then immediately marshalling this object back to json. we are doing this b/c 369 // it gives us more flexibility with regard to output types in the future. 
initially we are supporting just formatted json but eventually 370 // we might add output options such as render to an image file on disk 371 bytes, err := json.MarshalIndent(matrix, "", " ") 372 if err != nil { 373 log.Fatalf("Error marshalling matrix: %v", err) 374 } 375 376 fmt.Print(string(bytes)) 377} 378 379func (q *Query) printVector(vector loghttp.Vector) { 380 bytes, err := json.MarshalIndent(vector, "", " ") 381 if err != nil { 382 log.Fatalf("Error marshalling vector: %v", err) 383 } 384 385 fmt.Print(string(bytes)) 386} 387 388func (q *Query) printScalar(scalar loghttp.Scalar) { 389 bytes, err := json.MarshalIndent(scalar, "", " ") 390 if err != nil { 391 log.Fatalf("Error marshalling scalar: %v", err) 392 } 393 394 fmt.Print(string(bytes)) 395} 396 397type kvLogger struct { 398 *tabwriter.Writer 399} 400 401func (k kvLogger) Log(keyvals ...interface{}) error { 402 for i := 0; i < len(keyvals); i += 2 { 403 fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1])) 404 } 405 k.Flush() 406 return nil 407} 408 409func (q *Query) printStats(stats stats.Result) { 410 writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0) 411 stats.Log(kvLogger{Writer: writer}) 412} 413 414func (q *Query) resultsDirection() logproto.Direction { 415 if q.Forward { 416 return logproto.FORWARD 417 } 418 return logproto.BACKWARD 419} 420