1// Copyright 2012-2015 Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"encoding/json"
9	"fmt"
10	"net/url"
11	"strings"
12
13	"gopkg.in/olivere/elastic.v2/uritemplates"
14)
15
16// ScrollService manages a cursor through documents in Elasticsearch.
17type ScrollService struct {
18	client    *Client
19	indices   []string
20	types     []string
21	keepAlive string
22	query     Query
23	size      *int
24	pretty    bool
25	scrollId  string
26}
27
28func NewScrollService(client *Client) *ScrollService {
29	builder := &ScrollService{
30		client: client,
31		query:  NewMatchAllQuery(),
32	}
33	return builder
34}
35
36func (s *ScrollService) Index(index string) *ScrollService {
37	if s.indices == nil {
38		s.indices = make([]string, 0)
39	}
40	s.indices = append(s.indices, index)
41	return s
42}
43
44func (s *ScrollService) Indices(indices ...string) *ScrollService {
45	if s.indices == nil {
46		s.indices = make([]string, 0)
47	}
48	s.indices = append(s.indices, indices...)
49	return s
50}
51
52func (s *ScrollService) Type(typ string) *ScrollService {
53	if s.types == nil {
54		s.types = make([]string, 0)
55	}
56	s.types = append(s.types, typ)
57	return s
58}
59
60func (s *ScrollService) Types(types ...string) *ScrollService {
61	if s.types == nil {
62		s.types = make([]string, 0)
63	}
64	s.types = append(s.types, types...)
65	return s
66}
67
68// Scroll is an alias for KeepAlive, the time to keep
69// the cursor alive (e.g. "5m" for 5 minutes).
70func (s *ScrollService) Scroll(keepAlive string) *ScrollService {
71	s.keepAlive = keepAlive
72	return s
73}
74
75// KeepAlive sets the maximum time the cursor will be
76// available before expiration (e.g. "5m" for 5 minutes).
77func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService {
78	s.keepAlive = keepAlive
79	return s
80}
81
82func (s *ScrollService) Query(query Query) *ScrollService {
83	s.query = query
84	return s
85}
86
87func (s *ScrollService) Pretty(pretty bool) *ScrollService {
88	s.pretty = pretty
89	return s
90}
91
92func (s *ScrollService) Size(size int) *ScrollService {
93	s.size = &size
94	return s
95}
96
97func (s *ScrollService) ScrollId(scrollId string) *ScrollService {
98	s.scrollId = scrollId
99	return s
100}
101
102func (s *ScrollService) Do() (*SearchResult, error) {
103	if s.scrollId == "" {
104		return s.GetFirstPage()
105	}
106	return s.GetNextPage()
107}
108
109func (s *ScrollService) GetFirstPage() (*SearchResult, error) {
110	// Build url
111	path := "/"
112
113	// Indices part
114	indexPart := make([]string, 0)
115	for _, index := range s.indices {
116		index, err := uritemplates.Expand("{index}", map[string]string{
117			"index": index,
118		})
119		if err != nil {
120			return nil, err
121		}
122		indexPart = append(indexPart, index)
123	}
124	if len(indexPart) > 0 {
125		path += strings.Join(indexPart, ",")
126	}
127
128	// Types
129	typesPart := make([]string, 0)
130	for _, typ := range s.types {
131		typ, err := uritemplates.Expand("{type}", map[string]string{
132			"type": typ,
133		})
134		if err != nil {
135			return nil, err
136		}
137		typesPart = append(typesPart, typ)
138	}
139	if len(typesPart) > 0 {
140		path += "/" + strings.Join(typesPart, ",")
141	}
142
143	// Search
144	path += "/_search"
145
146	// Parameters
147	params := make(url.Values)
148	params.Set("search_type", "scan")
149	if s.pretty {
150		params.Set("pretty", fmt.Sprintf("%v", s.pretty))
151	}
152	if s.keepAlive != "" {
153		params.Set("scroll", s.keepAlive)
154	} else {
155		params.Set("scroll", defaultKeepAlive)
156	}
157	if s.size != nil && *s.size > 0 {
158		params.Set("size", fmt.Sprintf("%d", *s.size))
159	}
160
161	// Set body
162	body := make(map[string]interface{})
163	if s.query != nil {
164		body["query"] = s.query.Source()
165	}
166
167	// Get response
168	res, err := s.client.PerformRequest("POST", path, params, body)
169	if err != nil {
170		return nil, err
171	}
172
173	// Return result
174	searchResult := new(SearchResult)
175	if err := json.Unmarshal(res.Body, searchResult); err != nil {
176		return nil, err
177	}
178
179	return searchResult, nil
180}
181
182func (s *ScrollService) GetNextPage() (*SearchResult, error) {
183	if s.scrollId == "" {
184		return nil, EOS
185	}
186
187	// Build url
188	path := "/_search/scroll"
189
190	// Parameters
191	params := make(url.Values)
192	if s.pretty {
193		params.Set("pretty", fmt.Sprintf("%v", s.pretty))
194	}
195	if s.keepAlive != "" {
196		params.Set("scroll", s.keepAlive)
197	} else {
198		params.Set("scroll", defaultKeepAlive)
199	}
200
201	// Get response
202	res, err := s.client.PerformRequest("POST", path, params, s.scrollId)
203	if err != nil {
204		return nil, err
205	}
206
207	// Return result
208	searchResult := new(SearchResult)
209	if err := json.Unmarshal(res.Body, searchResult); err != nil {
210		return nil, err
211	}
212
213	// Determine last page
214	if searchResult == nil || searchResult.Hits == nil || len(searchResult.Hits.Hits) == 0 || searchResult.Hits.TotalHits == 0 {
215		return nil, EOS
216	}
217
218	return searchResult, nil
219}
220