1 /*--------------------------------------------------------------------------
2  *
3  * test.c
4  *		Test harness code for shared memory message queues.
5  *
6  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/test/modules/test_shm_mq/test.c
10  *
11  * -------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "fmgr.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19 
20 #include "test_shm_mq.h"
21 
22 PG_MODULE_MAGIC;
23 
24 PG_FUNCTION_INFO_V1(test_shm_mq);
25 PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
26 
27 void		_PG_init(void);
28 
29 static void verify_message(Size origlen, char *origdata, Size newlen,
30 			   char *newdata);
31 
32 /*
33  * Simple test of the shared memory message queue infrastructure.
34  *
35  * We set up a ring of message queues passing through 1 or more background
36  * processes and eventually looping back to ourselves.  We then send a message
37  * through the ring a number of times indicated by the loop count.  At the end,
38  * we check whether the final message matches the one we started with.
39  */
40 Datum
test_shm_mq(PG_FUNCTION_ARGS)41 test_shm_mq(PG_FUNCTION_ARGS)
42 {
43 	int64		queue_size = PG_GETARG_INT64(0);
44 	text	   *message = PG_GETARG_TEXT_PP(1);
45 	char	   *message_contents = VARDATA_ANY(message);
46 	int			message_size = VARSIZE_ANY_EXHDR(message);
47 	int32		loop_count = PG_GETARG_INT32(2);
48 	int32		nworkers = PG_GETARG_INT32(3);
49 	dsm_segment *seg;
50 	shm_mq_handle *outqh;
51 	shm_mq_handle *inqh;
52 	shm_mq_result res;
53 	Size		len;
54 	void	   *data;
55 
56 	/* A negative loopcount is nonsensical. */
57 	if (loop_count < 0)
58 		ereport(ERROR,
59 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
60 				 errmsg("repeat count size must be an integer value greater than or equal to zero")));
61 
62 	/*
63 	 * Since this test sends data using the blocking interfaces, it cannot
64 	 * send data to itself.  Therefore, a minimum of 1 worker is required. Of
65 	 * course, a negative worker count is nonsensical.
66 	 */
67 	if (nworkers <= 0)
68 		ereport(ERROR,
69 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 				 errmsg("number of workers must be an integer value greater than zero")));
71 
72 	/* Set up dynamic shared memory segment and background workers. */
73 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
74 
75 	/* Send the initial message. */
76 	res = shm_mq_send(outqh, message_size, message_contents, false);
77 	if (res != SHM_MQ_SUCCESS)
78 		ereport(ERROR,
79 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
80 				 errmsg("could not send message")));
81 
82 	/*
83 	 * Receive a message and send it back out again.  Do this a number of
84 	 * times equal to the loop count.
85 	 */
86 	for (;;)
87 	{
88 		/* Receive a message. */
89 		res = shm_mq_receive(inqh, &len, &data, false);
90 		if (res != SHM_MQ_SUCCESS)
91 			ereport(ERROR,
92 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
93 					 errmsg("could not receive message")));
94 
95 		/* If this is supposed to be the last iteration, stop here. */
96 		if (--loop_count <= 0)
97 			break;
98 
99 		/* Send it back out. */
100 		res = shm_mq_send(outqh, len, data, false);
101 		if (res != SHM_MQ_SUCCESS)
102 			ereport(ERROR,
103 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
104 					 errmsg("could not send message")));
105 	}
106 
107 	/*
108 	 * Finally, check that we got back the same message from the last
109 	 * iteration that we originally sent.
110 	 */
111 	verify_message(message_size, message_contents, len, data);
112 
113 	/* Clean up. */
114 	dsm_detach(seg);
115 
116 	PG_RETURN_VOID();
117 }
118 
119 /*
120  * Pipelined test of the shared memory message queue infrastructure.
121  *
122  * As in the basic test, we set up a ring of message queues passing through
123  * 1 or more background processes and eventually looping back to ourselves.
124  * Then, we send N copies of the user-specified message through the ring and
125  * receive them all back.  Since this might fill up all message queues in the
126  * ring and then stall, we must be prepared to begin receiving the messages
127  * back before we've finished sending them.
128  */
129 Datum
test_shm_mq_pipelined(PG_FUNCTION_ARGS)130 test_shm_mq_pipelined(PG_FUNCTION_ARGS)
131 {
132 	int64		queue_size = PG_GETARG_INT64(0);
133 	text	   *message = PG_GETARG_TEXT_PP(1);
134 	char	   *message_contents = VARDATA_ANY(message);
135 	int			message_size = VARSIZE_ANY_EXHDR(message);
136 	int32		loop_count = PG_GETARG_INT32(2);
137 	int32		nworkers = PG_GETARG_INT32(3);
138 	bool		verify = PG_GETARG_BOOL(4);
139 	int32		send_count = 0;
140 	int32		receive_count = 0;
141 	dsm_segment *seg;
142 	shm_mq_handle *outqh;
143 	shm_mq_handle *inqh;
144 	shm_mq_result res;
145 	Size		len;
146 	void	   *data;
147 
148 	/* A negative loopcount is nonsensical. */
149 	if (loop_count < 0)
150 		ereport(ERROR,
151 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
152 				 errmsg("repeat count size must be an integer value greater than or equal to zero")));
153 
154 	/*
155 	 * Using the nonblocking interfaces, we can even send data to ourselves,
156 	 * so the minimum number of workers for this test is zero.
157 	 */
158 	if (nworkers < 0)
159 		ereport(ERROR,
160 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
161 				 errmsg("number of workers must be an integer value greater than or equal to zero")));
162 
163 	/* Set up dynamic shared memory segment and background workers. */
164 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
165 
166 	/* Main loop. */
167 	for (;;)
168 	{
169 		bool		wait = true;
170 
171 		/*
172 		 * If we haven't yet sent the message the requisite number of times,
173 		 * try again to send it now.  Note that when shm_mq_send() returns
174 		 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
175 		 * same message size and contents; that's not an issue here because
176 		 * we're sending the same message every time.
177 		 */
178 		if (send_count < loop_count)
179 		{
180 			res = shm_mq_send(outqh, message_size, message_contents, true);
181 			if (res == SHM_MQ_SUCCESS)
182 			{
183 				++send_count;
184 				wait = false;
185 			}
186 			else if (res == SHM_MQ_DETACHED)
187 				ereport(ERROR,
188 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 						 errmsg("could not send message")));
190 		}
191 
192 		/*
193 		 * If we haven't yet received the message the requisite number of
194 		 * times, try to receive it again now.
195 		 */
196 		if (receive_count < loop_count)
197 		{
198 			res = shm_mq_receive(inqh, &len, &data, true);
199 			if (res == SHM_MQ_SUCCESS)
200 			{
201 				++receive_count;
202 				/* Verifying every time is slow, so it's optional. */
203 				if (verify)
204 					verify_message(message_size, message_contents, len, data);
205 				wait = false;
206 			}
207 			else if (res == SHM_MQ_DETACHED)
208 				ereport(ERROR,
209 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 						 errmsg("could not receive message")));
211 		}
212 		else
213 		{
214 			/*
215 			 * Otherwise, we've received the message enough times.  This
216 			 * shouldn't happen unless we've also sent it enough times.
217 			 */
218 			if (send_count != receive_count)
219 				ereport(ERROR,
220 						(errcode(ERRCODE_INTERNAL_ERROR),
221 						 errmsg("message sent %d times, but received %d times",
222 								send_count, receive_count)));
223 			break;
224 		}
225 
226 		if (wait)
227 		{
228 			/*
229 			 * If we made no progress, wait for one of the other processes to
230 			 * which we are connected to set our latch, indicating that they
231 			 * have read or written data and therefore there may now be work
232 			 * for us to do.
233 			 */
234 			WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
235 			ResetLatch(MyLatch);
236 			CHECK_FOR_INTERRUPTS();
237 		}
238 	}
239 
240 	/* Clean up. */
241 	dsm_detach(seg);
242 
243 	PG_RETURN_VOID();
244 }
245 
246 /*
247  * Verify that two messages are the same.
248  */
249 static void
verify_message(Size origlen,char * origdata,Size newlen,char * newdata)250 verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
251 {
252 	Size		i;
253 
254 	if (origlen != newlen)
255 		ereport(ERROR,
256 				(errmsg("message corrupted"),
257 				 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
258 						   origlen, newlen)));
259 
260 	for (i = 0; i < origlen; ++i)
261 		if (origdata[i] != newdata[i])
262 			ereport(ERROR,
263 					(errmsg("message corrupted"),
264 					 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
265 }
266