1// Copyright 2012-present Oliver Eilhard. All rights reserved. 2// Use of this source code is governed by a MIT-license. 3// See http://olivere.mit-license.org/license.txt for details. 4 5package elastic 6 7import ( 8 "context" 9 "fmt" 10 "math/rand" 11 "reflect" 12 "sync/atomic" 13 "testing" 14 "time" 15) 16 17func TestBulkProcessorDefaults(t *testing.T) { 18 client := setupTestClientAndCreateIndex(t) 19 20 p := client.BulkProcessor() 21 if p == nil { 22 t.Fatalf("expected BulkProcessorService; got: %v", p) 23 } 24 if got, want := p.name, ""; got != want { 25 t.Errorf("expected %q; got: %q", want, got) 26 } 27 if got, want := p.numWorkers, 1; got != want { 28 t.Errorf("expected %d; got: %d", want, got) 29 } 30 if got, want := p.bulkActions, 1000; got != want { 31 t.Errorf("expected %d; got: %d", want, got) 32 } 33 if got, want := p.bulkSize, 5*1024*1024; got != want { 34 t.Errorf("expected %d; got: %d", want, got) 35 } 36 if got, want := p.flushInterval, time.Duration(0); got != want { 37 t.Errorf("expected %v; got: %v", want, got) 38 } 39 if got, want := p.wantStats, false; got != want { 40 t.Errorf("expected %v; got: %v", want, got) 41 } 42 if got, want := p.retryItemStatusCodes, defaultRetryItemStatusCodes; !reflect.DeepEqual(got, want) { 43 t.Errorf("expected %v; got: %v", want, got) 44 } 45 if p.backoff == nil { 46 t.Fatalf("expected non-nill backoff; got: %v", p.backoff) 47 } 48} 49 50func TestBulkProcessorCommitOnBulkActions(t *testing.T) { 51 //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) 52 client := setupTestClientAndCreateIndex(t) 53 54 testBulkProcessor(t, 55 10000, 56 client.BulkProcessor(). 57 Name("Actions-1"). 58 Workers(1). 59 BulkActions(100). 60 BulkSize(-1), 61 ) 62 63 testBulkProcessor(t, 64 10000, 65 client.BulkProcessor(). 66 Name("Actions-2"). 67 Workers(2). 68 BulkActions(100). 69 BulkSize(-1), 70 ) 71} 72 73func TestBulkProcessorCommitOnBulkSize(t *testing.T) { 74 //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) 75 client := setupTestClientAndCreateIndex(t) 76 77 testBulkProcessor(t, 78 10000, 79 client.BulkProcessor(). 80 Name("Size-1"). 81 Workers(1). 82 BulkActions(-1). 83 BulkSize(64*1024), 84 ) 85 86 testBulkProcessor(t, 87 10000, 88 client.BulkProcessor(). 89 Name("Size-2"). 90 Workers(2). 91 BulkActions(-1). 92 BulkSize(64*1024), 93 ) 94} 95 96func TestBulkProcessorBasedOnFlushInterval(t *testing.T) { 97 //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) 98 client := setupTestClientAndCreateIndex(t) 99 100 var beforeRequests int64 101 var befores int64 102 var afters int64 103 var failures int64 104 var afterRequests int64 105 106 beforeFn := func(executionId int64, requests []BulkableRequest) { 107 atomic.AddInt64(&beforeRequests, int64(len(requests))) 108 atomic.AddInt64(&befores, 1) 109 } 110 afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { 111 atomic.AddInt64(&afters, 1) 112 if err != nil { 113 atomic.AddInt64(&failures, 1) 114 } 115 atomic.AddInt64(&afterRequests, int64(len(requests))) 116 } 117 118 svc := client.BulkProcessor(). 119 Name("FlushInterval-1"). 120 Workers(2). 121 BulkActions(-1). 122 BulkSize(-1). 123 FlushInterval(1 * time.Second). 124 Before(beforeFn). 125 After(afterFn) 126 127 p, err := svc.Do(context.Background()) 128 if err != nil { 129 t.Fatal(err) 130 } 131 132 const numDocs = 1000 // low-enough number that flush should be invoked 133 134 for i := 1; i <= numDocs; i++ { 135 tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} 136 request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) 137 p.Add(request) 138 } 139 140 // Should flush at least once 141 time.Sleep(2 * time.Second) 142 143 err = p.Close() 144 if err != nil { 145 t.Fatal(err) 146 } 147 148 if p.stats.Flushed == 0 { 149 t.Errorf("expected at least 1 flush; got: %d", p.stats.Flushed) 150 } 151 if got, want := beforeRequests, int64(numDocs); got != want { 152 t.Errorf("expected %d requests to before callback; got: %d", want, got) 153 } 154 if got, want := afterRequests, int64(numDocs); got != want { 155 t.Errorf("expected %d requests to after callback; got: %d", want, got) 156 } 157 if befores == 0 { 158 t.Error("expected at least 1 call to before callback") 159 } 160 if afters == 0 { 161 t.Error("expected at least 1 call to after callback") 162 } 163 if failures != 0 { 164 t.Errorf("expected 0 calls to failure callback; got: %d", failures) 165 } 166 167 // Check number of documents that were bulk indexed 168 _, err = p.c.Flush(testIndexName).Do(context.TODO()) 169 if err != nil { 170 t.Fatal(err) 171 } 172 count, err := p.c.Count(testIndexName).Do(context.TODO()) 173 if err != nil { 174 t.Fatal(err) 175 } 176 if count != int64(numDocs) { 177 t.Fatalf("expected %d documents; got: %d", numDocs, count) 178 } 179} 180 181func TestBulkProcessorClose(t *testing.T) { 182 //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) 183 client := setupTestClientAndCreateIndex(t) 184 185 var beforeRequests int64 186 var befores int64 187 var afters int64 188 var failures int64 189 var afterRequests int64 190 191 beforeFn := func(executionId int64, requests []BulkableRequest) { 192 atomic.AddInt64(&beforeRequests, int64(len(requests))) 193 atomic.AddInt64(&befores, 1) 194 } 195 afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { 196 atomic.AddInt64(&afters, 1) 197 if err != nil { 198 atomic.AddInt64(&failures, 1) 199 } 200 atomic.AddInt64(&afterRequests, int64(len(requests))) 201 } 202 203 p, err := client.BulkProcessor(). 204 Name("FlushInterval-1"). 205 Workers(2). 206 BulkActions(-1). 207 BulkSize(-1). 208 FlushInterval(30 * time.Second). // 30 seconds to flush 209 Before(beforeFn).After(afterFn). 210 Do(context.Background()) 211 if err != nil { 212 t.Fatal(err) 213 } 214 215 const numDocs = 1000 // low-enough number that flush should be invoked 216 217 for i := 1; i <= numDocs; i++ { 218 tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} 219 request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) 220 p.Add(request) 221 } 222 223 // Should not flush because 30s > 1s 224 time.Sleep(1 * time.Second) 225 226 // Close should flush 227 err = p.Close() 228 if err != nil { 229 t.Fatal(err) 230 } 231 232 if p.stats.Flushed != 0 { 233 t.Errorf("expected no flush; got: %d", p.stats.Flushed) 234 } 235 if got, want := beforeRequests, int64(numDocs); got != want { 236 t.Errorf("expected %d requests to before callback; got: %d", want, got) 237 } 238 if got, want := afterRequests, int64(numDocs); got != want { 239 t.Errorf("expected %d requests to after callback; got: %d", want, got) 240 } 241 if befores == 0 { 242 t.Error("expected at least 1 call to before callback") 243 } 244 if afters == 0 { 245 t.Error("expected at least 1 call to after callback") 246 } 247 if failures != 0 { 248 t.Errorf("expected 0 calls to failure callback; got: %d", failures) 249 } 250 251 // Check number of documents that were bulk indexed 252 _, err = p.c.Flush(testIndexName).Do(context.TODO()) 253 if err != nil { 254 t.Fatal(err) 255 } 256 count, err := p.c.Count(testIndexName).Do(context.TODO()) 257 if err != nil { 258 t.Fatal(err) 259 } 260 if count != int64(numDocs) { 261 t.Fatalf("expected %d documents; got: %d", numDocs, count) 262 } 263} 264 265func TestBulkProcessorFlush(t *testing.T) { 266 //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) 267 client := setupTestClientAndCreateIndex(t) 268 269 p, err := client.BulkProcessor(). 270 Name("ManualFlush"). 271 Workers(10). 272 BulkActions(-1). 273 BulkSize(-1). 274 FlushInterval(30 * time.Second). // 30 seconds to flush 275 Stats(true). 276 Do(context.Background()) 277 if err != nil { 278 t.Fatal(err) 279 } 280 281 const numDocs = 100 282 283 for i := 1; i <= numDocs; i++ { 284 tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} 285 request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) 286 p.Add(request) 287 } 288 289 // Should not flush because 30s > 1s 290 time.Sleep(1 * time.Second) 291 292 // No flush yet 293 stats := p.Stats() 294 if stats.Flushed != 0 { 295 t.Errorf("expected no flush; got: %d", p.stats.Flushed) 296 } 297 298 // Manual flush 299 err = p.Flush() 300 if err != nil { 301 t.Fatal(err) 302 } 303 304 time.Sleep(1 * time.Second) 305 306 // Now flushed 307 stats = p.Stats() 308 if got, want := p.stats.Flushed, int64(1); got != want { 309 t.Errorf("expected %d flush; got: %d", want, got) 310 } 311 312 // Close should not start another flush 313 err = p.Close() 314 if err != nil { 315 t.Fatal(err) 316 } 317 318 // Still 1 flush 319 stats = p.Stats() 320 if got, want := p.stats.Flushed, int64(1); got != want { 321 t.Errorf("expected %d flush; got: %d", want, got) 322 } 323 324 // Check number of documents that were bulk indexed 325 _, err = p.c.Flush(testIndexName).Do(context.TODO()) 326 if err != nil { 327 t.Fatal(err) 328 } 329 count, err := p.c.Count(testIndexName).Do(context.TODO()) 330 if err != nil { 331 t.Fatal(err) 332 } 333 if count != int64(numDocs) { 334 t.Fatalf("expected %d documents; got: %d", numDocs, count) 335 } 336} 337 338// -- Helper -- 339 340func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { 341 var beforeRequests int64 342 var befores int64 343 var afters int64 344 var failures int64 345 var afterRequests int64 346 347 beforeFn := func(executionId int64, requests []BulkableRequest) { 348 atomic.AddInt64(&beforeRequests, int64(len(requests))) 349 atomic.AddInt64(&befores, 1) 350 } 351 afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { 352 atomic.AddInt64(&afters, 1) 353 if err != nil { 354 atomic.AddInt64(&failures, 1) 355 } 356 atomic.AddInt64(&afterRequests, int64(len(requests))) 357 } 358 359 p, err := svc.Before(beforeFn).After(afterFn).Stats(true).Do(context.Background()) 360 if err != nil { 361 t.Fatal(err) 362 } 363 364 for i := 1; i <= numDocs; i++ { 365 tweet := tweet{User: "olivere", Message: fmt.Sprintf("%07d. %s", i, randomString(1+rand.Intn(63)))} 366 request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) 367 p.Add(request) 368 } 369 370 err = p.Close() 371 if err != nil { 372 t.Fatal(err) 373 } 374 375 stats := p.Stats() 376 377 if stats.Flushed != 0 { 378 t.Errorf("expected no flush; got: %d", stats.Flushed) 379 } 380 if stats.Committed <= 0 { 381 t.Errorf("expected committed > %d; got: %d", 0, stats.Committed) 382 } 383 if got, want := stats.Indexed, int64(numDocs); got != want { 384 t.Errorf("expected indexed = %d; got: %d", want, got) 385 } 386 if got, want := stats.Created, int64(0); got != want { 387 t.Errorf("expected created = %d; got: %d", want, got) 388 } 389 if got, want := stats.Updated, int64(0); got != want { 390 t.Errorf("expected updated = %d; got: %d", want, got) 391 } 392 if got, want := stats.Deleted, int64(0); got != want { 393 t.Errorf("expected deleted = %d; got: %d", want, got) 394 } 395 if got, want := stats.Succeeded, int64(numDocs); got != want { 396 t.Errorf("expected succeeded = %d; got: %d", want, got) 397 } 398 if got, want := stats.Failed, int64(0); got != want { 399 t.Errorf("expected failed = %d; got: %d", want, got) 400 } 401 if got, want := beforeRequests, int64(numDocs); got != want { 402 t.Errorf("expected %d requests to before callback; got: %d", want, got) 403 } 404 if got, want := afterRequests, int64(numDocs); got != want { 405 t.Errorf("expected %d requests to after callback; got: %d", want, got) 406 } 407 if befores == 0 { 408 t.Error("expected at least 1 call to before callback") 409 } 410 if afters == 0 { 411 t.Error("expected at least 1 call to after callback") 412 } 413 if failures != 0 { 414 t.Errorf("expected 0 calls to failure callback; got: %d", failures) 415 } 416 417 // Check number of documents that were bulk indexed 418 _, err = p.c.Flush(testIndexName).Do(context.TODO()) 419 if err != nil { 420 t.Fatal(err) 421 } 422 count, err := p.c.Count(testIndexName).Do(context.TODO()) 423 if err != nil { 424 t.Fatal(err) 425 } 426 if count != int64(numDocs) { 427 t.Fatalf("expected %d documents; got: %d", numDocs, count) 428 } 429} 430