1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "context" 9 "encoding/json" 10 "io" 11 _ "net/http" 12 "testing" 13) 14 15func TestScroll(t *testing.T) { 16 // client := setupTestClientAndCreateIndexAndLog(t) 17 client := setupTestClientAndCreateIndex(t) 18 19 tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."} 20 tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."} 21 tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."} 22 23 // Add all documents 24 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 25 if err != nil { 26 t.Fatal(err) 27 } 28 29 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 30 if err != nil { 31 t.Fatal(err) 32 } 33 34 _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) 35 if err != nil { 36 t.Fatal(err) 37 } 38 39 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 40 if err != nil { 41 t.Fatal(err) 42 } 43 44 // Should return all documents. Just don't call Do yet! 45 svc := client.Scroll(testIndexName).Size(1) 46 47 pages := 0 48 docs := 0 49 50 for { 51 res, err := svc.Do(context.TODO()) 52 if err == io.EOF { 53 break 54 } 55 if err != nil { 56 t.Fatal(err) 57 } 58 if res == nil { 59 t.Fatal("expected results != nil; got nil") 60 } 61 if res.Hits == nil { 62 t.Fatal("expected results.Hits != nil; got nil") 63 } 64 if want, have := int64(3), res.Hits.TotalHits; want != have { 65 t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have) 66 } 67 if want, have := 1, len(res.Hits.Hits); want != have { 68 t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have) 69 } 70 71 pages++ 72 73 for _, hit := range res.Hits.Hits { 74 if hit.Index != testIndexName { 75 t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) 76 } 77 item := make(map[string]interface{}) 78 err := json.Unmarshal(*hit.Source, &item) 79 if err != nil { 80 t.Fatal(err) 81 } 82 docs++ 83 } 84 85 if len(res.ScrollId) == 0 { 86 t.Fatalf("expected scrollId in results; got %q", res.ScrollId) 87 } 88 } 89 90 if want, have := 3, pages; want != have { 91 t.Fatalf("expected to retrieve %d pages; got %d", want, have) 92 } 93 if want, have := 3, docs; want != have { 94 t.Fatalf("expected to retrieve %d hits; got %d", want, have) 95 } 96 97 err = svc.Clear(context.TODO()) 98 if err != nil { 99 t.Fatal(err) 100 } 101 102 _, err = svc.Do(context.TODO()) 103 if err == nil { 104 t.Fatal("expected to fail") 105 } 106} 107 108func TestScrollWithQueryAndSort(t *testing.T) { 109 client := setupTestClientAndCreateIndex(t) 110 // client := setupTestClientAndCreateIndexAndAddDocs(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags))) 111 112 tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."} 113 tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."} 114 tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."} 115 116 // Add all documents 117 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 118 if err != nil { 119 t.Fatal(err) 120 } 121 122 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 123 if err != nil { 124 t.Fatal(err) 125 } 126 127 _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) 128 if err != nil { 129 t.Fatal(err) 130 } 131 132 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 133 if err != nil { 134 t.Fatal(err) 135 } 136 137 // Create a scroll service that returns tweets from user olivere 138 // and returns them sorted by "message", in reverse order. 139 // 140 // Just don't call Do yet! 141 svc := client.Scroll(testIndexName). 142 Query(NewTermQuery("user", "olivere")). 143 Sort("message", false). 144 Size(1) 145 146 docs := 0 147 pages := 0 148 for { 149 res, err := svc.Do(context.TODO()) 150 if err == io.EOF { 151 break 152 } 153 if err != nil { 154 t.Fatal(err) 155 } 156 if err != nil { 157 t.Fatal(err) 158 } 159 if res == nil { 160 t.Fatal("expected results != nil; got nil") 161 } 162 if res.Hits == nil { 163 t.Fatal("expected results.Hits != nil; got nil") 164 } 165 if want, have := int64(2), res.Hits.TotalHits; want != have { 166 t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have) 167 } 168 if want, have := 1, len(res.Hits.Hits); want != have { 169 t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have) 170 } 171 172 pages++ 173 174 for _, hit := range res.Hits.Hits { 175 if hit.Index != testIndexName { 176 t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) 177 } 178 item := make(map[string]interface{}) 179 err := json.Unmarshal(*hit.Source, &item) 180 if err != nil { 181 t.Fatal(err) 182 } 183 docs++ 184 } 185 } 186 187 if want, have := 2, pages; want != have { 188 t.Fatalf("expected to retrieve %d pages; got %d", want, have) 189 } 190 if want, have := 2, docs; want != have { 191 t.Fatalf("expected to retrieve %d hits; got %d", want, have) 192 } 193} 194 195func TestScrollWithBody(t *testing.T) { 196 // client := setupTestClientAndCreateIndexAndLog(t) 197 client := setupTestClientAndCreateIndex(t) 198 199 tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch.", Retweets: 4} 200 tweet2 := tweet{User: "olivere", Message: "Another unrelated topic.", Retweets: 10} 201 tweet3 := tweet{User: "sandrae", Message: "Cycling is fun.", Retweets: 3} 202 203 // Add all documents 204 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 205 if err != nil { 206 t.Fatal(err) 207 } 208 209 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 210 if err != nil { 211 t.Fatal(err) 212 } 213 214 _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) 215 if err != nil { 216 t.Fatal(err) 217 } 218 219 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 220 if err != nil { 221 t.Fatal(err) 222 } 223 224 // Test with simple strings and a map 225 var tests = []struct { 226 Body interface{} 227 ExpectedTotalHits int64 228 ExpectedDocs int 229 ExpectedPages int 230 }{ 231 { 232 Body: `{"query":{"match_all":{}}}`, 233 ExpectedTotalHits: 3, 234 ExpectedDocs: 3, 235 ExpectedPages: 3, 236 }, 237 { 238 Body: `{"query":{"term":{"user":"olivere"}},"sort":["_doc"]}`, 239 ExpectedTotalHits: 2, 240 ExpectedDocs: 2, 241 ExpectedPages: 2, 242 }, 243 { 244 Body: `{"query":{"term":{"user":"olivere"}},"sort":[{"retweets":"desc"}]}`, 245 ExpectedTotalHits: 2, 246 ExpectedDocs: 2, 247 ExpectedPages: 2, 248 }, 249 { 250 Body: map[string]interface{}{ 251 "query": map[string]interface{}{ 252 "term": map[string]interface{}{ 253 "user": "olivere", 254 }, 255 }, 256 "sort": []interface{}{"_doc"}, 257 }, 258 ExpectedTotalHits: 2, 259 ExpectedDocs: 2, 260 ExpectedPages: 2, 261 }, 262 } 263 264 for i, tt := range tests { 265 // Should return all documents. Just don't call Do yet! 266 svc := client.Scroll(testIndexName).Size(1).Body(tt.Body) 267 268 pages := 0 269 docs := 0 270 271 for { 272 res, err := svc.Do(context.TODO()) 273 if err == io.EOF { 274 break 275 } 276 if err != nil { 277 t.Fatal(err) 278 } 279 if res == nil { 280 t.Fatalf("#%d: expected results != nil; got nil", i) 281 } 282 if res.Hits == nil { 283 t.Fatalf("#%d: expected results.Hits != nil; got nil", i) 284 } 285 if want, have := tt.ExpectedTotalHits, res.Hits.TotalHits; want != have { 286 t.Fatalf("#%d: expected results.Hits.TotalHits = %d; got %d", i, want, have) 287 } 288 if want, have := 1, len(res.Hits.Hits); want != have { 289 t.Fatalf("#%d: expected len(results.Hits.Hits) = %d; got %d", i, want, have) 290 } 291 292 pages++ 293 294 for _, hit := range res.Hits.Hits { 295 if hit.Index != testIndexName { 296 t.Fatalf("#%d: expected SearchResult.Hits.Hit.Index = %q; got %q", i, testIndexName, hit.Index) 297 } 298 item := make(map[string]interface{}) 299 err := json.Unmarshal(*hit.Source, &item) 300 if err != nil { 301 t.Fatalf("#%d: %v", i, err) 302 } 303 docs++ 304 } 305 306 if len(res.ScrollId) == 0 { 307 t.Fatalf("#%d: expected scrollId in results; got %q", i, res.ScrollId) 308 } 309 } 310 311 if want, have := tt.ExpectedPages, pages; want != have { 312 t.Fatalf("#%d: expected to retrieve %d pages; got %d", i, want, have) 313 } 314 if want, have := tt.ExpectedDocs, docs; want != have { 315 t.Fatalf("#%d: expected to retrieve %d hits; got %d", i, want, have) 316 } 317 318 err = svc.Clear(context.TODO()) 319 if err != nil { 320 t.Fatalf("#%d: failed to clear scroll context: %v", i, err) 321 } 322 323 _, err = svc.Do(context.TODO()) 324 if err == nil { 325 t.Fatalf("#%d: expected to fail", i) 326 } 327 } 328} 329 330func TestScrollWithSlice(t *testing.T) { 331 client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0))) 332 333 // Should return all documents. Just don't call Do yet! 334 sliceQuery := NewSliceQuery().Id(0).Max(2) 335 svc := client.Scroll(testIndexName).Slice(sliceQuery).Size(1) 336 337 pages := 0 338 docs := 0 339 340 for { 341 res, err := svc.Do(context.TODO()) 342 if err == io.EOF { 343 break 344 } 345 if err != nil { 346 t.Fatal(err) 347 } 348 if res == nil { 349 t.Fatal("expected results != nil; got nil") 350 } 351 if res.Hits == nil { 352 t.Fatal("expected results.Hits != nil; got nil") 353 } 354 355 pages++ 356 357 for _, hit := range res.Hits.Hits { 358 if hit.Index != testIndexName { 359 t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) 360 } 361 item := make(map[string]interface{}) 362 err := json.Unmarshal(*hit.Source, &item) 363 if err != nil { 364 t.Fatal(err) 365 } 366 docs++ 367 } 368 369 if len(res.ScrollId) == 0 { 370 t.Fatalf("expected scrollId in results; got %q", res.ScrollId) 371 } 372 } 373 374 if pages == 0 { 375 t.Fatal("expected to retrieve some pages") 376 } 377 if docs == 0 { 378 t.Fatal("expected to retrieve some hits") 379 } 380 381 if err := svc.Clear(context.TODO()); err != nil { 382 t.Fatal(err) 383 } 384 385 if _, err := svc.Do(context.TODO()); err == nil { 386 t.Fatal("expected to fail") 387 } 388} 389 390func TestScrollWithMaxResponseSize(t *testing.T) { 391 client := setupTestClientAndCreateIndex(t) 392 393 tweet1 := tweet{User: "sandrae", Message: "Cycling is fun.", Retweets: 3} 394 tweet2 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch.", Retweets: 4} 395 396 // Add all documents 397 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 398 if err != nil { 399 t.Fatal(err) 400 } 401 402 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 403 if err != nil { 404 t.Fatal(err) 405 } 406 407 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 408 if err != nil { 409 t.Fatal(err) 410 } 411 412 // Test response size error on first scroll request (first response is 391 bytes) 413 svc := client.Scroll(testIndexName).Size(1).MaxResponseSize(300) 414 _, err = svc.Do(context.TODO()) 415 if err != ErrResponseSize { 416 t.Fatalf("expected response size error") 417 } 418 419 // Test response size error on second scroll request (first response is 391 bytes, second is 401 bytes) 420 svc = client.Scroll(testIndexName).Size(1).MaxResponseSize(400) 421 _, err = svc.Do(context.TODO()) 422 if err != nil { 423 t.Fatal(err) 424 } 425 426 _, err = svc.Do(context.TODO()) 427 if err != ErrResponseSize { 428 t.Fatalf("expected response size error") 429 } 430} 431 432func TestScrollWithFilterPath(t *testing.T) { 433 // client := setupTestClientAndCreateIndexAndLog(t) 434 client := setupTestClientAndCreateIndex(t) 435 436 tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."} 437 tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."} 438 tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."} 439 440 // Add all documents 441 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 442 if err != nil { 443 t.Fatal(err) 444 } 445 446 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 447 if err != nil { 448 t.Fatal(err) 449 } 450 451 _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) 452 if err != nil { 453 t.Fatal(err) 454 } 455 456 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 457 if err != nil { 458 t.Fatal(err) 459 } 460 461 // Should return all documents. Just don't call Do yet! 462 // Notice that we don't have to add "_scroll_id" to the FilterPath here: 463 // It's been added automatically by the ScrollService. 464 svc := client.Scroll(testIndexName).Size(1). 465 FilterPath("hits.total", "hits.hits._index", "hits.hits._id", "hits.hits._source") 466 467 pages := 0 468 docs := 0 469 470 for { 471 res, err := svc.Do(context.TODO()) 472 if err == io.EOF { 473 break 474 } 475 if err != nil { 476 t.Fatal(err) 477 } 478 if res == nil { 479 t.Fatal("expected results != nil; got nil") 480 } 481 if res.Hits == nil { 482 t.Fatal("expected results.Hits != nil; got nil") 483 } 484 if want, have := int64(3), res.Hits.TotalHits; want != have { 485 t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have) 486 } 487 if want, have := 1, len(res.Hits.Hits); want != have { 488 t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have) 489 } 490 491 pages++ 492 493 for _, hit := range res.Hits.Hits { 494 if hit.Index != testIndexName { 495 t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) 496 } 497 item := make(map[string]interface{}) 498 err := json.Unmarshal(*hit.Source, &item) 499 if err != nil { 500 t.Fatal(err) 501 } 502 docs++ 503 } 504 505 if len(res.ScrollId) == 0 { 506 t.Fatalf("expected scrollId in results; got %q", res.ScrollId) 507 } 508 } 509 510 if want, have := 3, pages; want != have { 511 t.Fatalf("expected to retrieve %d pages; got %d", want, have) 512 } 513 if want, have := 3, docs; want != have { 514 t.Fatalf("expected to retrieve %d hits; got %d", want, have) 515 } 516 517 err = svc.Clear(context.TODO()) 518 if err != nil { 519 t.Fatal(err) 520 } 521 522 _, err = svc.Do(context.TODO()) 523 if err == nil { 524 t.Fatal("expected to fail") 525 } 526} 527 528func TestScrollWithFilterPathKeepingContext(t *testing.T) { 529 // client := setupTestClientAndCreateIndexAndLog(t) 530 client := setupTestClientAndCreateIndex(t) 531 532 tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."} 533 tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."} 534 tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."} 535 536 // Add all documents 537 _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) 538 if err != nil { 539 t.Fatal(err) 540 } 541 542 _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) 543 if err != nil { 544 t.Fatal(err) 545 } 546 547 _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) 548 if err != nil { 549 t.Fatal(err) 550 } 551 552 _, err = client.Flush().Index(testIndexName).Do(context.TODO()) 553 if err != nil { 554 t.Fatal(err) 555 } 556 557 // Should return all documents. Just don't call Do yet! 558 // Notice that we don't have to add "_scroll_id" to the FilterPath here: 559 // It's been added automatically by the ScrollService. 560 svc := client.Scroll(testIndexName).Size(1). 561 FilterPath("hits.total", "hits.hits._index", "hits.hits._id") 562 563 pages := 0 564 docs := 0 565 566 for { 567 res, err := svc.Do(context.TODO()) 568 if err == io.EOF { 569 break 570 } 571 if err != nil { 572 t.Fatal(err) 573 } 574 if res == nil { 575 t.Fatal("expected results != nil; got nil") 576 } 577 if res.Hits == nil { 578 t.Fatal("expected results.Hits != nil; got nil") 579 } 580 if want, have := int64(3), res.Hits.TotalHits; want != have { 581 t.Fatalf("expected results.Hits.TotalHits = %d; got %d", want, have) 582 } 583 if want, have := 1, len(res.Hits.Hits); want != have { 584 t.Fatalf("expected len(results.Hits.Hits) = %d; got %d", want, have) 585 } 586 587 pages++ 588 589 for _, hit := range res.Hits.Hits { 590 if hit.Index != testIndexName { 591 t.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) 592 } 593 if hit.Source != nil { 594 t.Fatal("expected SearchResult.Hits.Hit.Source = nil") 595 } 596 docs++ 597 } 598 599 if len(res.ScrollId) == 0 { 600 t.Fatalf("expected scrollId in results; got %q", res.ScrollId) 601 } 602 } 603 604 if want, have := 3, pages; want != have { 605 t.Fatalf("expected to retrieve %d pages; got %d", want, have) 606 } 607 if want, have := 3, docs; want != have { 608 t.Fatalf("expected to retrieve %d hits; got %d", want, have) 609 } 610 611 err = svc.Clear(context.TODO()) 612 if err != nil { 613 t.Fatal(err) 614 } 615 616 _, err = svc.Do(context.TODO()) 617 if err == nil { 618 t.Fatal("expected to fail") 619 } 620} 621