1 /*
2     Copyright 2016 Garrett D'Amore <garrett@damore.org>
3 
4     Permission is hereby granted, free of charge, to any person obtaining a copy
5     of this software and associated documentation files (the "Software"),
6     to deal in the Software without restriction, including without limitation
7     the rights to use, copy, modify, merge, publish, distribute, sublicense,
8     and/or sell copies of the Software, and to permit persons to whom
9     the Software is furnished to do so, subject to the following conditions:
10 
11     The above copyright notice and this permission notice shall be included
12     in all copies or substantial portions of the Software.
13 
14     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
17     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20     IN THE SOFTWARE.
21 
22     "nanomsg" is a trademark of Martin Sustrik
23 */
24 
/*  This program serves as an example for how to write a simple PUB SUB
    service.  The server is just a single-threaded for loop which broadcasts
    messages to clients every so often.  The message is a binary format
28     message, containing two 32-bit unsigned integers.  The first is UNIX time,
29     and the second is the number of directly connected subscribers.
30 
31     The clients stay connected and print a message with this information
32     along with their process ID to standard output.
33 
34     To run this program, start the server as pubsub_demo <url> -s
35     Then connect to it with the client as pubsub_demo <url>
36     For example:
37 
38     % ./pubsub_demo tcp://127.0.0.1:5555 -s &
39     % ./pubsub_demo tcp://127.0.0.1:5555 &
40     % ./pubsub_demo tcp://127.0.0.1:5555 &
41     11:23:54 <pid 1254> There are 2 clients connected.
42     11:24:04 <pid 1255> There are 2 clients connected.
43     ..
44 */
45 
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <time.h>
50 #include <netinet/in.h>  /* For htonl and ntohl */
51 #include <unistd.h>
52 
53 #include <nanomsg/nn.h>
54 #include <nanomsg/pubsub.h>
55 
56 /*  The server runs forever. */
server(const char * url)57 int server(const char *url)
58 {
59     int fd;
60 
61     /*  Create the socket. */
62     fd = nn_socket (AF_SP, NN_PUB);
63     if (fd < 0) {
64         fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
65         return (-1);
66     }
67 
68     /*  Bind to the URL.  This will bind to the address and listen
69         synchronously; new clients will be accepted asynchronously
70         without further action from the calling program. */
71 
72     if (nn_bind (fd, url) < 0) {
73         fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
74         nn_close (fd);
75         return (-1);
76     }
77 
78     /*  Now we can just publish results.  Note that there is no explicit
79         accept required.  We just start writing the information. */
80 
81     for (;;) {
82         uint8_t msg[2 * sizeof (uint32_t)];
83         uint32_t secs, subs;
84         int rc;
85 
86         secs = (uint32_t) time (NULL);
87         subs = (uint32_t) nn_get_statistic (fd, NN_STAT_CURRENT_CONNECTIONS);
88 
89         secs = htonl (secs);
90         subs = htonl (subs);
91 
92         memcpy (msg, &secs, sizeof (secs));
93         memcpy (msg + sizeof (secs), &subs, sizeof (subs));
94 
95         rc = nn_send (fd, msg, sizeof (msg), 0);
96         if (rc < 0) {
97             /*  There are several legitimate reasons this can fail.
98                 We note them for debugging purposes, but then ignore
99                 otherwise. */
100             fprintf (stderr, "nn_send: %s (ignoring)\n",
101                 nn_strerror (nn_errno ()));
102         }
103         sleep(10);
104     }
105 
106     /* NOTREACHED */
107     nn_close (fd);
108     return (-1);
109 }
110 
111 /*  The client runs in a loop, displaying the content. */
client(const char * url)112 int client (const char *url)
113 {
114     int fd;
115     int rc;
116 
117     fd = nn_socket (AF_SP, NN_SUB);
118     if (fd < 0) {
119         fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
120         return (-1);
121     }
122 
123     if (nn_connect (fd, url) < 0) {
124         fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
125         nn_close (fd);
126         return (-1);
127     }
128 
129     /*  We want all messages, so just subscribe to the empty value. */
130     if (nn_setsockopt (fd, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
131         fprintf (stderr, "nn_setsockopt: %s\n", nn_strerror (nn_errno ()));
132         nn_close (fd);
133         return (-1);
134     }
135 
136     for (;;) {
137         uint8_t msg[2 * sizeof (uint32_t)];
138         char hhmmss[9];  /* HH:MM:SS\0 */
139         uint32_t subs, secs;
140         time_t t;
141 
142         rc = nn_recv (fd, msg, sizeof (msg), 0);
143         if (rc < 0) {
144             fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
145             break;
146         }
147         if (rc != sizeof (msg)) {
148             fprintf (stderr, "nn_recv: got %d bytes, wanted %d\n",
149                 rc, (int)sizeof (msg));
150              break;
151         }
152         memcpy (&secs, msg, sizeof (secs));
153         memcpy (&subs, msg + sizeof (secs), sizeof (subs));
154 
155         t = (time_t) ntohl(secs);
156         strftime (hhmmss, sizeof (hhmmss), "%T", localtime (&t));
157 
158         printf ("%s <pid %u> There are %u clients connected.\n", hhmmss,
159             (unsigned) getpid(), (unsigned) ntohl(subs));
160     }
161 
162     nn_close (fd);
163     return (-1);
164 }
165 
/*  Entry point.  "<url> -s" runs the server and "<url>" alone runs the
    client; any other argument list prints a usage line and exits with
    failure.  The exit status is success only if the chosen role
    returned 0. */
int main (int argc, char **argv)
{
    int want_server = (argc == 3) && (strcmp (argv[2], "-s") == 0);
    int want_client = !want_server && (argc == 2);
    int status;

    if (!want_server && !want_client) {
        fprintf (stderr, "Usage: %s <url> [-s]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    if (want_server) {
        status = server (argv[1]);
    } else {
        status = client (argv[1]);
    }

    if (status != 0) {
        exit (EXIT_FAILURE);
    }
    exit (EXIT_SUCCESS);
}
179