1// Copyright 2012-present Oliver Eilhard. All rights reserved.
2// Use of this source code is governed by a MIT-license.
3// See http://olivere.mit-license.org/license.txt for details.
4
5package elastic
6
7import (
8	"context"
9	"fmt"
10	"math/rand"
11	"reflect"
12	"sync/atomic"
13	"testing"
14	"time"
15)
16
17func TestBulkProcessorDefaults(t *testing.T) {
18	client := setupTestClientAndCreateIndex(t)
19
20	p := client.BulkProcessor()
21	if p == nil {
22		t.Fatalf("expected BulkProcessorService; got: %v", p)
23	}
24	if got, want := p.name, ""; got != want {
25		t.Errorf("expected %q; got: %q", want, got)
26	}
27	if got, want := p.numWorkers, 1; got != want {
28		t.Errorf("expected %d; got: %d", want, got)
29	}
30	if got, want := p.bulkActions, 1000; got != want {
31		t.Errorf("expected %d; got: %d", want, got)
32	}
33	if got, want := p.bulkSize, 5*1024*1024; got != want {
34		t.Errorf("expected %d; got: %d", want, got)
35	}
36	if got, want := p.flushInterval, time.Duration(0); got != want {
37		t.Errorf("expected %v; got: %v", want, got)
38	}
39	if got, want := p.wantStats, false; got != want {
40		t.Errorf("expected %v; got: %v", want, got)
41	}
42	if got, want := p.retryItemStatusCodes, defaultRetryItemStatusCodes; !reflect.DeepEqual(got, want) {
43		t.Errorf("expected %v; got: %v", want, got)
44	}
45	if p.backoff == nil {
46		t.Fatalf("expected non-nill backoff; got: %v", p.backoff)
47	}
48}
49
50func TestBulkProcessorCommitOnBulkActions(t *testing.T) {
51	//client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
52	client := setupTestClientAndCreateIndex(t)
53
54	testBulkProcessor(t,
55		10000,
56		client.BulkProcessor().
57			Name("Actions-1").
58			Workers(1).
59			BulkActions(100).
60			BulkSize(-1),
61	)
62
63	testBulkProcessor(t,
64		10000,
65		client.BulkProcessor().
66			Name("Actions-2").
67			Workers(2).
68			BulkActions(100).
69			BulkSize(-1),
70	)
71}
72
73func TestBulkProcessorCommitOnBulkSize(t *testing.T) {
74	//client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
75	client := setupTestClientAndCreateIndex(t)
76
77	testBulkProcessor(t,
78		10000,
79		client.BulkProcessor().
80			Name("Size-1").
81			Workers(1).
82			BulkActions(-1).
83			BulkSize(64*1024),
84	)
85
86	testBulkProcessor(t,
87		10000,
88		client.BulkProcessor().
89			Name("Size-2").
90			Workers(2).
91			BulkActions(-1).
92			BulkSize(64*1024),
93	)
94}
95
96func TestBulkProcessorBasedOnFlushInterval(t *testing.T) {
97	//client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
98	client := setupTestClientAndCreateIndex(t)
99
100	var beforeRequests int64
101	var befores int64
102	var afters int64
103	var failures int64
104	var afterRequests int64
105
106	beforeFn := func(executionId int64, requests []BulkableRequest) {
107		atomic.AddInt64(&beforeRequests, int64(len(requests)))
108		atomic.AddInt64(&befores, 1)
109	}
110	afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
111		atomic.AddInt64(&afters, 1)
112		if err != nil {
113			atomic.AddInt64(&failures, 1)
114		}
115		atomic.AddInt64(&afterRequests, int64(len(requests)))
116	}
117
118	svc := client.BulkProcessor().
119		Name("FlushInterval-1").
120		Workers(2).
121		BulkActions(-1).
122		BulkSize(-1).
123		FlushInterval(1 * time.Second).
124		Before(beforeFn).
125		After(afterFn)
126
127	p, err := svc.Do(context.Background())
128	if err != nil {
129		t.Fatal(err)
130	}
131
132	const numDocs = 1000 // low-enough number that flush should be invoked
133
134	for i := 1; i <= numDocs; i++ {
135		tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
136		request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
137		p.Add(request)
138	}
139
140	// Should flush at least once
141	time.Sleep(2 * time.Second)
142
143	err = p.Close()
144	if err != nil {
145		t.Fatal(err)
146	}
147
148	if p.stats.Flushed == 0 {
149		t.Errorf("expected at least 1 flush; got: %d", p.stats.Flushed)
150	}
151	if got, want := beforeRequests, int64(numDocs); got != want {
152		t.Errorf("expected %d requests to before callback; got: %d", want, got)
153	}
154	if got, want := afterRequests, int64(numDocs); got != want {
155		t.Errorf("expected %d requests to after callback; got: %d", want, got)
156	}
157	if befores == 0 {
158		t.Error("expected at least 1 call to before callback")
159	}
160	if afters == 0 {
161		t.Error("expected at least 1 call to after callback")
162	}
163	if failures != 0 {
164		t.Errorf("expected 0 calls to failure callback; got: %d", failures)
165	}
166
167	// Check number of documents that were bulk indexed
168	_, err = p.c.Flush(testIndexName).Do(context.TODO())
169	if err != nil {
170		t.Fatal(err)
171	}
172	count, err := p.c.Count(testIndexName).Do(context.TODO())
173	if err != nil {
174		t.Fatal(err)
175	}
176	if count != int64(numDocs) {
177		t.Fatalf("expected %d documents; got: %d", numDocs, count)
178	}
179}
180
181func TestBulkProcessorClose(t *testing.T) {
182	//client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
183	client := setupTestClientAndCreateIndex(t)
184
185	var beforeRequests int64
186	var befores int64
187	var afters int64
188	var failures int64
189	var afterRequests int64
190
191	beforeFn := func(executionId int64, requests []BulkableRequest) {
192		atomic.AddInt64(&beforeRequests, int64(len(requests)))
193		atomic.AddInt64(&befores, 1)
194	}
195	afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
196		atomic.AddInt64(&afters, 1)
197		if err != nil {
198			atomic.AddInt64(&failures, 1)
199		}
200		atomic.AddInt64(&afterRequests, int64(len(requests)))
201	}
202
203	p, err := client.BulkProcessor().
204		Name("FlushInterval-1").
205		Workers(2).
206		BulkActions(-1).
207		BulkSize(-1).
208		FlushInterval(30 * time.Second). // 30 seconds to flush
209		Before(beforeFn).After(afterFn).
210		Do(context.Background())
211	if err != nil {
212		t.Fatal(err)
213	}
214
215	const numDocs = 1000 // low-enough number that flush should be invoked
216
217	for i := 1; i <= numDocs; i++ {
218		tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
219		request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
220		p.Add(request)
221	}
222
223	// Should not flush because 30s > 1s
224	time.Sleep(1 * time.Second)
225
226	// Close should flush
227	err = p.Close()
228	if err != nil {
229		t.Fatal(err)
230	}
231
232	if p.stats.Flushed != 0 {
233		t.Errorf("expected no flush; got: %d", p.stats.Flushed)
234	}
235	if got, want := beforeRequests, int64(numDocs); got != want {
236		t.Errorf("expected %d requests to before callback; got: %d", want, got)
237	}
238	if got, want := afterRequests, int64(numDocs); got != want {
239		t.Errorf("expected %d requests to after callback; got: %d", want, got)
240	}
241	if befores == 0 {
242		t.Error("expected at least 1 call to before callback")
243	}
244	if afters == 0 {
245		t.Error("expected at least 1 call to after callback")
246	}
247	if failures != 0 {
248		t.Errorf("expected 0 calls to failure callback; got: %d", failures)
249	}
250
251	// Check number of documents that were bulk indexed
252	_, err = p.c.Flush(testIndexName).Do(context.TODO())
253	if err != nil {
254		t.Fatal(err)
255	}
256	count, err := p.c.Count(testIndexName).Do(context.TODO())
257	if err != nil {
258		t.Fatal(err)
259	}
260	if count != int64(numDocs) {
261		t.Fatalf("expected %d documents; got: %d", numDocs, count)
262	}
263}
264
265func TestBulkProcessorFlush(t *testing.T) {
266	//client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
267	client := setupTestClientAndCreateIndex(t)
268
269	p, err := client.BulkProcessor().
270		Name("ManualFlush").
271		Workers(10).
272		BulkActions(-1).
273		BulkSize(-1).
274		FlushInterval(30 * time.Second). // 30 seconds to flush
275		Stats(true).
276		Do(context.Background())
277	if err != nil {
278		t.Fatal(err)
279	}
280
281	const numDocs = 100
282
283	for i := 1; i <= numDocs; i++ {
284		tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))}
285		request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
286		p.Add(request)
287	}
288
289	// Should not flush because 30s > 1s
290	time.Sleep(1 * time.Second)
291
292	// No flush yet
293	stats := p.Stats()
294	if stats.Flushed != 0 {
295		t.Errorf("expected no flush; got: %d", p.stats.Flushed)
296	}
297
298	// Manual flush
299	err = p.Flush()
300	if err != nil {
301		t.Fatal(err)
302	}
303
304	time.Sleep(1 * time.Second)
305
306	// Now flushed
307	stats = p.Stats()
308	if got, want := p.stats.Flushed, int64(1); got != want {
309		t.Errorf("expected %d flush; got: %d", want, got)
310	}
311
312	// Close should not start another flush
313	err = p.Close()
314	if err != nil {
315		t.Fatal(err)
316	}
317
318	// Still 1 flush
319	stats = p.Stats()
320	if got, want := p.stats.Flushed, int64(1); got != want {
321		t.Errorf("expected %d flush; got: %d", want, got)
322	}
323
324	// Check number of documents that were bulk indexed
325	_, err = p.c.Flush(testIndexName).Do(context.TODO())
326	if err != nil {
327		t.Fatal(err)
328	}
329	count, err := p.c.Count(testIndexName).Do(context.TODO())
330	if err != nil {
331		t.Fatal(err)
332	}
333	if count != int64(numDocs) {
334		t.Fatalf("expected %d documents; got: %d", numDocs, count)
335	}
336}
337
338// -- Helper --
339
340func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
341	var beforeRequests int64
342	var befores int64
343	var afters int64
344	var failures int64
345	var afterRequests int64
346
347	beforeFn := func(executionId int64, requests []BulkableRequest) {
348		atomic.AddInt64(&beforeRequests, int64(len(requests)))
349		atomic.AddInt64(&befores, 1)
350	}
351	afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) {
352		atomic.AddInt64(&afters, 1)
353		if err != nil {
354			atomic.AddInt64(&failures, 1)
355		}
356		atomic.AddInt64(&afterRequests, int64(len(requests)))
357	}
358
359	p, err := svc.Before(beforeFn).After(afterFn).Stats(true).Do(context.Background())
360	if err != nil {
361		t.Fatal(err)
362	}
363
364	for i := 1; i <= numDocs; i++ {
365		tweet := tweet{User: "olivere", Message: fmt.Sprintf("%07d. %s", i, randomString(1+rand.Intn(63)))}
366		request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet)
367		p.Add(request)
368	}
369
370	err = p.Close()
371	if err != nil {
372		t.Fatal(err)
373	}
374
375	stats := p.Stats()
376
377	if stats.Flushed != 0 {
378		t.Errorf("expected no flush; got: %d", stats.Flushed)
379	}
380	if stats.Committed <= 0 {
381		t.Errorf("expected committed > %d; got: %d", 0, stats.Committed)
382	}
383	if got, want := stats.Indexed, int64(numDocs); got != want {
384		t.Errorf("expected indexed = %d; got: %d", want, got)
385	}
386	if got, want := stats.Created, int64(0); got != want {
387		t.Errorf("expected created = %d; got: %d", want, got)
388	}
389	if got, want := stats.Updated, int64(0); got != want {
390		t.Errorf("expected updated = %d; got: %d", want, got)
391	}
392	if got, want := stats.Deleted, int64(0); got != want {
393		t.Errorf("expected deleted = %d; got: %d", want, got)
394	}
395	if got, want := stats.Succeeded, int64(numDocs); got != want {
396		t.Errorf("expected succeeded = %d; got: %d", want, got)
397	}
398	if got, want := stats.Failed, int64(0); got != want {
399		t.Errorf("expected failed = %d; got: %d", want, got)
400	}
401	if got, want := beforeRequests, int64(numDocs); got != want {
402		t.Errorf("expected %d requests to before callback; got: %d", want, got)
403	}
404	if got, want := afterRequests, int64(numDocs); got != want {
405		t.Errorf("expected %d requests to after callback; got: %d", want, got)
406	}
407	if befores == 0 {
408		t.Error("expected at least 1 call to before callback")
409	}
410	if afters == 0 {
411		t.Error("expected at least 1 call to after callback")
412	}
413	if failures != 0 {
414		t.Errorf("expected 0 calls to failure callback; got: %d", failures)
415	}
416
417	// Check number of documents that were bulk indexed
418	_, err = p.c.Flush(testIndexName).Do(context.TODO())
419	if err != nil {
420		t.Fatal(err)
421	}
422	count, err := p.c.Count(testIndexName).Do(context.TODO())
423	if err != nil {
424		t.Fatal(err)
425	}
426	if count != int64(numDocs) {
427		t.Fatalf("expected %d documents; got: %d", numDocs, count)
428	}
429}
430