1 /*
2  * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3  * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4  * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  *
34  */
35 
36 /*
37  * Abstract:
38  *    Implementation of Dispatcher abstraction.
39  *
40  */
41 
42 #if HAVE_CONFIG_H
43 #  include <config.h>
44 #endif				/* HAVE_CONFIG_H */
45 
46 #include <stdlib.h>
47 #include <complib/cl_dispatcher.h>
48 #include <complib/cl_thread.h>
49 #include <complib/cl_timer.h>
50 
51 /* give some guidance when we build our cl_pool of messages */
52 #define CL_DISP_INITIAL_MSG_COUNT   256
53 #define CL_DISP_MSG_GROW_SIZE       64
54 
55 /* give some guidance when we build our cl_pool of registration elements */
56 #define CL_DISP_INITIAL_REG_COUNT   16
57 #define CL_DISP_REG_GROW_SIZE       16
58 
59 /********************************************************************
60    __cl_disp_worker
61 
62    Description:
63    This function takes messages off the FIFO and calls Processmsg()
64    This function executes as passive level.
65 
66    Inputs:
67    p_disp - Pointer to Dispatcher object
68 
69    Outputs:
70    None
71 
72    Returns:
73    None
74 ********************************************************************/
75 void __cl_disp_worker(IN void *context)
76 {
77 	cl_disp_msg_t *p_msg;
78 	cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
79 
80 	cl_spinlock_acquire(&p_disp->lock);
81 
82 	/* Process the FIFO until we drain it dry. */
83 	while (cl_qlist_count(&p_disp->msg_fifo)) {
84 		/* Pop the message at the head from the FIFO. */
85 		p_msg =
86 		    (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
87 
88 		/* we track the tim ethe last message spent in the queue */
89 		p_disp->last_msg_queue_time_us =
90 		    cl_get_time_stamp() - p_msg->in_time;
91 
92 		/*
93 		 * Release the spinlock while the message is processed.
94 		 * The user's callback may reenter the dispatcher
95 		 * and cause the lock to be reaquired.
96 		 */
97 		cl_spinlock_release(&p_disp->lock);
98 		p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99 						    context,
100 						    (void *)p_msg->p_data);
101 
102 		cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103 
104 		/* The client has seen the data.  Notify the sender as appropriate. */
105 		if (p_msg->pfn_xmt_callback) {
106 			p_msg->pfn_xmt_callback((void *)p_msg->context,
107 						(void *)p_msg->p_data);
108 			cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109 		}
110 
111 		/* Grab the lock for the next iteration through the list. */
112 		cl_spinlock_acquire(&p_disp->lock);
113 
114 		/* Return this message to the pool. */
115 		cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116 	}
117 
118 	cl_spinlock_release(&p_disp->lock);
119 }
120 
121 void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
122 {
123 	CL_ASSERT(p_disp);
124 
125 	cl_qlist_init(&p_disp->reg_list);
126 	cl_ptr_vector_construct(&p_disp->reg_vec);
127 	cl_qlist_init(&p_disp->msg_fifo);
128 	cl_spinlock_construct(&p_disp->lock);
129 	cl_qpool_construct(&p_disp->msg_pool);
130 }
131 
132 void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
133 {
134 	CL_ASSERT(p_disp);
135 
136 	/* Stop the thread pool. */
137 	cl_thread_pool_destroy(&p_disp->worker_threads);
138 
139 	/* Process all outstanding callbacks. */
140 	__cl_disp_worker(p_disp);
141 
142 	/* Free all registration info. */
143 	while (!cl_is_qlist_empty(&p_disp->reg_list))
144 		free(cl_qlist_remove_head(&p_disp->reg_list));
145 }
146 
147 void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
148 {
149 	CL_ASSERT(p_disp);
150 
151 	cl_spinlock_destroy(&p_disp->lock);
152 	/* Destroy the message pool */
153 	cl_qpool_destroy(&p_disp->msg_pool);
154 	/* Destroy the pointer vector of registrants. */
155 	cl_ptr_vector_destroy(&p_disp->reg_vec);
156 }
157 
158 cl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp,
159 			 IN const uint32_t thread_count,
160 			 IN const char *const name)
161 {
162 	cl_status_t status;
163 
164 	CL_ASSERT(p_disp);
165 
166 	cl_disp_construct(p_disp);
167 
168 	status = cl_spinlock_init(&p_disp->lock);
169 	if (status != CL_SUCCESS) {
170 		cl_disp_destroy(p_disp);
171 		return (status);
172 	}
173 
174 	/* Specify no upper limit to the number of messages in the pool */
175 	status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
176 			       0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
177 			       NULL, NULL, NULL);
178 	if (status != CL_SUCCESS) {
179 		cl_disp_destroy(p_disp);
180 		return (status);
181 	}
182 
183 	status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
184 				    CL_DISP_REG_GROW_SIZE);
185 	if (status != CL_SUCCESS) {
186 		cl_disp_destroy(p_disp);
187 		return (status);
188 	}
189 
190 	status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
191 				     __cl_disp_worker, p_disp, name);
192 	if (status != CL_SUCCESS)
193 		cl_disp_destroy(p_disp);
194 
195 	return (status);
196 }
197 
198 cl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp,
199 				      IN const cl_disp_msgid_t msg_id,
200 				      IN cl_pfn_msgrcv_cb_t pfn_callback
201 				      OPTIONAL,
202 				      IN const void *const context OPTIONAL)
203 {
204 	cl_disp_reg_info_t *p_reg;
205 	cl_status_t status;
206 
207 	CL_ASSERT(p_disp);
208 
209 	/* Check that the requested registrant ID is available. */
210 	cl_spinlock_acquire(&p_disp->lock);
211 	if ((msg_id != CL_DISP_MSGID_NONE) &&
212 	    (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
213 	    (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
214 		cl_spinlock_release(&p_disp->lock);
215 		return (NULL);
216 	}
217 
218 	/* Get a registration info from the pool. */
219 	p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
220 	if (!p_reg) {
221 		cl_spinlock_release(&p_disp->lock);
222 		return (NULL);
223 	} else {
224 		memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
225 	}
226 
227 	p_reg->p_disp = p_disp;
228 	p_reg->ref_cnt = 0;
229 	p_reg->pfn_rcv_callback = pfn_callback;
230 	p_reg->context = context;
231 	p_reg->msg_id = msg_id;
232 
233 	/* Insert the registration in the list. */
234 	cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
235 
236 	/* Set the array entry to the registrant. */
237 	/* The ptr_vector grow automatically as necessary. */
238 	if (msg_id != CL_DISP_MSGID_NONE) {
239 		status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
240 		if (status != CL_SUCCESS) {
241 			free(p_reg);
242 			cl_spinlock_release(&p_disp->lock);
243 			return (NULL);
244 		}
245 	}
246 
247 	cl_spinlock_release(&p_disp->lock);
248 
249 	return (p_reg);
250 }
251 
252 void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
253 {
254 	cl_disp_reg_info_t *p_reg;
255 	cl_dispatcher_t *p_disp;
256 
257 	if (handle == CL_DISP_INVALID_HANDLE)
258 		return;
259 
260 	p_reg = (cl_disp_reg_info_t *) handle;
261 	p_disp = p_reg->p_disp;
262 	CL_ASSERT(p_disp);
263 
264 	cl_spinlock_acquire(&p_disp->lock);
265 	/*
266 	 * Clear the registrant vector entry.  This will cause any further
267 	 * post calls to fail.
268 	 */
269 	if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
270 		CL_ASSERT(p_reg->msg_id <
271 			  cl_ptr_vector_get_size(&p_disp->reg_vec));
272 		cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
273 	}
274 	cl_spinlock_release(&p_disp->lock);
275 
276 	while (p_reg->ref_cnt > 0)
277 		cl_thread_suspend(1);
278 
279 	cl_spinlock_acquire(&p_disp->lock);
280 	/* Remove the registrant from the list. */
281 	cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
282 	/* Return the registration info to the pool */
283 	free(p_reg);
284 
285 	cl_spinlock_release(&p_disp->lock);
286 }
287 
288 cl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle,
289 			 IN const cl_disp_msgid_t msg_id,
290 			 IN const void *const p_data,
291 			 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
292 			 IN const void *const context OPTIONAL)
293 {
294 	cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
295 	cl_disp_reg_info_t *p_dest_reg;
296 	cl_dispatcher_t *p_disp;
297 	cl_disp_msg_t *p_msg;
298 
299 	p_disp = handle->p_disp;
300 	CL_ASSERT(p_disp);
301 	CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
302 
303 	cl_spinlock_acquire(&p_disp->lock);
304 	/* Check that the recipient exists. */
305 	if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) {
306 		cl_spinlock_release(&p_disp->lock);
307 		return (CL_NOT_FOUND);
308 	}
309 
310 	p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
311 	if (!p_dest_reg) {
312 		cl_spinlock_release(&p_disp->lock);
313 		return (CL_NOT_FOUND);
314 	}
315 
316 	/* Get a free message from the pool. */
317 	p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
318 	if (!p_msg) {
319 		cl_spinlock_release(&p_disp->lock);
320 		return (CL_INSUFFICIENT_MEMORY);
321 	}
322 
323 	/* Initialize the message */
324 	p_msg->p_src_reg = p_src_reg;
325 	p_msg->p_dest_reg = p_dest_reg;
326 	p_msg->p_data = p_data;
327 	p_msg->pfn_xmt_callback = pfn_callback;
328 	p_msg->context = context;
329 	p_msg->in_time = cl_get_time_stamp();
330 
331 	/*
332 	 * Increment the sender's reference count if they request a completion
333 	 * notification.
334 	 */
335 	if (pfn_callback)
336 		cl_atomic_inc(&p_src_reg->ref_cnt);
337 
338 	/* Increment the recipient's reference count. */
339 	cl_atomic_inc(&p_dest_reg->ref_cnt);
340 
341 	/* Queue the message in the FIFO. */
342 	cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
343 	cl_spinlock_release(&p_disp->lock);
344 
345 	/* Signal the thread pool that there is work to be done. */
346 	cl_thread_pool_signal(&p_disp->worker_threads);
347 	return (CL_SUCCESS);
348 }
349 
350 void cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
351 			      OUT uint32_t * p_num_queued_msgs,
352 			      OUT uint64_t * p_last_msg_queue_time_ms)
353 {
354 	cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
355 
356 	cl_spinlock_acquire(&p_disp->lock);
357 
358 	if (p_last_msg_queue_time_ms)
359 		*p_last_msg_queue_time_ms =
360 		    p_disp->last_msg_queue_time_us / 1000;
361 
362 	if (p_num_queued_msgs)
363 		*p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
364 
365 	cl_spinlock_release(&p_disp->lock);
366 }
367