1# Copyright Maciej Sobczak 2008-2019.
2# This file is part of YAMI4.
3#
4# YAMI4 is free software: you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# YAMI4 is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with YAMI4.  If not, see <http://www.gnu.org/licenses/>.
16
17import yami
18
19import time
20
# Listener target handed to Agent.add_listener() by the server agents below;
# the tests then talk to whatever address add_listener() returns.
# NOTE(review): the "*" wildcards presumably let the system pick the
# interface and the port - confirm against the YAMI4 documentation.
local_listener = "tcp://*:*"
22
def test_1():
    """One-way send to an agent that does not exist reports an I/O error."""

    agent = yami.Agent()

    unreachable_targets = (
        # host name that is not expected to resolve
        "tcp://nosuchaddress:12345",

        # a bit dodgy, but 4 is an unassigned port in the list
        # of well-known services, so there is a chance that
        # no existing process uses it on the machine where this test
        # is executed
        # - if this test fails then it is a sign that some process
        # has a listening socket on port 4 - pick another dummy number
        "tcp://localhost:4",
    )

    for target in unreachable_targets:
        try:
            agent.send_one_way(target, "nosuchobject", "badmessage")
        except yami.YAMIError as error:
            assert str(error) == "I/O error."
        else:
            # the send above was expected to raise
            assert False

    agent.close()
54
55
def test_2():
    """Two-way message addressed to an unregistered object gets rejected."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    client = yami.Agent()

    # nothing is reported back for a one-way message
    client.send_one_way(target, "nosuchobject", "badmessage")

    # the two-way variant comes back as rejected
    outgoing = client.send(target, "nosuchobject", "badmessage")
    outgoing.wait_for_completion()

    state = outgoing.get_state()[0]
    assert state == yami.OutgoingMessage.REJECTED

    reason = outgoing.get_exception_msg()
    assert reason == "Unknown destination object."

    # release in the order: message, client, server
    for resource in (outgoing, client, server):
        resource.close()
79
80
def test_2a():
    """Unknown-object rejection with the connection opened by hand."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    client = yami.Agent()

    # the same positional arguments are used for both send attempts
    dummy_content = {}
    priority = 0
    auto_connect = False
    send_args = (target, "nosuchobject", "badmessage",
                 dummy_content, priority, auto_connect)

    # without a channel and without auto-connect the send has to fail
    try:
        client.send(*send_args)
    except yami.YAMIError as error:
        assert str(error) == "I/O error."
    else:
        assert False

    # open the channel explicitly
    client.open_connection(target)

    # this time the message goes out over the existing channel
    # and is only later rejected by the server
    outgoing = client.send(*send_args)
    outgoing.wait_for_completion()

    assert outgoing.get_state()[0] == yami.OutgoingMessage.REJECTED
    assert outgoing.get_exception_msg() == "Unknown destination object."

    outgoing.close()
    client.close()
    server.close()
122
123
# names under which the test handlers are registered and addressed
object_name = "object"
message_name = "message"


class ObjectTypeForTest3(object):
    """Message handler that expects a "ping" value and replies with "pong"."""

    def __init__(self):
        # set once the handler has been invoked
        self.got_message = False

    def __call__(self, message):
        self.got_message = True

        # the message has to be addressed with the shared names
        assert message.get_object_name() == object_name
        assert message.get_message_name() == message_name

        # exactly one entry is expected in the request
        params = message.get_parameters()
        assert len(params) == 1
        assert params["value"] == "ping"

        # the received parameters are reused as the reply content
        params["value"] = "pong"
        message.reply(params)
144
def test_3():
    """Round trip: the client sends "ping", the server answers "pong"."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    handler = ObjectTypeForTest3()
    server.register_object(object_name, handler)

    client = yami.Agent()

    request = {"value": "ping"}
    outgoing = client.send(target, object_name, message_name, request)

    outgoing.wait_for_transmission()

    # once transmitted, all bytes of the message have been pushed out
    _, sent_bytes, total_byte_count = outgoing.get_state()
    assert sent_bytes == total_byte_count

    outgoing.wait_for_completion()

    assert handler.got_message
    assert outgoing.get_state()[0] == yami.OutgoingMessage.REPLIED

    reply = outgoing.get_reply()
    assert reply["value"] == "pong"

    outgoing.close()
    client.close()
    server.close()
180
181
class ObjectTypeForTest4(object):
    """Message handler that rejects every message with "some reason"."""

    def __init__(self):
        # set once the handler has been invoked
        self.got_message = False

    def __call__(self, message):
        self.got_message = True

        # a message sent without content has to arrive with no parameters
        assert len(message.get_parameters()) == 0

        message.reject("some reason")
195
def test_4():
    """The server-side handler rejects the message explicitly."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    handler = ObjectTypeForTest4()
    server.register_object(object_name, handler)

    client = yami.Agent()

    # no content is attached to this message
    outgoing = client.send(target, object_name, message_name)
    outgoing.wait_for_completion()

    assert handler.got_message

    state = outgoing.get_state()[0]
    assert state == yami.OutgoingMessage.REJECTED
    assert outgoing.get_exception_msg() == "some reason"

    for resource in (outgoing, client, server):
        resource.close()
219
220
class ObjectTypeForTest5(object):
    """Message handler that always fails with an exception."""

    def __init__(self):
        # set once the handler has been invoked
        self.got_message = False

    def __call__(self, message):
        self.got_message = True

        # simulate a failure in user code
        failure = Exception("something bad happened")
        raise failure
230
def test_5():
    """An exception raised by the server-side handler rejects the message."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    handler = ObjectTypeForTest5()
    server.register_object(object_name, handler)

    client = yami.Agent()

    outgoing = client.send(target, object_name, message_name)
    outgoing.wait_for_completion()

    assert handler.got_message

    # the text of the exception is expected as the rejection reason
    state = outgoing.get_state()[0]
    assert state == yami.OutgoingMessage.REJECTED
    assert outgoing.get_exception_msg() == "something bad happened"

    for resource in (outgoing, client, server):
        resource.close()
254
255
num_of_messages_in_test6 = 10


class ObjectTypeForTest6(object):
    """Message handler that marks received ids and verifies the big value."""

    def __init__(self, big_string):
        # one "received" flag per expected message id
        self.got_message = [False] * num_of_messages_in_test6
        self.big_string = big_string

    def __call__(self, message):
        params = message.get_parameters()
        message_id = params["id"]
        self.got_message[message_id] = True

        # uncomment it to see how the messages get reordered
        #print("received message", message_id)

        # the big value has to arrive intact
        assert params["big"] == self.big_string

        message.reply()
279
def test_6():
    """Big messages posted with increasing priorities all get delivered."""

    # Note:
    # Every message gets a different priority, so the order of completion
    # may differ from the order of posting to the outgoing queue.
    # The first message has the lowest priority and the last one the
    # highest, so it is *very likely* that the server receives them
    # in the reversed order - this is not guaranteed though, as the speed
    # of posting and the speed of transmission are unrelated.

    big_string = "x" * 1000000

    server = yami.Agent()
    target = server.add_listener(local_listener)

    handler = ObjectTypeForTest6(big_string)
    server.register_object(object_name, handler)

    client = yami.Agent()

    payload = {"big": big_string}
    outgoing = []

    # post all messages; the loop index is both the id and the priority
    for message_id in range(num_of_messages_in_test6):
        payload["id"] = message_id
        outgoing.append(client.send(
            target, object_name, message_name, payload, message_id))

    # block until every message has completed
    for message in outgoing:
        message.wait_for_completion()

    # every id must have been seen by the handler
    for received, message in zip(handler.got_message, outgoing):
        assert received
        message.close()

    client.close()
    server.close()
330
331
class ObjectTypeForTest_7_8(object):
    """Message handler that notes the delivery and sends an empty reply."""

    def __init__(self):
        # set once the handler has been invoked
        self.got_message = False

    def __call__(self, message):
        # remember the delivery before replying
        self.got_message = True
        message.reply()
341
342
def test_7():
    """Message sent to a pair of working destinations reaches one of them."""

    server_1 = yami.Agent()
    address_1 = server_1.add_listener(local_listener)

    server_2 = yami.Agent()
    address_2 = server_2.add_listener(local_listener)

    handler_1 = ObjectTypeForTest_7_8()
    server_1.register_object(object_name, handler_1)

    handler_2 = ObjectTypeForTest_7_8()
    server_2.register_object(object_name, handler_2)

    client = yami.Agent()

    # both addresses go into a single failover group
    target = "".join(("failover:(", address_1, "|", address_2, ")"))

    outgoing = client.send(target, object_name, message_name)

    # since this is a load-balanced (and failover) target,
    # the message is implicitly waited for completion
    assert outgoing.get_state()[0] == yami.OutgoingMessage.REPLIED

    # exactly one of two servers got the message
    assert handler_1.got_message != handler_2.got_message

    outgoing.close()
    client.close()
    server_1.close()
    server_2.close()
379
380
def test_8():
    """Failover pair with one broken address still delivers the message."""

    # one member of the pair is a proper address,
    # the other one is certainly not working

    server = yami.Agent()
    working_target = server.add_listener(local_listener)

    handler = ObjectTypeForTest_7_8()
    server.register_object(object_name, handler)

    client = yami.Agent()

    broken_target = "tcp://nosuchhost:4"
    target = "".join(("failover:(", working_target, "|", broken_target, ")"))

    outgoing = client.send(target, object_name, message_name)

    # since this is a failover target,
    # the message is implicitly waited for completion
    assert outgoing.get_state()[0] == yami.OutgoingMessage.REPLIED

    # the message ended up at the working server
    assert handler.got_message

    outgoing.close()
    client.close()
    server.close()
414
415
def test_9():
    """Sending to a failover group with no members is an error."""

    agent = yami.Agent()

    try:
        agent.send_one_way("failover:()", object_name, message_name)
    except yami.YAMIError as error:
        assert str(error) == "Empty failover group is not allowed."
    else:
        # the send above was expected to raise
        assert False

    agent.close()
431
432
class EventCallbackTypeForTest11(object):
    """Connection event callback that logs event kinds into one string."""

    def __init__(self):
        # space-terminated labels, appended in the order of arrival
        self.events = ""

    def __call__(self, name, event):
        # every event other than the two "new connection" kinds
        # is logged as "closed"
        if event == yami.Agent.NEW_INCOMING_CONNECTION:
            label = "incoming "
        elif event == yami.Agent.NEW_OUTGOING_CONNECTION:
            label = "outgoing "
        else:
            label = "closed "

        self.events += label
445
def test_11():
    """Agents report connection events to their callbacks."""

    server_events = EventCallbackTypeForTest11()
    server = yami.Agent({}, server_events)

    client_events = EventCallbackTypeForTest11()
    client = yami.Agent({}, client_events)

    target = server.add_listener(local_listener)

    # nothing has been sent yet, so no connection exists
    assert server_events.events == ""
    assert client_events.events == ""

    outgoing = client.send(target, "no_such_object", "hello")
    outgoing.wait_for_completion()
    outgoing.close()

    # in Python connection events are not reported synchronously,
    # but reuse the dispatching threads - this means that
    # there might be some time between connection event getting
    # recognized at the low level and having it delivered to user code
    time.sleep(1)

    # a single connection has been opened
    assert server_events.events == "incoming "
    assert client_events.events == "outgoing "

    client.close_connection(target)

    # the "closed" event may or may not have been delivered yet -
    # it is a race for both the client and the server
    assert server_events.events in ("incoming ", "incoming closed ")
    assert client_events.events in ("outgoing ", "outgoing closed ")

    client.close()
    server.close()
486
487
def test_12():
    """Frame size border conditions - one message for each payload length."""

    server = yami.Agent()
    target = server.add_listener(local_listener)

    client = yami.Agent()

    # lengths 0 .. 9999; no object is registered,
    # so every message is expected to be rejected
    for length in range(10000):
        params = {"value": "x" * length}

        outgoing = client.send(target, "nosuchobject", "hello", params)
        outgoing.wait_for_completion()

        state = outgoing.get_state()[0]
        assert state == yami.OutgoingMessage.REJECTED

        outgoing.close()

    client.close()
    server.close()
512
513
# run all tests, in order, when the file is executed
# (there is no test_10)
for run_test in (
        test_1, test_2, test_2a, test_3, test_4, test_5,
        test_6, test_7, test_8, test_9, test_11, test_12):
    run_test()
526