1defmodule KafkaEx.Server0P9P0.Test do 2 use ExUnit.Case 3 import TestHelper 4 5 alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest 6 alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest 7 alias KafkaEx.Protocol.LeaveGroup.Request, as: LeaveGroupRequest 8 alias KafkaEx.Protocol.SyncGroup.Request, as: SyncGroupRequest 9 10 @moduletag :server_0_p_9_p_0 11 12 test "can join a consumer group" do 13 random_group = generate_random_string() 14 15 KafkaEx.create_worker( 16 :join_group, 17 uris: uris(), 18 consumer_group: random_group 19 ) 20 21 request = %JoinGroupRequest{ 22 group_name: random_group, 23 member_id: "", 24 topics: ["foo", "bar"], 25 session_timeout: 6000 26 } 27 28 answer = KafkaEx.join_group(request, worker_name: :join_group) 29 assert answer.error_code == :no_error 30 assert answer.generation_id == 1 31 # We should be the leader 32 assert answer.member_id == answer.leader_id 33 end 34 35 test "can send a simple leader sync for a consumer group" do 36 # A lot of repetition with the previous test. Leaving it in now, waiting for 37 # how this pans out eventually as we add more and more 0.9 consumer group code 38 random_group = generate_random_string() 39 40 KafkaEx.create_worker( 41 :sync_group, 42 uris: uris(), 43 consumer_group: random_group 44 ) 45 46 request = %JoinGroupRequest{ 47 group_name: random_group, 48 member_id: "", 49 topics: ["foo", "bar"], 50 session_timeout: 6000 51 } 52 53 answer = KafkaEx.join_group(request, worker_name: :sync_group) 54 assert answer.error_code == :no_error 55 56 member_id = answer.member_id 57 generation_id = answer.generation_id 58 my_assignments = [{"foo", [1]}, {"bar", [2]}] 59 assignments = [{member_id, my_assignments}] 60 61 request = %SyncGroupRequest{ 62 group_name: random_group, 63 member_id: member_id, 64 generation_id: generation_id, 65 assignments: assignments 66 } 67 68 answer = KafkaEx.sync_group(request, worker_name: :sync_group) 69 assert answer.error_code == :no_error 70 71 # Parsing happens to return the assignments reversed, which is fine as there's no 72 # ordering. Just reverse what we expect to match 73 assert answer.assignments == Enum.reverse(my_assignments) 74 end 75 76 test "can leave a consumer group" do 77 # A lot of repetition with the previous tests. Leaving it in now, waiting for 78 # how this pans out eventually as we add more and more 0.9 consumer group code 79 random_group = generate_random_string() 80 81 KafkaEx.create_worker( 82 :leave_group, 83 uris: uris(), 84 consumer_group: random_group 85 ) 86 87 request = %JoinGroupRequest{ 88 group_name: random_group, 89 member_id: "", 90 topics: ["foo", "bar"], 91 session_timeout: 6000 92 } 93 94 answer = KafkaEx.join_group(request, worker_name: :leave_group) 95 assert answer.error_code == :no_error 96 97 member_id = answer.member_id 98 99 request = %LeaveGroupRequest{ 100 group_name: random_group, 101 member_id: member_id 102 } 103 104 answer = KafkaEx.leave_group(request, worker_name: :leave_group) 105 assert answer.error_code == :no_error 106 end 107 108 test "can heartbeat" do 109 # See sync test. Removing repetition in the next iteration 110 random_group = generate_random_string() 111 112 KafkaEx.create_worker( 113 :heartbeat, 114 uris: uris(), 115 consumer_group: random_group 116 ) 117 118 request = %JoinGroupRequest{ 119 group_name: random_group, 120 member_id: "", 121 topics: ["foo", "bar"], 122 session_timeout: 6000 123 } 124 125 answer = KafkaEx.join_group(request, worker_name: :heartbeat) 126 assert answer.error_code == :no_error 127 128 member_id = answer.member_id 129 generation_id = answer.generation_id 130 my_assignments = [{"foo", [1]}, {"bar", [2]}] 131 assignments = [{member_id, my_assignments}] 132 133 request = %SyncGroupRequest{ 134 group_name: random_group, 135 member_id: member_id, 136 generation_id: generation_id, 137 assignments: assignments 138 } 139 140 answer = KafkaEx.sync_group(request, worker_name: :heartbeat) 141 assert answer.error_code == :no_error 142 143 request = %HeartbeatRequest{ 144 group_name: random_group, 145 member_id: member_id, 146 generation_id: generation_id 147 } 148 149 answer = KafkaEx.heartbeat(request, worker_name: :heartbeat) 150 assert answer.error_code == :no_error 151 end 152end 153