1package indexgateway
2
3import (
4	"github.com/grafana/dskit/services"
5
6	"github.com/grafana/loki/pkg/storage/chunk"
7	"github.com/grafana/loki/pkg/storage/stores/shipper"
8	"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
9	"github.com/grafana/loki/pkg/storage/stores/shipper/util"
10)
11
// maxIndexEntriesPerResponse is the row count at which sendBatch flushes the
// rows it has accumulated as one QueryIndexResponse, so no single response
// carries more than this many rows.
const maxIndexEntriesPerResponse = 1000
13
14type gateway struct {
15	services.Service
16
17	shipper chunk.IndexClient
18}
19
20func NewIndexGateway(shipperIndexClient *shipper.Shipper) *gateway {
21	g := &gateway{
22		shipper: shipperIndexClient,
23	}
24	g.Service = services.NewIdleService(nil, func(failureCase error) error {
25		g.shipper.Stop()
26		return nil
27	})
28	return g
29}
30
31func (g gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error {
32	var outerErr error
33	var innerErr error
34
35	queries := make([]chunk.IndexQuery, 0, len(request.Queries))
36	for _, query := range request.Queries {
37		queries = append(queries, chunk.IndexQuery{
38			TableName:        query.TableName,
39			HashValue:        query.HashValue,
40			RangeValuePrefix: query.RangeValuePrefix,
41			RangeValueStart:  query.RangeValueStart,
42			ValueEqual:       query.ValueEqual,
43		})
44	}
45	outerErr = g.shipper.QueryPages(server.Context(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
46		innerErr = g.sendBatch(server, query, batch)
47		if innerErr != nil {
48			return false
49		}
50
51		return true
52	})
53
54	if innerErr != nil {
55		return innerErr
56	}
57
58	return outerErr
59}
60
61func (g *gateway) sendBatch(server indexgatewaypb.IndexGateway_QueryIndexServer, query chunk.IndexQuery, batch chunk.ReadBatch) error {
62	itr := batch.Iterator()
63	var resp []*indexgatewaypb.Row
64
65	for itr.Next() {
66		if len(resp) == maxIndexEntriesPerResponse {
67			err := server.Send(&indexgatewaypb.QueryIndexResponse{
68				QueryKey: util.QueryKey(query),
69				Rows:     resp,
70			})
71			if err != nil {
72				return err
73			}
74			resp = []*indexgatewaypb.Row{}
75		}
76
77		resp = append(resp, &indexgatewaypb.Row{
78			RangeValue: itr.RangeValue(),
79			Value:      itr.Value(),
80		})
81	}
82
83	if len(resp) != 0 {
84		err := server.Send(&indexgatewaypb.QueryIndexResponse{
85			QueryKey: util.QueryKey(query),
86			Rows:     resp,
87		})
88		if err != nil {
89			return err
90		}
91	}
92
93	return nil
94}
95