1 /*--------------------------------------------------------------------------
2 *
3 * test.c
4 * Test harness code for shared memory message queues.
5 *
6 * Copyright (c) 2013-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/test/modules/test_shm_mq/test.c
10 *
11 * -------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15
16 #include "fmgr.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19
20 #include "test_shm_mq.h"
21
22 PG_MODULE_MAGIC;
23
24 PG_FUNCTION_INFO_V1(test_shm_mq);
25 PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
26
27 void _PG_init(void);
28
29 static void verify_message(Size origlen, char *origdata, Size newlen,
30 char *newdata);
31
32 /*
33 * Simple test of the shared memory message queue infrastructure.
34 *
35 * We set up a ring of message queues passing through 1 or more background
36 * processes and eventually looping back to ourselves. We then send a message
37 * through the ring a number of times indicated by the loop count. At the end,
38 * we check whether the final message matches the one we started with.
39 */
40 Datum
test_shm_mq(PG_FUNCTION_ARGS)41 test_shm_mq(PG_FUNCTION_ARGS)
42 {
43 int64 queue_size = PG_GETARG_INT64(0);
44 text *message = PG_GETARG_TEXT_PP(1);
45 char *message_contents = VARDATA_ANY(message);
46 int message_size = VARSIZE_ANY_EXHDR(message);
47 int32 loop_count = PG_GETARG_INT32(2);
48 int32 nworkers = PG_GETARG_INT32(3);
49 dsm_segment *seg;
50 shm_mq_handle *outqh;
51 shm_mq_handle *inqh;
52 shm_mq_result res;
53 Size len;
54 void *data;
55
56 /* A negative loopcount is nonsensical. */
57 if (loop_count < 0)
58 ereport(ERROR,
59 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
60 errmsg("repeat count size must be an integer value greater than or equal to zero")));
61
62 /*
63 * Since this test sends data using the blocking interfaces, it cannot
64 * send data to itself. Therefore, a minimum of 1 worker is required. Of
65 * course, a negative worker count is nonsensical.
66 */
67 if (nworkers <= 0)
68 ereport(ERROR,
69 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 errmsg("number of workers must be an integer value greater than zero")));
71
72 /* Set up dynamic shared memory segment and background workers. */
73 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
74
75 /* Send the initial message. */
76 res = shm_mq_send(outqh, message_size, message_contents, false);
77 if (res != SHM_MQ_SUCCESS)
78 ereport(ERROR,
79 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
80 errmsg("could not send message")));
81
82 /*
83 * Receive a message and send it back out again. Do this a number of
84 * times equal to the loop count.
85 */
86 for (;;)
87 {
88 /* Receive a message. */
89 res = shm_mq_receive(inqh, &len, &data, false);
90 if (res != SHM_MQ_SUCCESS)
91 ereport(ERROR,
92 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
93 errmsg("could not receive message")));
94
95 /* If this is supposed to be the last iteration, stop here. */
96 if (--loop_count <= 0)
97 break;
98
99 /* Send it back out. */
100 res = shm_mq_send(outqh, len, data, false);
101 if (res != SHM_MQ_SUCCESS)
102 ereport(ERROR,
103 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
104 errmsg("could not send message")));
105 }
106
107 /*
108 * Finally, check that we got back the same message from the last
109 * iteration that we originally sent.
110 */
111 verify_message(message_size, message_contents, len, data);
112
113 /* Clean up. */
114 dsm_detach(seg);
115
116 PG_RETURN_VOID();
117 }
118
119 /*
120 * Pipelined test of the shared memory message queue infrastructure.
121 *
122 * As in the basic test, we set up a ring of message queues passing through
123 * 1 or more background processes and eventually looping back to ourselves.
124 * Then, we send N copies of the user-specified message through the ring and
125 * receive them all back. Since this might fill up all message queues in the
126 * ring and then stall, we must be prepared to begin receiving the messages
127 * back before we've finished sending them.
128 */
129 Datum
test_shm_mq_pipelined(PG_FUNCTION_ARGS)130 test_shm_mq_pipelined(PG_FUNCTION_ARGS)
131 {
132 int64 queue_size = PG_GETARG_INT64(0);
133 text *message = PG_GETARG_TEXT_PP(1);
134 char *message_contents = VARDATA_ANY(message);
135 int message_size = VARSIZE_ANY_EXHDR(message);
136 int32 loop_count = PG_GETARG_INT32(2);
137 int32 nworkers = PG_GETARG_INT32(3);
138 bool verify = PG_GETARG_BOOL(4);
139 int32 send_count = 0;
140 int32 receive_count = 0;
141 dsm_segment *seg;
142 shm_mq_handle *outqh;
143 shm_mq_handle *inqh;
144 shm_mq_result res;
145 Size len;
146 void *data;
147
148 /* A negative loopcount is nonsensical. */
149 if (loop_count < 0)
150 ereport(ERROR,
151 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
152 errmsg("repeat count size must be an integer value greater than or equal to zero")));
153
154 /*
155 * Using the nonblocking interfaces, we can even send data to ourselves,
156 * so the minimum number of workers for this test is zero.
157 */
158 if (nworkers < 0)
159 ereport(ERROR,
160 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161 errmsg("number of workers must be an integer value greater than or equal to zero")));
162
163 /* Set up dynamic shared memory segment and background workers. */
164 test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
165
166 /* Main loop. */
167 for (;;)
168 {
169 bool wait = true;
170
171 /*
172 * If we haven't yet sent the message the requisite number of times,
173 * try again to send it now. Note that when shm_mq_send() returns
174 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
175 * same message size and contents; that's not an issue here because
176 * we're sending the same message every time.
177 */
178 if (send_count < loop_count)
179 {
180 res = shm_mq_send(outqh, message_size, message_contents, true);
181 if (res == SHM_MQ_SUCCESS)
182 {
183 ++send_count;
184 wait = false;
185 }
186 else if (res == SHM_MQ_DETACHED)
187 ereport(ERROR,
188 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 errmsg("could not send message")));
190 }
191
192 /*
193 * If we haven't yet received the message the requisite number of
194 * times, try to receive it again now.
195 */
196 if (receive_count < loop_count)
197 {
198 res = shm_mq_receive(inqh, &len, &data, true);
199 if (res == SHM_MQ_SUCCESS)
200 {
201 ++receive_count;
202 /* Verifying every time is slow, so it's optional. */
203 if (verify)
204 verify_message(message_size, message_contents, len, data);
205 wait = false;
206 }
207 else if (res == SHM_MQ_DETACHED)
208 ereport(ERROR,
209 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 errmsg("could not receive message")));
211 }
212 else
213 {
214 /*
215 * Otherwise, we've received the message enough times. This
216 * shouldn't happen unless we've also sent it enough times.
217 */
218 if (send_count != receive_count)
219 ereport(ERROR,
220 (errcode(ERRCODE_INTERNAL_ERROR),
221 errmsg("message sent %d times, but received %d times",
222 send_count, receive_count)));
223 break;
224 }
225
226 if (wait)
227 {
228 /*
229 * If we made no progress, wait for one of the other processes to
230 * which we are connected to set our latch, indicating that they
231 * have read or written data and therefore there may now be work
232 * for us to do.
233 */
234 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
235 PG_WAIT_EXTENSION);
236 ResetLatch(MyLatch);
237 CHECK_FOR_INTERRUPTS();
238 }
239 }
240
241 /* Clean up. */
242 dsm_detach(seg);
243
244 PG_RETURN_VOID();
245 }
246
247 /*
248 * Verify that two messages are the same.
249 */
250 static void
verify_message(Size origlen,char * origdata,Size newlen,char * newdata)251 verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
252 {
253 Size i;
254
255 if (origlen != newlen)
256 ereport(ERROR,
257 (errmsg("message corrupted"),
258 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
259 origlen, newlen)));
260
261 for (i = 0; i < origlen; ++i)
262 if (origdata[i] != newdata[i])
263 ereport(ERROR,
264 (errmsg("message corrupted"),
265 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
266 }
267