1//go:build e2e 2// +build e2e 3 4// Copyright 2020-2021 InfluxData, Inc. All rights reserved. 5// Use of this source code is governed by MIT 6// license that can be found in the LICENSE file. 7 8package influxdb2_test 9 10import ( 11 "context" 12 "fmt" 13 influxdb2 "github.com/influxdata/influxdb-client-go/v2" 14 "github.com/influxdata/influxdb-client-go/v2/domain" 15 "github.com/influxdata/influxdb-client-go/v2/log" 16 "github.com/stretchr/testify/assert" 17 "github.com/stretchr/testify/require" 18 "os" 19 "strconv" 20 "sync" 21 "testing" 22 "time" 23) 24 25var authToken string 26var serverURL string 27var serverV1URL string 28var onboardingURL string 29 30func getEnvValue(key, defVal string) string { 31 if val, ok := os.LookupEnv(key); ok { 32 return val 33 } else { 34 return defVal 35 } 36} 37 38func init() { 39 authToken = getEnvValue("INFLUXDB2_TOKEN", "my-token") 40 serverURL = getEnvValue("INFLUXDB2_URL", "http://localhost:8086") 41 serverV1URL = getEnvValue("INFLUXDB_URL", "http://localhost:8087") 42 onboardingURL = getEnvValue("INFLUXDB2_ONBOARDING_URL", "http://localhost:8089") 43} 44 45func TestSetup(t *testing.T) { 46 client := influxdb2.NewClientWithOptions(onboardingURL, "", influxdb2.DefaultOptions().SetLogLevel(2)) 47 response, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 24) 48 if err != nil { 49 t.Error(err) 50 } 51 require.NotNil(t, response) 52 require.NotNil(t, response.Auth) 53 require.NotNil(t, response.Auth.Token) 54 require.NotNil(t, response.Bucket) 55 require.NotNil(t, response.Bucket.RetentionRules) 56 require.Len(t, response.Bucket.RetentionRules, 1) 57 assert.Equal(t, int64(24*3600), response.Bucket.RetentionRules[0].EverySeconds) 58 59 _, err = client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0) 60 require.NotNil(t, err) 61 assert.Equal(t, "conflict: onboarding has already been completed", err.Error()) 62} 63 64func TestReady(t *testing.T) { 65 client := influxdb2.NewClient(serverURL, "") 66 67 ready, err := client.Ready(context.Background()) 68 require.NoError(t, err) 69 require.NotNil(t, ready) 70 require.NotNil(t, ready.Started) 71 assert.True(t, ready.Started.Before(time.Now())) 72 dur, err := time.ParseDuration(*ready.Up) 73 require.NoError(t, err) 74 assert.True(t, dur.Seconds() > 0) 75} 76 77func TestHealth(t *testing.T) { 78 client := influxdb2.NewClient(serverURL, "") 79 80 health, err := client.Health(context.Background()) 81 if err != nil { 82 t.Error(err) 83 } 84 require.NotNil(t, health) 85 assert.Equal(t, domain.HealthCheckStatusPass, health.Status) 86} 87 88func TestPing(t *testing.T) { 89 client := influxdb2.NewClient(serverURL, "") 90 91 ok, err := client.Ping(context.Background()) 92 require.NoError(t, err) 93 assert.True(t, ok) 94} 95 96func TestWrite(t *testing.T) { 97 client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(3)) 98 writeAPI := client.WriteAPI("my-org", "my-bucket") 99 errCh := writeAPI.Errors() 100 errorsCount := 0 101 var wg sync.WaitGroup 102 wg.Add(1) 103 go func() { 104 for err := range errCh { 105 errorsCount++ 106 fmt.Println("Error proc: write error: ", err.Error()) 107 } 108 fmt.Println("Error proc: finished ") 109 wg.Done() 110 }() 111 timestamp := time.Now() 112 for i, f := 0, 3.3; i < 10; i++ { 113 writeAPI.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano())) 114 //writeAPI.Flush() 115 f += 3.3 116 timestamp = timestamp.Add(time.Nanosecond) 117 } 118 119 for i, f := int64(10), 33.0; i < 20; i++ { 120 p := influxdb2.NewPoint("test", 121 map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, 122 map[string]interface{}{"f": f, "i": i}, 123 timestamp) 124 writeAPI.WritePoint(p) 125 f += 3.3 126 timestamp = timestamp.Add(time.Nanosecond) 127 } 128 129 err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test"). 130 AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) 131 assert.NoError(t, err) 132 133 client.Close() 134 wg.Wait() 135 assert.Equal(t, 0, errorsCount) 136 137} 138 139func TestQueryRaw(t *testing.T) { 140 client := influxdb2.NewClient(serverURL, authToken) 141 142 queryAPI := client.QueryAPI("my-org") 143 res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect()) 144 if err != nil { 145 t.Error(err) 146 } else { 147 fmt.Println("QueryResult:") 148 fmt.Println(res) 149 } 150} 151 152func TestQuery(t *testing.T) { 153 client := influxdb2.NewClient(serverURL, authToken) 154 155 queryAPI := client.QueryAPI("my-org") 156 fmt.Println("QueryResult") 157 result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`) 158 if err != nil { 159 t.Error(err) 160 } else { 161 for result.Next() { 162 if result.TableChanged() { 163 fmt.Printf("table: %s\n", result.TableMetadata().String()) 164 } 165 fmt.Printf("row: %sv\n", result.Record().String()) 166 } 167 if result.Err() != nil { 168 t.Error(result.Err()) 169 } 170 } 171 172} 173 174func TestPingV1(t *testing.T) { 175 client := influxdb2.NewClient(serverV1URL, "") 176 177 ok, err := client.Ping(context.Background()) 178 require.NoError(t, err) 179 assert.True(t, ok) 180} 181 182func TestHealthV1Compatibility(t *testing.T) { 183 client := influxdb2.NewClient(serverV1URL, "") 184 185 health, err := client.Health(context.Background()) 186 if err != nil { 187 t.Error(err) 188 } 189 require.NotNil(t, health) 190 assert.Equal(t, domain.HealthCheckStatusPass, health.Status) 191} 192 193func TestWriteV1Compatibility(t *testing.T) { 194 client := influxdb2.NewClientWithOptions(serverV1URL, "", influxdb2.DefaultOptions().SetLogLevel(log.DebugLevel)) 195 writeAPI := client.WriteAPI("", "mydb/autogen") 196 errCh := writeAPI.Errors() 197 errorsCount := 0 198 var wg sync.WaitGroup 199 wg.Add(1) 200 go func() { 201 for err := range errCh { 202 errorsCount++ 203 fmt.Println("Error proc: write error: ", err.Error()) 204 } 205 wg.Done() 206 }() 207 timestamp := time.Now() 208 for i, f := 0, 3.3; i < 10; i++ { 209 writeAPI.WriteRecord(fmt.Sprintf("testv1,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano())) 210 //writeAPI.Flush() 211 f += 3.3 212 timestamp = timestamp.Add(time.Nanosecond) 213 } 214 215 for i, f := int64(10), 33.0; i < 20; i++ { 216 p := influxdb2.NewPoint("testv1", 217 map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, 218 map[string]interface{}{"f": f, "i": i}, 219 timestamp) 220 writeAPI.WritePoint(p) 221 f += 3.3 222 timestamp = timestamp.Add(time.Nanosecond) 223 } 224 225 err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1"). 226 AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) 227 assert.NoError(t, err) 228 229 client.Close() 230 wg.Wait() 231 assert.Equal(t, 0, errorsCount) 232 233} 234 235func TestQueryRawV1Compatibility(t *testing.T) { 236 client := influxdb2.NewClient(serverV1URL, "") 237 238 queryAPI := client.QueryAPI("") 239 res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`, influxdb2.DefaultDialect()) 240 if err != nil { 241 t.Error(err) 242 } else { 243 fmt.Println("QueryResult:") 244 fmt.Println(res) 245 } 246} 247 248func TestQueryV1Compatibility(t *testing.T) { 249 client := influxdb2.NewClient(serverV1URL, "") 250 251 queryAPI := client.QueryAPI("") 252 fmt.Println("QueryResult") 253 result, err := queryAPI.Query(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`) 254 if err != nil { 255 t.Error(err) 256 } else { 257 rows := 0 258 for result.Next() { 259 rows++ 260 if result.TableChanged() { 261 fmt.Printf("table: %s\n", result.TableMetadata().String()) 262 } 263 fmt.Printf("row: %sv\n", result.Record().String()) 264 } 265 if result.Err() != nil { 266 t.Error(result.Err()) 267 } 268 assert.True(t, rows > 0) 269 } 270} 271 272func TestV2APIAgainstV1Server(t *testing.T) { 273 client := influxdb2.NewClient(serverV1URL, "") 274 ctx := context.Background() 275 _, err := client.AuthorizationsAPI().GetAuthorizations(ctx) 276 require.Error(t, err) 277 _, err = client.UsersAPI().GetUsers(ctx) 278 require.Error(t, err) 279 _, err = client.OrganizationsAPI().GetOrganizations(ctx) 280 require.Error(t, err) 281 _, err = client.TasksAPI().FindTasks(ctx, nil) 282 require.Error(t, err) 283 _, err = client.LabelsAPI().GetLabels(ctx) 284 require.Error(t, err) 285 _, err = client.BucketsAPI().GetBuckets(ctx) 286 require.Error(t, err) 287 err = client.DeleteAPI().DeleteWithName(ctx, "org", "bucket", time.Now(), time.Now(), "") 288 require.Error(t, err) 289} 290 291func TestHTTPService(t *testing.T) { 292 client := influxdb2.NewClient(serverURL, authToken) 293 apiClient := domain.NewClientWithResponses(client.HTTPService()) 294 org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org") 295 if err != nil { 296 //return err 297 t.Fatal(err) 298 } 299 taskDescription := "Example task" 300 taskFlux := `option task = { 301 name: "My task", 302 every: 1h 303} 304 305from(bucket:"my-bucket") |> range(start: -1m) |> last()` 306 taskStatus := domain.TaskStatusTypeActive 307 taskRequest := domain.TaskCreateRequest{ 308 Org: &org.Name, 309 OrgID: org.Id, 310 Description: &taskDescription, 311 Flux: taskFlux, 312 Status: &taskStatus, 313 } 314 resp, err := apiClient.PostTasksWithResponse(context.Background(), &domain.PostTasksParams{}, domain.PostTasksJSONRequestBody(taskRequest)) 315 if err != nil { 316 //return err 317 t.Error(err) 318 } 319 if resp.JSONDefault != nil { 320 t.Error(resp.JSONDefault.Message) 321 } 322 if assert.NotNil(t, resp.JSON201) { 323 assert.Equal(t, "My task", resp.JSON201.Name) 324 _, err := apiClient.DeleteTasksID(context.Background(), resp.JSON201.Id, &domain.DeleteTasksIDParams{}) 325 if err != nil { 326 //return err 327 t.Error(err) 328 } 329 } 330} 331 332func TestLogsConcurrent(t *testing.T) { 333 var wg sync.WaitGroup 334 w := func(loc string, temp float32) { 335 client1 := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(log.ErrorLevel)) 336 for i := 0; i < 10000; i++ { 337 client1.WriteAPI("my-org", "my-bucket").WriteRecord(fmt.Sprintf("room,location=%s temp=%f", loc, temp)) 338 } 339 client1.Close() 340 wg.Done() 341 } 342 for i := 0; i < 2; i++ { 343 wg.Add(1) 344 go w(fmt.Sprintf("T%d", i), 23.3+float32(i)) 345 <-time.After(time.Nanosecond) 346 } 347 wg.Wait() 348} 349