1 /*  =========================================================================
2     zgossip - decentralized configuration management
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     Implements a gossip protocol for decentralized configuration management.
    Your application nodes form a loosely connected network (which can have
18     cycles), and publish name/value tuples. Each node re-distributes the new
19     tuples it receives, so that the entire network eventually achieves a
20     consistent state. The current design does not expire tuples.
21 
22     Provides these commands (sent as multipart strings to the actor):
23 
24     * BIND endpoint -- binds the gossip service to specified endpoint
25     * PORT -- returns the last TCP port, if any, used for binding
26     * LOAD configfile -- load configuration from specified file
27     * SET configpath value -- set configuration path = value
28     * SAVE configfile -- save configuration to specified file
29     * CONNECT endpoint -- connect the gossip service to the specified peer
30     * PUBLISH key value -- publish a key/value pair to the gossip cluster
31     * STATUS -- return number of key/value pairs held by gossip service
    * ZAP DOMAIN domain -- set the ZAP domain to the given value (draft API)
33 
34     Returns these messages:
35 
36     * PORT number -- reply to PORT command
37     * STATUS number -- reply to STATUS command
38     * DELIVER key value -- new tuple delivered from network
39 @discuss
40     The gossip protocol distributes information around a loosely-connected
41     network of gossip services. The information consists of name/value pairs
42     published by applications at any point in the network. The goal of the
43     gossip protocol is to create eventual consistency between all the using
44     applications.
45 
46     The name/value pairs (tuples) can be used for configuration data, for
47     status updates, for presence, or for discovery. When used for discovery,
48     the gossip protocol works as an alternative to e.g. UDP beaconing.
49 
50     The gossip network consists of a set of loosely-coupled nodes that
51     exchange tuples. Nodes can be connected across arbitrary transports,
52     so the gossip network can have nodes that communicate over inproc,
53     over IPC, and/or over TCP, at the same time.
54 
55     Each node runs the same stack, which is a server-client hybrid using
56     a modified Harmony pattern (from Chapter 8 of the Guide):
57     http://zguide.zeromq.org/page:all#True-Peer-Connectivity-Harmony-Pattern
58 
    Each node provides a ROUTER socket that accepts client connections on an
    endpoint defined by the application via a BIND command. The state machine
61     for these connections is in zgossip.xml, and the generated code is in
62     zgossip_engine.inc.
63 
64     Each node additionally creates outbound connections via DEALER sockets
65     to a set of servers ("remotes"), and under control of the calling app,
66     which sends CONNECT commands for each configured remote.
67 
68     The messages between client and server are defined in zgossip_msg.xml.
69     We built this stack using the zeromq/zproto toolkit.
70 
71     To join the gossip network, a node connects to one or more peers. Each
72     peer acts as a forwarder. This loosely-coupled network can scale to
73     thousands of nodes. However the gossip protocol is NOT designed to be
74     efficient, and should not be used for application data, as the same
75     tuples may be sent many times across the network.
76 
77     The basic logic of the gossip service is to accept PUBLISH messages
78     from its owning application, and to forward these to every remote, and
79     every client it talks to. When a node gets a duplicate tuple, it throws
80     it away. When a node gets a new tuple, it stores it, and forwards it as
81     just described.
82 
83     At present there is no way to expire tuples from the network.
84 
85     The assumptions in this design are:
86 
87     * The data set is slow-changing. Thus, the cost of the gossip protocol
88       is irrelevant with respect to other traffic.
89 @end
90 */
91 
92 #include "czmq_classes.h"
93 
94 // TODO- project.xml?
95 #ifdef CZMQ_BUILD_DRAFT_API
96 #define CZMQ_ZGOSSIP_ZAP_DOMAIN "global"
97 #endif
98 
99 
100 //  ---------------------------------------------------------------------
//  Forward declarations for the structures we use here
102 
103 typedef struct _server_t server_t;
104 typedef struct _client_t client_t;
105 typedef struct _tuple_t tuple_t;
106 
107 //  ---------------------------------------------------------------------
108 //  This structure defines the context for each running server. Store
109 //  whatever properties and structures you need for the server.
110 
111 struct _server_t {
112     //  These properties must always be present in the server_t
113     //  and are set by the generated engine; do not modify them!
114     zsock_t *pipe;              //  Actor pipe back to caller
115     zconfig_t *config;          //  Current loaded configuration
116 
117     //  Add any properties you need here
118     zlistx_t *remotes;          //  Parents, as zsock_t instances
119     zhashx_t *tuples;           //  Tuples, indexed by key
120 
121     tuple_t *cur_tuple;         //  Holds current tuple to publish
122     zgossip_msg_t *message;     //  Message to broadcast
123 
124     char *public_key;
125     char *secret_key;
126 #ifdef CZMQ_BUILD_DRAFT_API
127     char *zap_domain;
128 #endif
129 };
130 
131 //  ---------------------------------------------------------------------
132 //  This structure defines the state for each client connection. It will
133 //  be passed to each action in the 'self' argument.
134 
135 struct _client_t {
136     //  These properties must always be present in the client_t
137     //  and are set by the generated engine; do not modify them!
138     server_t *server;           //  Reference to parent server
139     zgossip_msg_t *message;     //  Message in and out
140 };
141 
142 
143 //  ---------------------------------------------------------------------
144 //  This structure defines one tuple that we track
145 
146 struct _tuple_t {
147     zhashx_t *container;         //  Hash table that holds this item
148     char *key;                  //  Tuple key
149     char *value;                //  Tuple value
150 };
151 
152 //  Callback when we remove a tuple from its container
153 
154 static void
tuple_free(void * argument)155 tuple_free (void *argument)
156 {
157     tuple_t *self = (tuple_t *) argument;
158     freen (self->key);
159     freen (self->value);
160     freen (self);
161 }
162 
163 //  Handle traffic from remotes
164 static int
165 remote_handler (zloop_t *loop, zsock_t *remote, void *argument);
166 
167 //  ---------------------------------------------------------------------
168 //  Include the generated server engine
169 
170 #include "zgossip_engine.inc"
171 
172 //  Allocate properties and structures for a new server instance.
173 //  Return 0 if OK, or -1 if there was an error.
174 
175 static int
server_initialize(server_t * self)176 server_initialize (server_t *self)
177 {
178     //  Default timeout for clients is one second; the caller can
179     //  override this with a SET message.
180     engine_configure (self, "server/timeout", "1000");
181     self->message = zgossip_msg_new ();
182 
183     self->remotes = zlistx_new ();
184     assert (self->remotes);
185     zlistx_set_destructor (self->remotes, (czmq_destructor *) zsock_destroy);
186 
187     self->tuples = zhashx_new ();
188     assert (self->tuples);
189 
190 #ifdef CZMQ_BUILD_DRAFT_API
191     self->zap_domain = strdup(CZMQ_ZGOSSIP_ZAP_DOMAIN);
192 #endif
193     return 0;
194 }
195 
196 //  Free properties and structures for a server instance
197 
198 static void
server_terminate(server_t * self)199 server_terminate (server_t *self)
200 {
201     zgossip_msg_destroy (&self->message);
202     zlistx_destroy (&self->remotes);
203     zhashx_destroy (&self->tuples);
204     zstr_free (&self->public_key);
205     zstr_free (&self->secret_key);
206 #ifdef CZMQ_BUILD_DRAFT_API
207     zstr_free (&self->zap_domain);
208 #endif
209 }
210 
211 //  Connect to a remote server
212 static void
213 #ifdef CZMQ_BUILD_DRAFT_API
server_connect(server_t * self,const char * endpoint,const char * public_key)214 server_connect (server_t *self, const char *endpoint, const char *public_key)
215 #else
216 server_connect (server_t *self, const char *endpoint)
217 #endif
218 {
219     zsock_t *remote = zsock_new (ZMQ_DEALER);
220     assert (remote);          //  No recovery if exhausted
221 
222 #ifdef CZMQ_BUILD_DRAFT_API
223     if (public_key){
224         zcert_t *cert = zcert_new_from_txt (self->public_key, self->secret_key);
225         zcert_apply(cert, remote);
226         zsock_set_curve_serverkey (remote, public_key);
227 #ifndef ZMQ_CURVE
228         // legacy ZMQ support
229         // inline incase the underlying assert is removed
230         bool ZMQ_CURVE = false;
231 #endif
232         assert (zsock_mechanism (remote) == ZMQ_CURVE);
233         zcert_destroy(&cert);
234     }
235 #endif
236     //  Never block on sending; we use an infinite HWM and buffer as many
237     //  messages as needed in outgoing pipes. Note that the maximum number
238     //  is the overall tuple set size.
239     zsock_set_unbounded (remote);
240 
241     if (zsock_connect (remote, "%s", endpoint)) {
242         zsys_warning ("bad zgossip endpoint '%s'", endpoint);
243         zsock_destroy (&remote);
244         return;
245     }
246     //  Send HELLO and then PUBLISH for each tuple we have
247     zgossip_msg_t *gossip = zgossip_msg_new ();
248     zgossip_msg_set_id (gossip, ZGOSSIP_MSG_HELLO);
249     zgossip_msg_send (gossip, remote);
250 
251     tuple_t *tuple = (tuple_t *) zhashx_first (self->tuples);
252     while (tuple) {
253         zgossip_msg_set_id (gossip, ZGOSSIP_MSG_PUBLISH);
254         zgossip_msg_set_key (gossip, tuple->key);
255         zgossip_msg_set_value (gossip, tuple->value);
256         zgossip_msg_send (gossip, remote);
257         tuple = (tuple_t *) zhashx_next (self->tuples);
258     }
259     //  Now monitor this remote for incoming messages
260     zgossip_msg_destroy (&gossip);
261     engine_handle_socket (self, remote, remote_handler);
262     zlistx_add_end (self->remotes, remote);
263 }
264 
265 
266 //  Process an incoming tuple on this server.
267 
268 static void
server_accept(server_t * self,const char * key,const char * value)269 server_accept (server_t *self, const char *key, const char *value)
270 {
271     tuple_t *tuple = (tuple_t *) zhashx_lookup (self->tuples, key);
272     if (tuple && streq (tuple->value, value))
273         return;                 //  Duplicate tuple, do nothing
274 
275     //  Create new tuple
276     tuple = (tuple_t *) zmalloc (sizeof (tuple_t));
277     assert (tuple);
278     tuple->container = self->tuples;
279     tuple->key = strdup (key);
280     tuple->value = strdup (value);
281 
282     //  Store new tuple
283     zhashx_update (tuple->container, key, tuple);
284     zhashx_freefn (tuple->container, key, tuple_free);
285 
286     //  Deliver to calling application
287     zstr_sendx (self->pipe, "DELIVER", key, value, NULL);
288 
289     //  Hold in server context so we can broadcast to all clients
290     self->cur_tuple = tuple;
291     engine_broadcast_event (self, NULL, forward_event);
292 
293     //  Copy new tuple announcement to all remotes
294     zgossip_msg_t *gossip = zgossip_msg_new ();
295     zgossip_msg_set_id (gossip, ZGOSSIP_MSG_PUBLISH);
296     zsock_t *remote = (zsock_t *) zlistx_first (self->remotes);
297     while (remote) {
298         zgossip_msg_set_key (gossip, tuple->key);
299         zgossip_msg_set_value (gossip, tuple->value);
300         zgossip_msg_send (gossip, remote);
301         remote = (zsock_t *) zlistx_next (self->remotes);
302     }
303     zgossip_msg_destroy (&gossip);
304 }
305 
306 //  Process server API method, return reply message if any
307 
308 static zmsg_t *
server_method(server_t * self,const char * method,zmsg_t * msg)309 server_method (server_t *self, const char *method, zmsg_t *msg)
310 {
311     //  Connect to a remote
312     zmsg_t *reply = NULL;
313     if (streq (method, "CONNECT")) {
314         char *endpoint = zmsg_popstr (msg);
315         assert (endpoint);
316 #ifdef CZMQ_BUILD_DRAFT_API
317         // leaving this in here for now because if/def changes the server_connect
318         // function args. it doesn't look like server_connect is used anywhere else
319         // but want to leave this in until we're sure this is stable..
320         char *public_key = zmsg_popstr (msg);
321         server_connect (self, endpoint, public_key);
322         zstr_free (&public_key);
323 #else
324         server_connect (self, endpoint);
325 #endif
326         zstr_free (&endpoint);
327     }
328     else
329     if (streq (method, "PUBLISH")) {
330         char *key = zmsg_popstr (msg);
331         char *value = zmsg_popstr (msg);
332         server_accept (self, key, value);
333         zstr_free (&key);
334         zstr_free (&value);
335     }
336     else
337     if (streq (method, "STATUS")) {
338         //  Return number of tuples we have stored
339         reply = zmsg_new ();
340         assert (reply);
341         zmsg_addstr (reply, "STATUS");
342         zmsg_addstrf (reply, "%d", (int) zhashx_size (self->tuples));
343     }
344 #ifdef CZMQ_BUILD_DRAFT_API
345     else
346     if (streq (method, "SET PUBLICKEY")) {
347         char *key = zmsg_popstr (msg);
348         self->public_key = strdup (key);
349         assert (self->public_key);
350         zstr_free (&key);
351     }
352     else
353     if (streq (method, "SET SECRETKEY")) {
354         char *key = zmsg_popstr (msg);
355         self->secret_key = strdup(key);
356         assert (self->secret_key);
357         zstr_free (&key);
358     }
359     else
360     if (streq (method, "ZAP DOMAIN")) {
361         char *value = zmsg_popstr (msg);
362         zstr_free(&self->zap_domain);
363         self->zap_domain = strdup(value);
364         assert (self->zap_domain);
365         zstr_free (&value);
366     }
367 #endif
368     else
369         zsys_error ("unknown zgossip method '%s'", method);
370 
371     return reply;
372 }
373 
374 //  Apply new configuration.
375 
376 static void
server_configuration(server_t * self,zconfig_t * config)377 server_configuration (server_t *self, zconfig_t *config)
378 {
379     ZPROTO_UNUSED(self);
380     ZPROTO_UNUSED(config);
381     //  Apply new configuration
382 }
383 
384 //  Allocate properties and structures for a new client connection and
385 //  optionally engine_set_next_event (). Return 0 if OK, or -1 on error.
386 
387 static int
client_initialize(client_t * self)388 client_initialize (client_t *self)
389 {
390     //  Construct properties here
391     return 0;
392 }
393 
394 //  Free properties and structures for a client connection
395 
396 static void
client_terminate(client_t * self)397 client_terminate (client_t *self)
398 {
399     //  Destroy properties here
400 }
401 
402 
403 //  --------------------------------------------------------------------------
404 //  get_first_tuple
405 //
406 
407 static void
get_first_tuple(client_t * self)408 get_first_tuple (client_t *self)
409 {
410     tuple_t *tuple = (tuple_t *) zhashx_first (self->server->tuples);
411     if (tuple) {
412         zgossip_msg_set_key (self->message, tuple->key);
413         zgossip_msg_set_value (self->message, tuple->value);
414         engine_set_next_event (self, ok_event);
415     }
416     else
417         engine_set_next_event (self, finished_event);
418 }
419 
420 
421 //  --------------------------------------------------------------------------
422 //  get_next_tuple
423 //
424 
425 static void
get_next_tuple(client_t * self)426 get_next_tuple (client_t *self)
427 {
428     tuple_t *tuple = (tuple_t *) zhashx_next (self->server->tuples);
429     if (tuple) {
430         zgossip_msg_set_key (self->message, tuple->key);
431         zgossip_msg_set_value (self->message, tuple->value);
432         engine_set_next_event (self, ok_event);
433     }
434     else
435         engine_set_next_event (self, finished_event);
436 }
437 
438 
439 //  --------------------------------------------------------------------------
440 //  store_tuple_if_new
441 //
442 
443 static void
store_tuple_if_new(client_t * self)444 store_tuple_if_new (client_t *self)
445 {
446     server_accept (self->server,
447                    zgossip_msg_key (self->message),
448                    zgossip_msg_value (self->message));
449 }
450 
451 
452 //  --------------------------------------------------------------------------
453 //  get_tuple_to_forward
454 //
455 
456 static void
get_tuple_to_forward(client_t * self)457 get_tuple_to_forward (client_t *self)
458 {
459     //  Hold this in server->cur_tuple so it's available to all
460     //  clients; the whole broadcast operation happens in one thread
461     //  so there's no risk of confusion here.
462     tuple_t *tuple = self->server->cur_tuple;
463     zgossip_msg_set_key (self->message, tuple->key);
464     zgossip_msg_set_value (self->message, tuple->value);
465 }
466 
467 
468 //  --------------------------------------------------------------------------
469 //  Handle messages coming from remotes
470 
471 static int
remote_handler(zloop_t * loop,zsock_t * remote,void * argument)472 remote_handler (zloop_t *loop, zsock_t *remote, void *argument)
473 {
474     server_t *self = (server_t *) argument;
475     if (zgossip_msg_recv (self->message, remote))
476         return -1;          //  Interrupted
477 
478     if (zgossip_msg_id (self->message) == ZGOSSIP_MSG_PUBLISH)
479         server_accept (self,
480                        zgossip_msg_key (self->message),
481                        zgossip_msg_value (self->message));
482     else
483     if (zgossip_msg_id (self->message) == ZGOSSIP_MSG_INVALID) {
484         //  Connection was reset, so send HELLO again
485         zgossip_msg_set_id (self->message, ZGOSSIP_MSG_HELLO);
486         zgossip_msg_send (self->message, remote);
487     }
488     else
489     if (zgossip_msg_id (self->message) == ZGOSSIP_MSG_PONG)
490         assert (true);   //  Do nothing with PONGs
491 
492     return 0;
493 }
494 
495 
496 //  --------------------------------------------------------------------------
497 //  Selftest
498 
499 void
zgossip_test(bool verbose)500 zgossip_test (bool verbose)
501 {
502     printf (" * zgossip: ");
503     if (verbose)
504         printf ("\n");
505 
506     //  @selftest
507     //  Test basic client-to-server operation of the protocol
508     zactor_t *server = zactor_new (zgossip, "server");
509     assert (server);
510     if (verbose)
511         zstr_send (server, "VERBOSE");
512     zstr_sendx (server, "BIND", "inproc://zgossip", NULL);
513 
514     zsock_t *client = zsock_new (ZMQ_DEALER);
515     assert (client);
516     zsock_set_rcvtimeo (client, 2000);
517     int rc = zsock_connect (client, "inproc://zgossip");
518     assert (rc == 0);
519 
520     //  Send HELLO, which gets no message
521     zgossip_msg_t *message = zgossip_msg_new ();
522     zgossip_msg_set_id (message, ZGOSSIP_MSG_HELLO);
523     zgossip_msg_send (message, client);
524 
525     //  Send PING, expect PONG back
526     zgossip_msg_set_id (message, ZGOSSIP_MSG_PING);
527     zgossip_msg_send (message, client);
528     zgossip_msg_recv (message, client);
529     assert (zgossip_msg_id (message) == ZGOSSIP_MSG_PONG);
530     zgossip_msg_destroy (&message);
531 
532     zactor_destroy (&server);
533     zsock_destroy (&client);
534 
535     //  Test peer-to-peer operations
536     zactor_t *base = zactor_new (zgossip, "base");
537     assert (base);
538     if (verbose)
539         zstr_send (base, "VERBOSE");
540     //  Set a 100msec timeout on clients so we can test expiry
541     zstr_sendx (base, "SET", "server/timeout", "100", NULL);
542     zstr_sendx (base, "BIND", "inproc://base", NULL);
543 
544     zactor_t *alpha = zactor_new (zgossip, "alpha");
545     assert (alpha);
546     zstr_sendx (alpha, "CONNECT", "inproc://base", NULL);
547     zstr_sendx (alpha, "PUBLISH", "inproc://alpha-1", "service1", NULL);
548     zstr_sendx (alpha, "PUBLISH", "inproc://alpha-2", "service2", NULL);
549 
550     zactor_t *beta = zactor_new (zgossip, "beta");
551     assert (beta);
552     zstr_sendx (beta, "CONNECT", "inproc://base", NULL);
553     zstr_sendx (beta, "PUBLISH", "inproc://beta-1", "service1", NULL);
554     zstr_sendx (beta, "PUBLISH", "inproc://beta-2", "service2", NULL);
555 
556     //  got nothing
557     zclock_sleep (200);
558 
559     zstr_send (alpha, "STATUS");
560     char *command, *status, *key, *value;
561 
562     zstr_recvx (alpha, &command, &key, &value, NULL);
563     assert (streq (command, "DELIVER"));
564     assert (streq (key, "inproc://alpha-1"));
565     assert (streq (value, "service1"));
566     zstr_free (&command);
567     zstr_free (&key);
568     zstr_free (&value);
569 
570     zstr_recvx (alpha, &command, &key, &value, NULL);
571     assert (streq (command, "DELIVER"));
572     assert (streq (key, "inproc://alpha-2"));
573     assert (streq (value, "service2"));
574     zstr_free (&command);
575     zstr_free (&key);
576     zstr_free (&value);
577 
578     zstr_recvx (alpha, &command, &key, &value, NULL);
579     assert (streq (command, "DELIVER"));
580     assert (streq (key, "inproc://beta-1"));
581     assert (streq (value, "service1"));
582     zstr_free (&command);
583     zstr_free (&key);
584     zstr_free (&value);
585 
586     zstr_recvx (alpha, &command, &key, &value, NULL);
587     assert (streq (command, "DELIVER"));
588     assert (streq (key, "inproc://beta-2"));
589     assert (streq (value, "service2"));
590     zstr_free (&command);
591     zstr_free (&key);
592     zstr_free (&value);
593 
594     zstr_recvx (alpha, &command, &status, NULL);
595     assert (streq (command, "STATUS"));
596     assert (atoi (status) == 4);
597     zstr_free (&command);
598     zstr_free (&status);
599 
600     zactor_destroy (&base);
601     zactor_destroy (&alpha);
602     zactor_destroy (&beta);
603 
604 #ifdef CZMQ_BUILD_DRAFT_API
605     // curve
606     if (zsys_has_curve()) {
607         if (verbose)
608             printf("testing CURVE support");
609         zclock_sleep (2000);
610         zactor_t *auth = zactor_new(zauth, NULL);
611         assert (auth);
612         if (verbose) {
613             zstr_sendx (auth, "VERBOSE", NULL);
614             zsock_wait (auth);
615         }
616         zstr_sendx(auth,"ALLOW","127.0.0.1",NULL);
617         zsock_wait(auth);
618         zstr_sendx (auth, "CURVE", CURVE_ALLOW_ANY, NULL);
619         zsock_wait (auth);
620 
621         server = zactor_new (zgossip, "server");
622         if (verbose)
623             zstr_send (server, "VERBOSE");
624         assert (server);
625 
626         zcert_t *client1_cert = zcert_new ();
627         zcert_t *server_cert = zcert_new ();
628 
629         zstr_sendx (server, "SET PUBLICKEY", zcert_public_txt (server_cert), NULL);
630         zstr_sendx (server, "SET SECRETKEY", zcert_secret_txt (server_cert), NULL);
631         zstr_sendx (server, "ZAP DOMAIN", "TEST", NULL);
632 
633         zstr_sendx (server, "BIND", "tcp://127.0.0.1:*", NULL);
634         zstr_sendx (server, "PORT", NULL);
635         zstr_recvx (server, &command, &value, NULL);
636         assert (streq (command, "PORT"));
637         int port = atoi (value);
638         zstr_free (&command);
639         zstr_free (&value);
640         char endpoint [32];
641         sprintf (endpoint, "tcp://127.0.0.1:%d", port);
642 
643         zactor_t *client1 = zactor_new (zgossip, "client");
644         if (verbose)
645             zstr_send (client1, "VERBOSE");
646         assert (client1);
647 
648         zstr_sendx (client1, "SET PUBLICKEY", zcert_public_txt (client1_cert), NULL);
649         zstr_sendx (client1, "SET SECRETKEY", zcert_secret_txt (client1_cert), NULL);
650         zstr_sendx (client1, "ZAP DOMAIN", "TEST", NULL);
651 
652         const char *public_txt = zcert_public_txt (server_cert);
653         zstr_sendx (client1, "CONNECT", endpoint, public_txt, NULL);
654         zstr_sendx (client1, "PUBLISH", "tcp://127.0.0.1:9001", "service1", NULL);
655 
656         zclock_sleep (500);
657 
658         zstr_send (server, "STATUS");
659         zclock_sleep (500);
660 
661         zstr_recvx (server, &command, &key, &value, NULL);
662         assert (streq (command, "DELIVER"));
663         assert (streq (value, "service1"));
664 
665         zstr_free (&command);
666         zstr_free (&key);
667         zstr_free (&value);
668 
669         zstr_sendx (client1, "$TERM", NULL);
670         zstr_sendx (server, "$TERM", NULL);
671 
672         zclock_sleep(500);
673 
674         zcert_destroy (&client1_cert);
675         zcert_destroy (&server_cert);
676 
677         zactor_destroy (&client1);
678         zactor_destroy (&server);
679         zactor_destroy (&auth);
680     }
681 #endif
682 
683 #if defined (__WINDOWS__)
684     zsys_shutdown();
685 #endif
686 
687     //  @end
688     printf ("OK\n");
689 }
690