1// Copyright 2012-2015 Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "encoding/json" 9 "fmt" 10 "net/url" 11 "strings" 12 13 "gopkg.in/olivere/elastic.v2/uritemplates" 14) 15 16// ScrollService manages a cursor through documents in Elasticsearch. 17type ScrollService struct { 18 client *Client 19 indices []string 20 types []string 21 keepAlive string 22 query Query 23 size *int 24 pretty bool 25 scrollId string 26} 27 28func NewScrollService(client *Client) *ScrollService { 29 builder := &ScrollService{ 30 client: client, 31 query: NewMatchAllQuery(), 32 } 33 return builder 34} 35 36func (s *ScrollService) Index(index string) *ScrollService { 37 if s.indices == nil { 38 s.indices = make([]string, 0) 39 } 40 s.indices = append(s.indices, index) 41 return s 42} 43 44func (s *ScrollService) Indices(indices ...string) *ScrollService { 45 if s.indices == nil { 46 s.indices = make([]string, 0) 47 } 48 s.indices = append(s.indices, indices...) 49 return s 50} 51 52func (s *ScrollService) Type(typ string) *ScrollService { 53 if s.types == nil { 54 s.types = make([]string, 0) 55 } 56 s.types = append(s.types, typ) 57 return s 58} 59 60func (s *ScrollService) Types(types ...string) *ScrollService { 61 if s.types == nil { 62 s.types = make([]string, 0) 63 } 64 s.types = append(s.types, types...) 65 return s 66} 67 68// Scroll is an alias for KeepAlive, the time to keep 69// the cursor alive (e.g. "5m" for 5 minutes). 70func (s *ScrollService) Scroll(keepAlive string) *ScrollService { 71 s.keepAlive = keepAlive 72 return s 73} 74 75// KeepAlive sets the maximum time the cursor will be 76// available before expiration (e.g. "5m" for 5 minutes). 77func (s *ScrollService) KeepAlive(keepAlive string) *ScrollService { 78 s.keepAlive = keepAlive 79 return s 80} 81 82func (s *ScrollService) Query(query Query) *ScrollService { 83 s.query = query 84 return s 85} 86 87func (s *ScrollService) Pretty(pretty bool) *ScrollService { 88 s.pretty = pretty 89 return s 90} 91 92func (s *ScrollService) Size(size int) *ScrollService { 93 s.size = &size 94 return s 95} 96 97func (s *ScrollService) ScrollId(scrollId string) *ScrollService { 98 s.scrollId = scrollId 99 return s 100} 101 102func (s *ScrollService) Do() (*SearchResult, error) { 103 if s.scrollId == "" { 104 return s.GetFirstPage() 105 } 106 return s.GetNextPage() 107} 108 109func (s *ScrollService) GetFirstPage() (*SearchResult, error) { 110 // Build url 111 path := "/" 112 113 // Indices part 114 indexPart := make([]string, 0) 115 for _, index := range s.indices { 116 index, err := uritemplates.Expand("{index}", map[string]string{ 117 "index": index, 118 }) 119 if err != nil { 120 return nil, err 121 } 122 indexPart = append(indexPart, index) 123 } 124 if len(indexPart) > 0 { 125 path += strings.Join(indexPart, ",") 126 } 127 128 // Types 129 typesPart := make([]string, 0) 130 for _, typ := range s.types { 131 typ, err := uritemplates.Expand("{type}", map[string]string{ 132 "type": typ, 133 }) 134 if err != nil { 135 return nil, err 136 } 137 typesPart = append(typesPart, typ) 138 } 139 if len(typesPart) > 0 { 140 path += "/" + strings.Join(typesPart, ",") 141 } 142 143 // Search 144 path += "/_search" 145 146 // Parameters 147 params := make(url.Values) 148 params.Set("search_type", "scan") 149 if s.pretty { 150 params.Set("pretty", fmt.Sprintf("%v", s.pretty)) 151 } 152 if s.keepAlive != "" { 153 params.Set("scroll", s.keepAlive) 154 } else { 155 params.Set("scroll", defaultKeepAlive) 156 } 157 if s.size != nil && *s.size > 0 { 158 params.Set("size", fmt.Sprintf("%d", *s.size)) 159 } 160 161 // Set body 162 body := make(map[string]interface{}) 163 if s.query != nil { 164 body["query"] = s.query.Source() 165 } 166 167 // Get response 168 res, err := s.client.PerformRequest("POST", path, params, body) 169 if err != nil { 170 return nil, err 171 } 172 173 // Return result 174 searchResult := new(SearchResult) 175 if err := json.Unmarshal(res.Body, searchResult); err != nil { 176 return nil, err 177 } 178 179 return searchResult, nil 180} 181 182func (s *ScrollService) GetNextPage() (*SearchResult, error) { 183 if s.scrollId == "" { 184 return nil, EOS 185 } 186 187 // Build url 188 path := "/_search/scroll" 189 190 // Parameters 191 params := make(url.Values) 192 if s.pretty { 193 params.Set("pretty", fmt.Sprintf("%v", s.pretty)) 194 } 195 if s.keepAlive != "" { 196 params.Set("scroll", s.keepAlive) 197 } else { 198 params.Set("scroll", defaultKeepAlive) 199 } 200 201 // Get response 202 res, err := s.client.PerformRequest("POST", path, params, s.scrollId) 203 if err != nil { 204 return nil, err 205 } 206 207 // Return result 208 searchResult := new(SearchResult) 209 if err := json.Unmarshal(res.Body, searchResult); err != nil { 210 return nil, err 211 } 212 213 // Determine last page 214 if searchResult == nil || searchResult.Hits == nil || len(searchResult.Hits.Hits) == 0 || searchResult.Hits.TotalHits == 0 { 215 return nil, EOS 216 } 217 218 return searchResult, nil 219} 220