1defmodule KafkaEx.Server0P9P0.Test do
2  use ExUnit.Case
3  import TestHelper
4
5  alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest
6  alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest
7  alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest
8  alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest
9
10  @moduletag :server_0_p_9_p_0
11
12  test "can join a consumer group" do
13    random_group = generate_random_string()
14
15    KafkaEx.create_worker(
16      :join_group,
17      uris: uris(),
18      consumer_group: random_group
19    )
20
21    request = %JoinGroupRequest{
22      group_name: random_group,
23      member_id: "",
24      topics: ["foo", "bar"],
25      session_timeout: 6000
26    }
27
28    answer = KafkaEx.join_group(request, worker_name: :join_group)
29    assert answer.error_code == :no_error
30    assert answer.generation_id == 1
31    # We should be the leader
32    assert answer.member_id == answer.leader_id
33  end
34
35  test "can send a simple leader sync for a consumer group" do
36    # A lot of repetition with the previous test. Leaving it in now, waiting for
37    # how this pans out eventually as we add more and more 0.9 consumer group code
38    random_group = generate_random_string()
39
40    KafkaEx.create_worker(
41      :sync_group,
42      uris: uris(),
43      consumer_group: random_group
44    )
45
46    request = %JoinGroupRequest{
47      group_name: random_group,
48      member_id: "",
49      topics: ["foo", "bar"],
50      session_timeout: 6000
51    }
52
53    answer = KafkaEx.join_group(request, worker_name: :sync_group)
54    assert answer.error_code == :no_error
55
56    member_id = answer.member_id
57    generation_id = answer.generation_id
58    my_assignments = [{"foo", [1]}, {"bar", [2]}]
59    assignments = [{member_id, my_assignments}]
60
61    request = %SyncGroupRequest{
62      group_name: random_group,
63      member_id: member_id,
64      generation_id: generation_id,
65      assignments: assignments
66    }
67
68    answer = KafkaEx.sync_group(request, worker_name: :sync_group)
69    assert answer.error_code == :no_error
70
71    # Parsing happens to return the assignments reversed, which is fine as there's no
72    # ordering. Just reverse what we expect to match
73    assert answer.assignments == Enum.reverse(my_assignments)
74  end
75
76  test "can leave a consumer group" do
77    # A lot of repetition with the previous tests. Leaving it in now, waiting for
78    # how this pans out eventually as we add more and more 0.9 consumer group code
79    random_group = generate_random_string()
80
81    KafkaEx.create_worker(
82      :leave_group,
83      uris: uris(),
84      consumer_group: random_group
85    )
86
87    request = %JoinGroupRequest{
88      group_name: random_group,
89      member_id: "",
90      topics: ["foo", "bar"],
91      session_timeout: 6000
92    }
93
94    answer = KafkaEx.join_group(request, worker_name: :leave_group)
95    assert answer.error_code == :no_error
96
97    member_id = answer.member_id
98
99    request = %LeaveGroupRequest{
100      group_name: random_group,
101      member_id: member_id
102    }
103
104    answer = KafkaEx.leave_group(request, worker_name: :leave_group)
105    assert answer.error_code == :no_error
106  end
107
108  test "can heartbeat" do
109    # See sync test. Removing repetition in the next iteration
110    random_group = generate_random_string()
111
112    KafkaEx.create_worker(
113      :heartbeat,
114      uris: uris(),
115      consumer_group: random_group
116    )
117
118    request = %JoinGroupRequest{
119      group_name: random_group,
120      member_id: "",
121      topics: ["foo", "bar"],
122      session_timeout: 6000
123    }
124
125    answer = KafkaEx.join_group(request, worker_name: :heartbeat)
126    assert answer.error_code == :no_error
127
128    member_id = answer.member_id
129    generation_id = answer.generation_id
130    my_assignments = [{"foo", [1]}, {"bar", [2]}]
131    assignments = [{member_id, my_assignments}]
132
133    request = %SyncGroupRequest{
134      group_name: random_group,
135      member_id: member_id,
136      generation_id: generation_id,
137      assignments: assignments
138    }
139
140    answer = KafkaEx.sync_group(request, worker_name: :heartbeat)
141    assert answer.error_code == :no_error
142
143    request = %HeartbeatRequest{
144      group_name: random_group,
145      member_id: member_id,
146      generation_id: generation_id
147    }
148
149    answer = KafkaEx.heartbeat(request, worker_name: :heartbeat)
150    assert answer.error_code == :no_error
151  end
152end
153