1 /*
2 Copyright 2016 Garrett D'Amore <garrett@damore.org>
3
4 Permission is hereby granted, free of charge, to any person obtaining a copy
5 of this software and associated documentation files (the "Software"),
6 to deal in the Software without restriction, including without limitation
7 the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 and/or sell copies of the Software, and to permit persons to whom
9 the Software is furnished to do so, subject to the following conditions:
10
11 The above copyright notice and this permission notice shall be included
12 in all copies or substantial portions of the Software.
13
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
17 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 IN THE SOFTWARE.
21
22 "nanomsg" is a trademark of Martin Sustrik
23 */
24
25 /* This program serves as an example for how to write a simple PUB SUB
26 service, The server is just a single threaded for loop which broadcasts
27 messages to clients, every so often. The message is a binary format
28 message, containing two 32-bit unsigned integers. The first is UNIX time,
29 and the second is the number of directly connected subscribers.
30
31 The clients stay connected and print a message with this information
32 along with their process ID to standard output.
33
34 To run this program, start the server as pubsub_demo <url> -s
35 Then connect to it with the client as pubsub_demo <url>
36 For example:
37
38 % ./pubsub_demo tcp://127.0.0.1:5555 -s &
39 % ./pubsub_demo tcp://127.0.0.1:5555 &
40 % ./pubsub_demo tcp://127.0.0.1:5555 &
41 11:23:54 <pid 1254> There are 2 clients connected.
42 11:24:04 <pid 1255> There are 2 clients connected.
43 ..
44 */
45
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <time.h>
50 #include <netinet/in.h> /* For htonl and ntohl */
51 #include <unistd.h>
52
53 #include <nanomsg/nn.h>
54 #include <nanomsg/pubsub.h>
55
56 /* The server runs forever. */
server(const char * url)57 int server(const char *url)
58 {
59 int fd;
60
61 /* Create the socket. */
62 fd = nn_socket (AF_SP, NN_PUB);
63 if (fd < 0) {
64 fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
65 return (-1);
66 }
67
68 /* Bind to the URL. This will bind to the address and listen
69 synchronously; new clients will be accepted asynchronously
70 without further action from the calling program. */
71
72 if (nn_bind (fd, url) < 0) {
73 fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
74 nn_close (fd);
75 return (-1);
76 }
77
78 /* Now we can just publish results. Note that there is no explicit
79 accept required. We just start writing the information. */
80
81 for (;;) {
82 uint8_t msg[2 * sizeof (uint32_t)];
83 uint32_t secs, subs;
84 int rc;
85
86 secs = (uint32_t) time (NULL);
87 subs = (uint32_t) nn_get_statistic (fd, NN_STAT_CURRENT_CONNECTIONS);
88
89 secs = htonl (secs);
90 subs = htonl (subs);
91
92 memcpy (msg, &secs, sizeof (secs));
93 memcpy (msg + sizeof (secs), &subs, sizeof (subs));
94
95 rc = nn_send (fd, msg, sizeof (msg), 0);
96 if (rc < 0) {
97 /* There are several legitimate reasons this can fail.
98 We note them for debugging purposes, but then ignore
99 otherwise. */
100 fprintf (stderr, "nn_send: %s (ignoring)\n",
101 nn_strerror (nn_errno ()));
102 }
103 sleep(10);
104 }
105
106 /* NOTREACHED */
107 nn_close (fd);
108 return (-1);
109 }
110
111 /* The client runs in a loop, displaying the content. */
client(const char * url)112 int client (const char *url)
113 {
114 int fd;
115 int rc;
116
117 fd = nn_socket (AF_SP, NN_SUB);
118 if (fd < 0) {
119 fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
120 return (-1);
121 }
122
123 if (nn_connect (fd, url) < 0) {
124 fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
125 nn_close (fd);
126 return (-1);
127 }
128
129 /* We want all messages, so just subscribe to the empty value. */
130 if (nn_setsockopt (fd, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
131 fprintf (stderr, "nn_setsockopt: %s\n", nn_strerror (nn_errno ()));
132 nn_close (fd);
133 return (-1);
134 }
135
136 for (;;) {
137 uint8_t msg[2 * sizeof (uint32_t)];
138 char hhmmss[9]; /* HH:MM:SS\0 */
139 uint32_t subs, secs;
140 time_t t;
141
142 rc = nn_recv (fd, msg, sizeof (msg), 0);
143 if (rc < 0) {
144 fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
145 break;
146 }
147 if (rc != sizeof (msg)) {
148 fprintf (stderr, "nn_recv: got %d bytes, wanted %d\n",
149 rc, (int)sizeof (msg));
150 break;
151 }
152 memcpy (&secs, msg, sizeof (secs));
153 memcpy (&subs, msg + sizeof (secs), sizeof (subs));
154
155 t = (time_t) ntohl(secs);
156 strftime (hhmmss, sizeof (hhmmss), "%T", localtime (&t));
157
158 printf ("%s <pid %u> There are %u clients connected.\n", hhmmss,
159 (unsigned) getpid(), (unsigned) ntohl(subs));
160 }
161
162 nn_close (fd);
163 return (-1);
164 }
165
main(int argc,char ** argv)166 int main (int argc, char **argv)
167 {
168 int rc;
169 if ((argc == 3) && (strcmp (argv[2], "-s") == 0)) {
170 rc = server (argv[1]);
171 } else if (argc == 2) {
172 rc = client (argv[1]);
173 } else {
174 fprintf (stderr, "Usage: %s <url> [-s]\n", argv[0]);
175 exit (EXIT_FAILURE);
176 }
177 exit (rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
178 }
179