1package indexgateway 2 3import ( 4 "github.com/grafana/dskit/services" 5 6 "github.com/grafana/loki/pkg/storage/chunk" 7 "github.com/grafana/loki/pkg/storage/stores/shipper" 8 "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" 9 "github.com/grafana/loki/pkg/storage/stores/shipper/util" 10) 11 12const maxIndexEntriesPerResponse = 1000 13 14type gateway struct { 15 services.Service 16 17 shipper chunk.IndexClient 18} 19 20func NewIndexGateway(shipperIndexClient *shipper.Shipper) *gateway { 21 g := &gateway{ 22 shipper: shipperIndexClient, 23 } 24 g.Service = services.NewIdleService(nil, func(failureCase error) error { 25 g.shipper.Stop() 26 return nil 27 }) 28 return g 29} 30 31func (g gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error { 32 var outerErr error 33 var innerErr error 34 35 queries := make([]chunk.IndexQuery, 0, len(request.Queries)) 36 for _, query := range request.Queries { 37 queries = append(queries, chunk.IndexQuery{ 38 TableName: query.TableName, 39 HashValue: query.HashValue, 40 RangeValuePrefix: query.RangeValuePrefix, 41 RangeValueStart: query.RangeValueStart, 42 ValueEqual: query.ValueEqual, 43 }) 44 } 45 outerErr = g.shipper.QueryPages(server.Context(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { 46 innerErr = g.sendBatch(server, query, batch) 47 if innerErr != nil { 48 return false 49 } 50 51 return true 52 }) 53 54 if innerErr != nil { 55 return innerErr 56 } 57 58 return outerErr 59} 60 61func (g *gateway) sendBatch(server indexgatewaypb.IndexGateway_QueryIndexServer, query chunk.IndexQuery, batch chunk.ReadBatch) error { 62 itr := batch.Iterator() 63 var resp []*indexgatewaypb.Row 64 65 for itr.Next() { 66 if len(resp) == maxIndexEntriesPerResponse { 67 err := server.Send(&indexgatewaypb.QueryIndexResponse{ 68 QueryKey: util.QueryKey(query), 69 Rows: resp, 70 }) 71 if err != nil { 72 return err 73 } 74 resp = []*indexgatewaypb.Row{} 75 } 76 77 resp = append(resp, &indexgatewaypb.Row{ 78 RangeValue: itr.RangeValue(), 79 Value: itr.Value(), 80 }) 81 } 82 83 if len(resp) != 0 { 84 err := server.Send(&indexgatewaypb.QueryIndexResponse{ 85 QueryKey: util.QueryKey(query), 86 Rows: resp, 87 }) 88 if err != nil { 89 return err 90 } 91 } 92 93 return nil 94} 95