1//go:build e2e
2// +build e2e
3
4// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
5// Use of this source code is governed by MIT
6// license that can be found in the LICENSE file.
7
8package influxdb2_test
9
10import (
11	"context"
12	"fmt"
13	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
14	"github.com/influxdata/influxdb-client-go/v2/domain"
15	"github.com/influxdata/influxdb-client-go/v2/log"
16	"github.com/stretchr/testify/assert"
17	"github.com/stretchr/testify/require"
18	"os"
19	"strconv"
20	"sync"
21	"testing"
22	"time"
23)
24
25var authToken string
26var serverURL string
27var serverV1URL string
28var onboardingURL string
29
30func getEnvValue(key, defVal string) string {
31	if val, ok := os.LookupEnv(key); ok {
32		return val
33	} else {
34		return defVal
35	}
36}
37
38func init() {
39	authToken = getEnvValue("INFLUXDB2_TOKEN", "my-token")
40	serverURL = getEnvValue("INFLUXDB2_URL", "http://localhost:8086")
41	serverV1URL = getEnvValue("INFLUXDB_URL", "http://localhost:8087")
42	onboardingURL = getEnvValue("INFLUXDB2_ONBOARDING_URL", "http://localhost:8089")
43}
44
45func TestSetup(t *testing.T) {
46	client := influxdb2.NewClientWithOptions(onboardingURL, "", influxdb2.DefaultOptions().SetLogLevel(2))
47	response, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 24)
48	if err != nil {
49		t.Error(err)
50	}
51	require.NotNil(t, response)
52	require.NotNil(t, response.Auth)
53	require.NotNil(t, response.Auth.Token)
54	require.NotNil(t, response.Bucket)
55	require.NotNil(t, response.Bucket.RetentionRules)
56	require.Len(t, response.Bucket.RetentionRules, 1)
57	assert.Equal(t, int64(24*3600), response.Bucket.RetentionRules[0].EverySeconds)
58
59	_, err = client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0)
60	require.NotNil(t, err)
61	assert.Equal(t, "conflict: onboarding has already been completed", err.Error())
62}
63
64func TestReady(t *testing.T) {
65	client := influxdb2.NewClient(serverURL, "")
66
67	ready, err := client.Ready(context.Background())
68	require.NoError(t, err)
69	require.NotNil(t, ready)
70	require.NotNil(t, ready.Started)
71	assert.True(t, ready.Started.Before(time.Now()))
72	dur, err := time.ParseDuration(*ready.Up)
73	require.NoError(t, err)
74	assert.True(t, dur.Seconds() > 0)
75}
76
77func TestHealth(t *testing.T) {
78	client := influxdb2.NewClient(serverURL, "")
79
80	health, err := client.Health(context.Background())
81	if err != nil {
82		t.Error(err)
83	}
84	require.NotNil(t, health)
85	assert.Equal(t, domain.HealthCheckStatusPass, health.Status)
86}
87
88func TestPing(t *testing.T) {
89	client := influxdb2.NewClient(serverURL, "")
90
91	ok, err := client.Ping(context.Background())
92	require.NoError(t, err)
93	assert.True(t, ok)
94}
95
96func TestWrite(t *testing.T) {
97	client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(3))
98	writeAPI := client.WriteAPI("my-org", "my-bucket")
99	errCh := writeAPI.Errors()
100	errorsCount := 0
101	var wg sync.WaitGroup
102	wg.Add(1)
103	go func() {
104		for err := range errCh {
105			errorsCount++
106			fmt.Println("Error proc: write error: ", err.Error())
107		}
108		fmt.Println("Error proc: finished ")
109		wg.Done()
110	}()
111	timestamp := time.Now()
112	for i, f := 0, 3.3; i < 10; i++ {
113		writeAPI.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano()))
114		//writeAPI.Flush()
115		f += 3.3
116		timestamp = timestamp.Add(time.Nanosecond)
117	}
118
119	for i, f := int64(10), 33.0; i < 20; i++ {
120		p := influxdb2.NewPoint("test",
121			map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"},
122			map[string]interface{}{"f": f, "i": i},
123			timestamp)
124		writeAPI.WritePoint(p)
125		f += 3.3
126		timestamp = timestamp.Add(time.Nanosecond)
127	}
128
129	err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test").
130		AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
131	assert.NoError(t, err)
132
133	client.Close()
134	wg.Wait()
135	assert.Equal(t, 0, errorsCount)
136
137}
138
139func TestQueryRaw(t *testing.T) {
140	client := influxdb2.NewClient(serverURL, authToken)
141
142	queryAPI := client.QueryAPI("my-org")
143	res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect())
144	if err != nil {
145		t.Error(err)
146	} else {
147		fmt.Println("QueryResult:")
148		fmt.Println(res)
149	}
150}
151
152func TestQuery(t *testing.T) {
153	client := influxdb2.NewClient(serverURL, authToken)
154
155	queryAPI := client.QueryAPI("my-org")
156	fmt.Println("QueryResult")
157	result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`)
158	if err != nil {
159		t.Error(err)
160	} else {
161		for result.Next() {
162			if result.TableChanged() {
163				fmt.Printf("table: %s\n", result.TableMetadata().String())
164			}
165			fmt.Printf("row: %sv\n", result.Record().String())
166		}
167		if result.Err() != nil {
168			t.Error(result.Err())
169		}
170	}
171
172}
173
174func TestPingV1(t *testing.T) {
175	client := influxdb2.NewClient(serverV1URL, "")
176
177	ok, err := client.Ping(context.Background())
178	require.NoError(t, err)
179	assert.True(t, ok)
180}
181
182func TestHealthV1Compatibility(t *testing.T) {
183	client := influxdb2.NewClient(serverV1URL, "")
184
185	health, err := client.Health(context.Background())
186	if err != nil {
187		t.Error(err)
188	}
189	require.NotNil(t, health)
190	assert.Equal(t, domain.HealthCheckStatusPass, health.Status)
191}
192
193func TestWriteV1Compatibility(t *testing.T) {
194	client := influxdb2.NewClientWithOptions(serverV1URL, "", influxdb2.DefaultOptions().SetLogLevel(log.DebugLevel))
195	writeAPI := client.WriteAPI("", "mydb/autogen")
196	errCh := writeAPI.Errors()
197	errorsCount := 0
198	var wg sync.WaitGroup
199	wg.Add(1)
200	go func() {
201		for err := range errCh {
202			errorsCount++
203			fmt.Println("Error proc: write error: ", err.Error())
204		}
205		wg.Done()
206	}()
207	timestamp := time.Now()
208	for i, f := 0, 3.3; i < 10; i++ {
209		writeAPI.WriteRecord(fmt.Sprintf("testv1,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano()))
210		//writeAPI.Flush()
211		f += 3.3
212		timestamp = timestamp.Add(time.Nanosecond)
213	}
214
215	for i, f := int64(10), 33.0; i < 20; i++ {
216		p := influxdb2.NewPoint("testv1",
217			map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"},
218			map[string]interface{}{"f": f, "i": i},
219			timestamp)
220		writeAPI.WritePoint(p)
221		f += 3.3
222		timestamp = timestamp.Add(time.Nanosecond)
223	}
224
225	err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1").
226		AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
227	assert.NoError(t, err)
228
229	client.Close()
230	wg.Wait()
231	assert.Equal(t, 0, errorsCount)
232
233}
234
235func TestQueryRawV1Compatibility(t *testing.T) {
236	client := influxdb2.NewClient(serverV1URL, "")
237
238	queryAPI := client.QueryAPI("")
239	res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`, influxdb2.DefaultDialect())
240	if err != nil {
241		t.Error(err)
242	} else {
243		fmt.Println("QueryResult:")
244		fmt.Println(res)
245	}
246}
247
248func TestQueryV1Compatibility(t *testing.T) {
249	client := influxdb2.NewClient(serverV1URL, "")
250
251	queryAPI := client.QueryAPI("")
252	fmt.Println("QueryResult")
253	result, err := queryAPI.Query(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`)
254	if err != nil {
255		t.Error(err)
256	} else {
257		rows := 0
258		for result.Next() {
259			rows++
260			if result.TableChanged() {
261				fmt.Printf("table: %s\n", result.TableMetadata().String())
262			}
263			fmt.Printf("row: %sv\n", result.Record().String())
264		}
265		if result.Err() != nil {
266			t.Error(result.Err())
267		}
268		assert.True(t, rows > 0)
269	}
270}
271
272func TestV2APIAgainstV1Server(t *testing.T) {
273	client := influxdb2.NewClient(serverV1URL, "")
274	ctx := context.Background()
275	_, err := client.AuthorizationsAPI().GetAuthorizations(ctx)
276	require.Error(t, err)
277	_, err = client.UsersAPI().GetUsers(ctx)
278	require.Error(t, err)
279	_, err = client.OrganizationsAPI().GetOrganizations(ctx)
280	require.Error(t, err)
281	_, err = client.TasksAPI().FindTasks(ctx, nil)
282	require.Error(t, err)
283	_, err = client.LabelsAPI().GetLabels(ctx)
284	require.Error(t, err)
285	_, err = client.BucketsAPI().GetBuckets(ctx)
286	require.Error(t, err)
287	err = client.DeleteAPI().DeleteWithName(ctx, "org", "bucket", time.Now(), time.Now(), "")
288	require.Error(t, err)
289}
290
291func TestHTTPService(t *testing.T) {
292	client := influxdb2.NewClient(serverURL, authToken)
293	apiClient := domain.NewClientWithResponses(client.HTTPService())
294	org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
295	if err != nil {
296		//return err
297		t.Fatal(err)
298	}
299	taskDescription := "Example task"
300	taskFlux := `option task = {
301  name: "My task",
302  every: 1h
303}
304
305from(bucket:"my-bucket") |> range(start: -1m) |> last()`
306	taskStatus := domain.TaskStatusTypeActive
307	taskRequest := domain.TaskCreateRequest{
308		Org:         &org.Name,
309		OrgID:       org.Id,
310		Description: &taskDescription,
311		Flux:        taskFlux,
312		Status:      &taskStatus,
313	}
314	resp, err := apiClient.PostTasksWithResponse(context.Background(), &domain.PostTasksParams{}, domain.PostTasksJSONRequestBody(taskRequest))
315	if err != nil {
316		//return err
317		t.Error(err)
318	}
319	if resp.JSONDefault != nil {
320		t.Error(resp.JSONDefault.Message)
321	}
322	if assert.NotNil(t, resp.JSON201) {
323		assert.Equal(t, "My task", resp.JSON201.Name)
324		_, err := apiClient.DeleteTasksID(context.Background(), resp.JSON201.Id, &domain.DeleteTasksIDParams{})
325		if err != nil {
326			//return err
327			t.Error(err)
328		}
329	}
330}
331
332func TestLogsConcurrent(t *testing.T) {
333	var wg sync.WaitGroup
334	w := func(loc string, temp float32) {
335		client1 := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(log.ErrorLevel))
336		for i := 0; i < 10000; i++ {
337			client1.WriteAPI("my-org", "my-bucket").WriteRecord(fmt.Sprintf("room,location=%s temp=%f", loc, temp))
338		}
339		client1.Close()
340		wg.Done()
341	}
342	for i := 0; i < 2; i++ {
343		wg.Add(1)
344		go w(fmt.Sprintf("T%d", i), 23.3+float32(i))
345		<-time.After(time.Nanosecond)
346	}
347	wg.Wait()
348}
349