1# Copyright Maciej Sobczak 2008-2019. 2# This file is part of YAMI4. 3# 4# YAMI4 is free software: you can redistribute it and/or modify 5# it under the terms of the GNU General Public License as published by 6# the Free Software Foundation, either version 3 of the License, or 7# (at your option) any later version. 8# 9# YAMI4 is distributed in the hope that it will be useful, 10# but WITHOUT ANY WARRANTY; without even the implied warranty of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12# GNU General Public License for more details. 13# 14# You should have received a copy of the GNU General Public License 15# along with YAMI4. If not, see <http://www.gnu.org/licenses/>. 16 17import yami 18 19import time 20 21local_listener = "tcp://*:*" 22 23def test_1(): 24 """Attempt to send a message to non-existing agent.""" 25 26 client_agent = yami.Agent() 27 28 try: 29 client_agent.send_one_way("tcp://nosuchaddress:12345", 30 "nosuchobject", "badmessage") 31 32 assert False 33 34 except yami.YAMIError as e: 35 assert str(e) == "I/O error." 36 37 try: 38 # a bit dodgy, but 4 is an unassigned port in the list 39 # of well-known services, so there is a chance that 40 # no existing process uses it on the machine where this test 41 # is executed 42 # - if this test fails then it is a sign that some process 43 # has a listening socket on port 4 - pick another dummy number 44 45 client_agent.send_one_way("tcp://localhost:4", 46 "nosuchobject", "badmessage") 47 48 assert False 49 50 except yami.YAMIError as e: 51 assert str(e) == "I/O error." 52 53 client_agent.close() 54 55 56def test_2(): 57 """Message sent to nonexisting object.""" 58 59 server_agent = yami.Agent() 60 server_address = server_agent.add_listener(local_listener) 61 62 client_agent = yami.Agent() 63 64 # one-way message does not report any error 65 client_agent.send_one_way(server_address, "nosuchobject", "badmessage") 66 67 # two-way message is rejected 68 message = client_agent.send(server_address, "nosuchobject", "badmessage") 69 70 message.wait_for_completion() 71 72 assert message.get_state()[0] == yami.OutgoingMessage.REJECTED 73 74 assert message.get_exception_msg() == "Unknown destination object." 75 76 message.close() 77 client_agent.close() 78 server_agent.close() 79 80 81def test_2a(): 82 """Message sent to nonexisting object, explicit connection management.""" 83 84 server_agent = yami.Agent() 85 server_address = server_agent.add_listener(local_listener) 86 87 client_agent = yami.Agent() 88 89 dummy_content = {} 90 priority = 0 91 auto_connect = False 92 93 # message fails if there is no channel and no auto-connect 94 try: 95 message = client_agent.send( 96 server_address, "nosuchobject", "badmessage", 97 dummy_content, priority, auto_connect) 98 assert False 99 except yami.YAMIError as e: 100 assert str(e) == "I/O error." 101 102 # explicitly open the channel 103 104 client_agent.open_connection(server_address) 105 106 # message is successfully sent over existing channel, 107 # but later rejected by server 108 109 message = client_agent.send( 110 server_address, "nosuchobject", "badmessage", 111 dummy_content, priority, auto_connect) 112 113 message.wait_for_completion() 114 115 assert message.get_state()[0] == yami.OutgoingMessage.REJECTED 116 117 assert message.get_exception_msg() == "Unknown destination object." 118 119 message.close() 120 client_agent.close() 121 server_agent.close() 122 123 124object_name = "object" 125message_name = "message" 126 127class ObjectTypeForTest3(object): 128 129 def __init__(self): 130 self.got_message = False 131 132 def __call__(self, message): 133 self.got_message = True 134 135 assert message.get_object_name() == object_name 136 assert message.get_message_name() == message_name 137 138 content = message.get_parameters() 139 assert len(content) == 1 140 assert content["value"] == "ping" 141 142 content["value"] = "pong" 143 message.reply(content) 144 145def test_3(): 146 """Message sent and replied to.""" 147 148 server_agent = yami.Agent() 149 server_address = server_agent.add_listener(local_listener) 150 151 my_object = ObjectTypeForTest3() 152 server_agent.register_object(object_name, my_object) 153 154 client_agent = yami.Agent() 155 156 content = {"value":"ping"} 157 158 message = client_agent.send(server_address, 159 object_name, message_name, content) 160 161 message.wait_for_transmission() 162 163 # after transmission the whole message is pushed out 164 state, sent_bytes, total_byte_count = message.get_state() 165 166 assert sent_bytes == total_byte_count 167 168 message.wait_for_completion() 169 170 assert my_object.got_message 171 172 assert message.get_state()[0] == yami.OutgoingMessage.REPLIED 173 174 content = message.get_reply() 175 assert content["value"] == "pong" 176 177 message.close() 178 client_agent.close() 179 server_agent.close() 180 181 182class ObjectTypeForTest4(object): 183 184 def __init__(self): 185 self.got_message = False 186 187 def __call__(self, message): 188 self.got_message = True 189 190 # expect empty parameters if no content is sent 191 content = message.get_parameters() 192 assert len(content) == 0 193 194 message.reject("some reason") 195 196def test_4(): 197 """Message rejected by server.""" 198 199 server_agent = yami.Agent() 200 server_address = server_agent.add_listener(local_listener) 201 202 my_object = ObjectTypeForTest4() 203 server_agent.register_object(object_name, my_object) 204 205 client_agent = yami.Agent() 206 207 message = client_agent.send(server_address, object_name, message_name) 208 209 message.wait_for_completion() 210 211 assert my_object.got_message 212 213 assert message.get_state()[0] == yami.OutgoingMessage.REJECTED 214 assert message.get_exception_msg() == "some reason" 215 216 message.close() 217 client_agent.close() 218 server_agent.close() 219 220 221class ObjectTypeForTest5(object): 222 223 def __init__(self): 224 self.got_message = False 225 226 def __call__(self, message): 227 self.got_message = True 228 229 raise Exception("something bad happened") 230 231def test_5(): 232 """Message rejected due to exception in user code at the server side.""" 233 234 server_agent = yami.Agent() 235 server_address = server_agent.add_listener(local_listener) 236 237 my_object = ObjectTypeForTest5() 238 server_agent.register_object(object_name, my_object) 239 240 client_agent = yami.Agent() 241 242 message = client_agent.send(server_address, object_name, message_name) 243 244 message.wait_for_completion() 245 246 assert my_object.got_message 247 248 assert message.get_state()[0] == yami.OutgoingMessage.REJECTED 249 assert message.get_exception_msg() == "something bad happened" 250 251 message.close() 252 client_agent.close() 253 server_agent.close() 254 255 256num_of_messages_in_test6 = 10 257 258class ObjectTypeForTest6(object): 259 260 def __init__(self, big_string): 261 self.got_message = [] 262 for i in range(num_of_messages_in_test6): 263 self.got_message.append(False) 264 self.big_string = big_string 265 266 def __call__(self, message): 267 content = message.get_parameters() 268 id = content["id"] 269 self.got_message[id] = True 270 271 # uncomment it to see how the messages get reordered 272 #print("received message", id) 273 274 # verify the big value 275 value = content["big"] 276 assert value == self.big_string 277 278 message.reply() 279 280def test_6(): 281 """Big messages sent with different priorities.""" 282 283 # Note: 284 # The messages are sent with different priorities, which means 285 # that they might complete in the order that is different from the 286 # order of posting them to the outgoing queue. 287 # The messages are posted with increasing priorities (first message 288 # is sent with lowest priority, last message with highest), 289 # so it is *very likely* that they will be received by server 290 # in the reversed order, but this cannot be guaranteed as there is 291 # no relation between the speed of posting and the speed 292 # of transmission. 293 294 size_of_big_string = 1000000 295 296 big_string = size_of_big_string * "x" 297 298 server_agent = yami.Agent() 299 server_address = server_agent.add_listener(local_listener) 300 301 my_object = ObjectTypeForTest6(big_string) 302 server_agent.register_object(object_name, my_object) 303 304 client_agent = yami.Agent() 305 306 content = {"big":big_string} 307 308 messages = [] 309 310 # send all messages with different ids and priorities 311 for i in range(num_of_messages_in_test6): 312 id = i 313 priority = i # increasing priority 314 315 content["id"] = id 316 317 messages.append(client_agent.send( 318 server_address, object_name, message_name, content, priority)) 319 320 # wait for all messages to complete 321 for message in messages: 322 message.wait_for_completion() 323 324 for i in range(num_of_messages_in_test6): 325 assert my_object.got_message[i] 326 messages[i].close() 327 328 client_agent.close() 329 server_agent.close() 330 331 332class ObjectTypeForTest_7_8(object): 333 334 def __init__(self): 335 self.got_message = False 336 337 def __call__(self, message): 338 self.got_message = True 339 340 message.reply() 341 342 343def test_7(): 344 """Message sent to load-balanced pair of destinations.""" 345 346 server_agent_1 = yami.Agent() 347 server_address_1 = server_agent_1.add_listener(local_listener) 348 349 server_agent_2 = yami.Agent() 350 server_address_2 = server_agent_2.add_listener(local_listener) 351 352 load_balanced_target = "failover:(" + \ 353 server_address_1 + "|" + server_address_2 + ")" 354 355 my_object_1 = ObjectTypeForTest_7_8() 356 server_agent_1.register_object(object_name, my_object_1) 357 358 my_object_2 = ObjectTypeForTest_7_8() 359 server_agent_2.register_object(object_name, my_object_2) 360 361 client_agent = yami.Agent() 362 363 message = client_agent.send(load_balanced_target, 364 object_name, message_name) 365 366 # since this is a load-balanced (and failover) target, 367 # the message is implicitly waited for completion 368 369 assert message.get_state()[0] == yami.OutgoingMessage.REPLIED 370 371 # exactly one of two servers got the message 372 assert my_object_1.got_message and not my_object_2.got_message or \ 373 (not my_object_1.got_message and my_object_2.got_message) 374 375 message.close() 376 client_agent.close() 377 server_agent_1.close() 378 server_agent_2.close() 379 380 381def test_8(): 382 """Message sent to failover pair of destinations.""" 383 384 # the failover pair consists of one proper address and one 385 # that is certainly not working 386 387 server_agent = yami.Agent() 388 server_address = server_agent.add_listener(local_listener) 389 390 my_object = ObjectTypeForTest_7_8() 391 server_agent.register_object(object_name, my_object) 392 393 client_agent = yami.Agent() 394 395 broken_target = "tcp://nosuchhost:4" 396 397 load_balanced_target = "failover:(" + \ 398 server_address + "|" + broken_target + ")" 399 400 message = client_agent.send(load_balanced_target, 401 object_name, message_name) 402 403 # since this is a failover target, 404 # the message is implicitly waited for completion 405 406 assert message.get_state()[0] == yami.OutgoingMessage.REPLIED 407 408 # the working server in the failover pair got the message 409 assert my_object.got_message 410 411 message.close() 412 client_agent.close() 413 server_agent.close() 414 415 416def test_9(): 417 """Empty failover group is an error.""" 418 419 client_agent = yami.Agent() 420 421 try: 422 client_agent.send_one_way("failover:()", 423 object_name, message_name) 424 425 assert False 426 427 except yami.YAMIError as e: 428 assert str(e) == "Empty failover group is not allowed." 429 430 client_agent.close() 431 432 433class EventCallbackTypeForTest11(object): 434 435 def __init__(self): 436 self.events = "" 437 438 def __call__(self, name, event): 439 if event == yami.Agent.NEW_INCOMING_CONNECTION: 440 self.events += "incoming " 441 elif event == yami.Agent.NEW_OUTGOING_CONNECTION: 442 self.events += "outgoing " 443 else: 444 self.events += "closed " 445 446def test_11(): 447 """Connection event notifications.""" 448 449 server_callback = EventCallbackTypeForTest11() 450 server_agent = yami.Agent({}, server_callback) 451 452 client_callback = EventCallbackTypeForTest11() 453 client_agent = yami.Agent({}, client_callback) 454 455 server_address = server_agent.add_listener(local_listener) 456 457 # no communication yet -> no connections 458 assert server_callback.events == "" 459 assert client_callback.events == "" 460 461 message = client_agent.send(server_address, "no_such_object", "hello") 462 message.wait_for_completion() 463 message.close() 464 465 # in Python connection events are not reported synchronously, 466 # but reuse the dispatching threads - this means that 467 # there might be some time between connection event getting 468 # recognized at the low level and having it delivered to user code 469 time.sleep(1) 470 471 # one connection open 472 assert server_callback.events == "incoming " 473 assert client_callback.events == "outgoing " 474 475 client_agent.close_connection(server_address) 476 477 # one connection open and one closed, 478 # but it is a race for both the client and the server 479 assert (server_callback.events == "incoming " or 480 server_callback.events == "incoming closed ") 481 assert (client_callback.events == "outgoing " or 482 client_callback.events == "outgoing closed ") 483 484 client_agent.close() 485 server_agent.close() 486 487 488def test_12(): 489 """Frame size border conditions - messages with all possible lengths.""" 490 491 server_agent = yami.Agent() 492 server_address = server_agent.add_listener(local_listener) 493 494 client_agent = yami.Agent() 495 496 max_string_size = 10000 497 for string_size in range(max_string_size): 498 s = string_size * 'x' 499 params = {"value":s} 500 501 message = client_agent.send( 502 server_address, "nosuchobject", "hello", params) 503 504 message.wait_for_completion() 505 506 assert message.get_state()[0] == yami.OutgoingMessage.REJECTED 507 508 message.close() 509 510 client_agent.close() 511 server_agent.close() 512 513 514test_1() 515test_2() 516test_2a() 517test_3() 518test_4() 519test_5() 520test_6() 521test_7() 522test_8() 523test_9() 524test_11() 525test_12() 526