1 /**
2  * Copyright (c) 2010-2012 Broadcom. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions, and the following disclaimer,
9  *    without modification.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. The names of the above-listed copyright holders may not be used
14  *    to endorse or promote products derived from this software without
15  *    specific prior written permission.
16  *
17  * ALTERNATIVELY, this software may be distributed under the terms of the
18  * GNU General Public License ("GPL") version 2, as published by the Free
19  * Software Foundation.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
22  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
23  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
29  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "vchiq_core.h"
35 
36 #define VCHIQ_SLOT_HANDLER_STACK 8192
37 
38 #define HANDLE_STATE_SHIFT 12
39 
40 #define SLOT_INFO_FROM_INDEX(state, index) (state->slot_info + (index))
41 #define SLOT_DATA_FROM_INDEX(state, index) (state->slot_data + (index))
42 #define SLOT_INDEX_FROM_DATA(state, data) \
43 	(((unsigned int)((char *)data - (char *)state->slot_data)) / \
44 	VCHIQ_SLOT_SIZE)
45 #define SLOT_INDEX_FROM_INFO(state, info) \
46 	((unsigned int)(info - state->slot_info))
47 #define SLOT_QUEUE_INDEX_FROM_POS(pos) \
48 	((int)((unsigned int)(pos) / VCHIQ_SLOT_SIZE))
49 
50 
51 #define BULK_INDEX(x) (x & (VCHIQ_NUM_SERVICE_BULKS - 1))
52 
53 
54 struct vchiq_open_payload {
55 	int fourcc;
56 	int client_id;
57 	short version;
58 	short version_min;
59 };
60 
61 struct vchiq_openack_payload {
62 	short version;
63 };
64 
65 /* we require this for consistency between endpoints */
66 vchiq_static_assert(sizeof(VCHIQ_HEADER_T) == 8);
67 vchiq_static_assert(IS_POW2(sizeof(VCHIQ_HEADER_T)));
68 vchiq_static_assert(IS_POW2(VCHIQ_NUM_CURRENT_BULKS));
69 vchiq_static_assert(IS_POW2(VCHIQ_NUM_SERVICE_BULKS));
70 vchiq_static_assert(IS_POW2(VCHIQ_MAX_SERVICES));
71 vchiq_static_assert(VCHIQ_VERSION >= VCHIQ_VERSION_MIN);
72 
73 /* Run time control of log level, based on KERN_XXX level. */
74 int vchiq_core_log_level = VCHIQ_LOG_DEFAULT;
75 int vchiq_core_msg_log_level = VCHIQ_LOG_DEFAULT;
76 int vchiq_sync_log_level = VCHIQ_LOG_DEFAULT;
77 
78 static atomic_t pause_bulks_count = ATOMIC_INIT(0);
79 
80 static DEFINE_SPINLOCK(service_spinlock);
81 DEFINE_SPINLOCK(bulk_waiter_spinlock);
82 DEFINE_SPINLOCK(quota_spinlock);
83 
84 void
85 vchiq_core_initialize(void)
86 {
87 	spin_lock_init(&service_spinlock);
88 	spin_lock_init(&bulk_waiter_spinlock);
89 	spin_lock_init(&quota_spinlock);
90 }
91 
92 VCHIQ_STATE_T *vchiq_states[VCHIQ_MAX_STATES];
93 static unsigned int handle_seq;
94 
95 static const char *const srvstate_names[] = {
96 	"FREE",
97 	"HIDDEN",
98 	"LISTENING",
99 	"OPENING",
100 	"OPEN",
101 	"OPENSYNC",
102 	"CLOSESENT",
103 	"CLOSERECVD",
104 	"CLOSEWAIT",
105 	"CLOSED"
106 };
107 
108 static const char *const reason_names[] = {
109 	"SERVICE_OPENED",
110 	"SERVICE_CLOSED",
111 	"MESSAGE_AVAILABLE",
112 	"BULK_TRANSMIT_DONE",
113 	"BULK_RECEIVE_DONE",
114 	"BULK_TRANSMIT_ABORTED",
115 	"BULK_RECEIVE_ABORTED"
116 };
117 
118 static const char *const conn_state_names[] = {
119 	"DISCONNECTED",
120 	"CONNECTING",
121 	"CONNECTED",
122 	"PAUSING",
123 	"PAUSE_SENT",
124 	"PAUSED",
125 	"RESUMING",
126 	"PAUSE_TIMEOUT",
127 	"RESUME_TIMEOUT"
128 };
129 
130 
131 static void
132 release_message_sync(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header);
133 
134 static const char *msg_type_str(unsigned int msg_type)
135 {
136 	switch (msg_type) {
137 	case VCHIQ_MSG_PADDING:       return "PADDING";
138 	case VCHIQ_MSG_CONNECT:       return "CONNECT";
139 	case VCHIQ_MSG_OPEN:          return "OPEN";
140 	case VCHIQ_MSG_OPENACK:       return "OPENACK";
141 	case VCHIQ_MSG_CLOSE:         return "CLOSE";
142 	case VCHIQ_MSG_DATA:          return "DATA";
143 	case VCHIQ_MSG_BULK_RX:       return "BULK_RX";
144 	case VCHIQ_MSG_BULK_TX:       return "BULK_TX";
145 	case VCHIQ_MSG_BULK_RX_DONE:  return "BULK_RX_DONE";
146 	case VCHIQ_MSG_BULK_TX_DONE:  return "BULK_TX_DONE";
147 	case VCHIQ_MSG_PAUSE:         return "PAUSE";
148 	case VCHIQ_MSG_RESUME:        return "RESUME";
149 	case VCHIQ_MSG_REMOTE_USE:    return "REMOTE_USE";
150 	case VCHIQ_MSG_REMOTE_RELEASE:      return "REMOTE_RELEASE";
151 	case VCHIQ_MSG_REMOTE_USE_ACTIVE:   return "REMOTE_USE_ACTIVE";
152 	}
153 	return "???";
154 }
155 
156 static inline void
157 vchiq_set_service_state(VCHIQ_SERVICE_T *service, int newstate)
158 {
159 	vchiq_log_info(vchiq_core_log_level, "%d: srv:%d %s->%s",
160 		service->state->id, service->localport,
161 		srvstate_names[service->srvstate],
162 		srvstate_names[newstate]);
163 	service->srvstate = newstate;
164 }
165 
166 VCHIQ_SERVICE_T *
167 find_service_by_handle(VCHIQ_SERVICE_HANDLE_T handle)
168 {
169 	VCHIQ_SERVICE_T *service;
170 
171 	spin_lock(&service_spinlock);
172 	service = handle_to_service(handle);
173 	if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE) &&
174 		(service->handle == handle)) {
175 		BUG_ON(service->ref_count == 0);
176 		service->ref_count++;
177 	} else
178 		service = NULL;
179 	spin_unlock(&service_spinlock);
180 
181 	if (!service)
182 		vchiq_log_info(vchiq_core_log_level,
183 			"Invalid service handle 0x%x", handle);
184 
185 	return service;
186 }
187 
188 VCHIQ_SERVICE_T *
189 find_service_by_port(VCHIQ_STATE_T *state, int localport)
190 {
191 	VCHIQ_SERVICE_T *service = NULL;
192 	if ((unsigned int)localport <= VCHIQ_PORT_MAX) {
193 		spin_lock(&service_spinlock);
194 		service = state->services[localport];
195 		if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE)) {
196 			BUG_ON(service->ref_count == 0);
197 			service->ref_count++;
198 		} else
199 			service = NULL;
200 		spin_unlock(&service_spinlock);
201 	}
202 
203 	if (!service)
204 		vchiq_log_info(vchiq_core_log_level,
205 			"Invalid port %d", localport);
206 
207 	return service;
208 }
209 
210 VCHIQ_SERVICE_T *
211 find_service_for_instance(VCHIQ_INSTANCE_T instance,
212 	VCHIQ_SERVICE_HANDLE_T handle) {
213 	VCHIQ_SERVICE_T *service;
214 
215 	spin_lock(&service_spinlock);
216 	service = handle_to_service(handle);
217 	if (service && (service->srvstate != VCHIQ_SRVSTATE_FREE) &&
218 		(service->handle == handle) &&
219 		(service->instance == instance)) {
220 		BUG_ON(service->ref_count == 0);
221 		service->ref_count++;
222 	} else
223 		service = NULL;
224 	spin_unlock(&service_spinlock);
225 
226 	if (!service)
227 		vchiq_log_info(vchiq_core_log_level,
228 			"Invalid service handle 0x%x", handle);
229 
230 	return service;
231 }
232 
233 VCHIQ_SERVICE_T *
234 next_service_by_instance(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance,
235 	int *pidx)
236 {
237 	VCHIQ_SERVICE_T *service = NULL;
238 	int idx = *pidx;
239 
240 	spin_lock(&service_spinlock);
241 	while (idx < state->unused_service) {
242 		VCHIQ_SERVICE_T *srv = state->services[idx++];
243 		if (srv && (srv->srvstate != VCHIQ_SRVSTATE_FREE) &&
244 			(srv->instance == instance)) {
245 			service = srv;
246 			BUG_ON(service->ref_count == 0);
247 			service->ref_count++;
248 			break;
249 		}
250 	}
251 	spin_unlock(&service_spinlock);
252 
253 	*pidx = idx;
254 
255 	return service;
256 }
257 
258 void
259 lock_service(VCHIQ_SERVICE_T *service)
260 {
261 	spin_lock(&service_spinlock);
262 	BUG_ON(!service || (service->ref_count == 0));
263 	if (service)
264 		service->ref_count++;
265 	spin_unlock(&service_spinlock);
266 }
267 
268 void
269 unlock_service(VCHIQ_SERVICE_T *service)
270 {
271 	VCHIQ_STATE_T *state = service->state;
272 	spin_lock(&service_spinlock);
273 	BUG_ON(!service || (service->ref_count == 0));
274 	if (service && service->ref_count) {
275 		service->ref_count--;
276 		if (!service->ref_count) {
277 			BUG_ON(service->srvstate != VCHIQ_SRVSTATE_FREE);
278 			state->services[service->localport] = NULL;
279 
280 			_sema_destroy(&service->remove_event);
281 			_sema_destroy(&service->bulk_remove_event);
282 			lmutex_destroy(&service->bulk_mutex);
283 		} else
284 			service = NULL;
285 	}
286 	spin_unlock(&service_spinlock);
287 
288 	if (service && service->userdata_term)
289 		service->userdata_term(service->base.userdata);
290 
291 	kfree(service);
292 }
293 
294 int
295 vchiq_get_client_id(VCHIQ_SERVICE_HANDLE_T handle)
296 {
297 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
298 	int id;
299 
300 	id = service ? service->client_id : 0;
301 	if (service)
302 		unlock_service(service);
303 
304 	return id;
305 }
306 
307 void *
308 vchiq_get_service_userdata(VCHIQ_SERVICE_HANDLE_T handle)
309 {
310 	VCHIQ_SERVICE_T *service = handle_to_service(handle);
311 
312 	return service ? service->base.userdata : NULL;
313 }
314 
315 int
316 vchiq_get_service_fourcc(VCHIQ_SERVICE_HANDLE_T handle)
317 {
318 	VCHIQ_SERVICE_T *service = handle_to_service(handle);
319 
320 	return service ? service->base.fourcc : 0;
321 }
322 
323 static void
324 mark_service_closing_internal(VCHIQ_SERVICE_T *service, int sh_thread)
325 {
326 	VCHIQ_STATE_T *state = service->state;
327 	VCHIQ_SERVICE_QUOTA_T *service_quota;
328 
329 	service->closing = 1;
330 
331 	/* Synchronise with other threads. */
332 	lmutex_lock(&state->recycle_mutex);
333 	lmutex_unlock(&state->recycle_mutex);
334 	if (!sh_thread || (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT)) {
335 		/* If we're pausing then the slot_mutex is held until resume
336 		 * by the slot handler.  Therefore don't try to acquire this
337 		 * mutex if we're the slot handler and in the pause sent state.
338 		 * We don't need to in this case anyway. */
339 		lmutex_lock(&state->slot_mutex);
340 		lmutex_unlock(&state->slot_mutex);
341 	}
342 
343 	/* Unblock any sending thread. */
344 	service_quota = &state->service_quotas[service->localport];
345 	up(&service_quota->quota_event);
346 }
347 
348 static void
349 mark_service_closing(VCHIQ_SERVICE_T *service)
350 {
351 	mark_service_closing_internal(service, 0);
352 }
353 
354 static inline VCHIQ_STATUS_T
355 make_service_callback(VCHIQ_SERVICE_T *service, VCHIQ_REASON_T reason,
356 	VCHIQ_HEADER_T *header, void *bulk_userdata)
357 {
358 	VCHIQ_STATUS_T status;
359 	vchiq_log_trace(vchiq_core_log_level, "%d: callback:%d (%s, %x, %x)",
360 		service->state->id, service->localport, reason_names[reason],
361 		(unsigned int)header, (unsigned int)bulk_userdata);
362 	status = service->base.callback(reason, header, service->handle,
363 		bulk_userdata);
364 	if (status == VCHIQ_ERROR) {
365 		vchiq_log_warning(vchiq_core_log_level,
366 			"%d: ignoring ERROR from callback to service %x",
367 			service->state->id, service->handle);
368 		status = VCHIQ_SUCCESS;
369 	}
370 	return status;
371 }
372 
373 inline void
374 vchiq_set_conn_state(VCHIQ_STATE_T *state, VCHIQ_CONNSTATE_T newstate)
375 {
376 	VCHIQ_CONNSTATE_T oldstate = state->conn_state;
377 	vchiq_log_info(vchiq_core_log_level, "%d: %s->%s", state->id,
378 		conn_state_names[oldstate],
379 		conn_state_names[newstate]);
380 	state->conn_state = newstate;
381 	vchiq_platform_conn_state_changed(state, oldstate, newstate);
382 }
383 
384 static inline void
385 remote_event_create(REMOTE_EVENT_T *event)
386 {
387 	event->armed = 0;
388 	/* Don't clear the 'fired' flag because it may already have been set
389 	** by the other side. */
390 	_sema_init(event->event, 0);
391 }
392 
393 __unused static inline void
394 remote_event_destroy(REMOTE_EVENT_T *event)
395 {
396 	(void)event;
397 }
398 
399 static inline int
400 remote_event_wait(REMOTE_EVENT_T *event)
401 {
402 	if (!event->fired) {
403 		event->armed = 1;
404 		dsb();
405 		if (!event->fired) {
406 			if (down_interruptible(event->event) != 0) {
407 				event->armed = 0;
408 				return 0;
409 			}
410 		}
411 		event->armed = 0;
412 		wmb();
413 	}
414 
415 	event->fired = 0;
416 	return 1;
417 }
418 
419 static inline void
420 remote_event_signal_local(REMOTE_EVENT_T *event)
421 {
422 	event->armed = 0;
423 	up(event->event);
424 }
425 
426 static inline void
427 remote_event_poll(REMOTE_EVENT_T *event)
428 {
429 	if (event->fired && event->armed)
430 		remote_event_signal_local(event);
431 }
432 
433 void
434 remote_event_pollall(VCHIQ_STATE_T *state)
435 {
436 	remote_event_poll(&state->local->sync_trigger);
437 	remote_event_poll(&state->local->sync_release);
438 	remote_event_poll(&state->local->trigger);
439 	remote_event_poll(&state->local->recycle);
440 }
441 
442 /* Round up message sizes so that any space at the end of a slot is always big
443 ** enough for a header. This relies on header size being a power of two, which
444 ** has been verified earlier by a static assertion. */
445 
446 static inline unsigned int
447 calc_stride(unsigned int size)
448 {
449 	/* Allow room for the header */
450 	size += sizeof(VCHIQ_HEADER_T);
451 
452 	/* Round up */
453 	return (size + sizeof(VCHIQ_HEADER_T) - 1) & ~(sizeof(VCHIQ_HEADER_T)
454 		- 1);
455 }
456 
457 /* Called by the slot handler thread */
458 static VCHIQ_SERVICE_T *
459 get_listening_service(VCHIQ_STATE_T *state, int fourcc)
460 {
461 	int i;
462 
463 	WARN_ON(fourcc == VCHIQ_FOURCC_INVALID);
464 
465 	for (i = 0; i < state->unused_service; i++) {
466 		VCHIQ_SERVICE_T *service = state->services[i];
467 		if (service &&
468 			(service->public_fourcc == fourcc) &&
469 			((service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
470 			((service->srvstate == VCHIQ_SRVSTATE_OPEN) &&
471 			(service->remoteport == VCHIQ_PORT_FREE)))) {
472 			lock_service(service);
473 			return service;
474 		}
475 	}
476 
477 	return NULL;
478 }
479 
480 /* Called by the slot handler thread */
481 static VCHIQ_SERVICE_T *
482 get_connected_service(VCHIQ_STATE_T *state, unsigned int port)
483 {
484 	int i;
485 	for (i = 0; i < state->unused_service; i++) {
486 		VCHIQ_SERVICE_T *service = state->services[i];
487 		if (service && (service->srvstate == VCHIQ_SRVSTATE_OPEN)
488 			&& (service->remoteport == port)) {
489 			lock_service(service);
490 			return service;
491 		}
492 	}
493 	return NULL;
494 }
495 
496 inline void
497 request_poll(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service, int poll_type)
498 {
499 	uint32_t value;
500 
501 	if (service) {
502 		do {
503 			value = atomic_read(&service->poll_flags);
504 		} while (atomic_cmpxchg(&service->poll_flags, value,
505 			value | (1 << poll_type)) != value);
506 
507 		do {
508 			value = atomic_read(&state->poll_services[
509 				service->localport>>5]);
510 		} while (atomic_cmpxchg(
511 			&state->poll_services[service->localport>>5],
512 			value, value | (1 << (service->localport & 0x1f)))
513 			!= value);
514 	}
515 
516 	state->poll_needed = 1;
517 	wmb();
518 
519 	/* ... and ensure the slot handler runs. */
520 	remote_event_signal_local(&state->local->trigger);
521 }
522 
523 /* Called from queue_message, by the slot handler and application threads,
524 ** with slot_mutex held */
525 static VCHIQ_HEADER_T *
526 reserve_space(VCHIQ_STATE_T *state, int space, int is_blocking)
527 {
528 	VCHIQ_SHARED_STATE_T *local = state->local;
529 	int tx_pos = state->local_tx_pos;
530 	int slot_space = VCHIQ_SLOT_SIZE - (tx_pos & VCHIQ_SLOT_MASK);
531 
532 	if (space > slot_space) {
533 		VCHIQ_HEADER_T *header;
534 		/* Fill the remaining space with padding */
535 		WARN_ON(state->tx_data == NULL);
536 		header = (VCHIQ_HEADER_T *)
537 			(state->tx_data + (tx_pos & VCHIQ_SLOT_MASK));
538 		header->msgid = VCHIQ_MSGID_PADDING;
539 		header->size = slot_space - sizeof(VCHIQ_HEADER_T);
540 
541 		tx_pos += slot_space;
542 	}
543 
544 	/* If necessary, get the next slot. */
545 	if ((tx_pos & VCHIQ_SLOT_MASK) == 0) {
546 		int slot_index;
547 
548 		/* If there is no free slot... */
549 
550 		if (down_trylock(&state->slot_available_event) != 0) {
551 			/* ...wait for one. */
552 
553 			VCHIQ_STATS_INC(state, slot_stalls);
554 
555 			/* But first, flush through the last slot. */
556 			state->local_tx_pos = tx_pos;
557 			local->tx_pos = tx_pos;
558 			remote_event_signal(&state->remote->trigger);
559 
560 			if (!is_blocking ||
561 				(down_interruptible(
562 				&state->slot_available_event) != 0))
563 				return NULL; /* No space available */
564 		}
565 
566 		BUG_ON(tx_pos ==
567 			(state->slot_queue_available * VCHIQ_SLOT_SIZE));
568 
569 		slot_index = local->slot_queue[
570 			SLOT_QUEUE_INDEX_FROM_POS(tx_pos) &
571 			VCHIQ_SLOT_QUEUE_MASK];
572 		state->tx_data =
573 			(char *)SLOT_DATA_FROM_INDEX(state, slot_index);
574 	}
575 
576 	state->local_tx_pos = tx_pos + space;
577 
578 	return (VCHIQ_HEADER_T *)(state->tx_data + (tx_pos & VCHIQ_SLOT_MASK));
579 }
580 
581 /* Called by the recycle thread. */
582 static void
583 process_free_queue(VCHIQ_STATE_T *state)
584 {
585 	VCHIQ_SHARED_STATE_T *local = state->local;
586 	BITSET_T service_found[BITSET_SIZE(VCHIQ_MAX_SERVICES)];
587 	int slot_queue_available;
588 
589 	/* Use a read memory barrier to ensure that any state that may have
590 	** been modified by another thread is not masked by stale prefetched
591 	** values. */
592 	rmb();
593 
594 	/* Find slots which have been freed by the other side, and return them
595 	** to the available queue. */
596 	slot_queue_available = state->slot_queue_available;
597 
598 	while (slot_queue_available != local->slot_queue_recycle) {
599 		unsigned int pos;
600 		int slot_index = local->slot_queue[slot_queue_available++ &
601 			VCHIQ_SLOT_QUEUE_MASK];
602 		char *data = (char *)SLOT_DATA_FROM_INDEX(state, slot_index);
603 		int data_found = 0;
604 
605 		vchiq_log_trace(vchiq_core_log_level, "%d: pfq %d=%x %x %x",
606 			state->id, slot_index, (unsigned int)data,
607 			local->slot_queue_recycle, slot_queue_available);
608 
609 		/* Initialise the bitmask for services which have used this
610 		** slot */
611 		BITSET_ZERO(service_found);
612 
613 		pos = 0;
614 
615 		while (pos < VCHIQ_SLOT_SIZE) {
616 			VCHIQ_HEADER_T *header =
617 				(VCHIQ_HEADER_T *)(data + pos);
618 			int msgid = header->msgid;
619 			if (VCHIQ_MSG_TYPE(msgid) == VCHIQ_MSG_DATA) {
620 				int port = VCHIQ_MSG_SRCPORT(msgid);
621 				VCHIQ_SERVICE_QUOTA_T *service_quota =
622 					&state->service_quotas[port];
623 				int count;
624 				spin_lock(&quota_spinlock);
625 				count = service_quota->message_use_count;
626 				if (count > 0)
627 					service_quota->message_use_count =
628 						count - 1;
629 				spin_unlock(&quota_spinlock);
630 
631 				if (count == service_quota->message_quota)
632 					/* Signal the service that it
633 					** has dropped below its quota
634 					*/
635 					up(&service_quota->quota_event);
636 				else if (count == 0) {
637 					vchiq_log_error(vchiq_core_log_level,
638 						"service %d "
639 						"message_use_count=%d "
640 						"(header %x, msgid %x, "
641 						"header->msgid %x, "
642 						"header->size %x)",
643 						port,
644 						service_quota->
645 							message_use_count,
646 						(unsigned int)header, msgid,
647 						header->msgid,
648 						header->size);
649 					WARN(1, "invalid message use count\n");
650 				}
651 				if (!BITSET_IS_SET(service_found, port)) {
652 					/* Set the found bit for this service */
653 					BITSET_SET(service_found, port);
654 
655 					spin_lock(&quota_spinlock);
656 					count = service_quota->slot_use_count;
657 					if (count > 0)
658 						service_quota->slot_use_count =
659 							count - 1;
660 					spin_unlock(&quota_spinlock);
661 
662 					if (count > 0) {
663 						/* Signal the service in case
664 						** it has dropped below its
665 						** quota */
666 						up(&service_quota->quota_event);
667 						vchiq_log_trace(
668 							vchiq_core_log_level,
669 							"%d: pfq:%d %x@%x - "
670 							"slot_use->%d",
671 							state->id, port,
672 							header->size,
673 							(unsigned int)header,
674 							count - 1);
675 					} else {
676 						vchiq_log_error(
677 							vchiq_core_log_level,
678 								"service %d "
679 								"slot_use_count"
680 								"=%d (header %x"
681 								", msgid %x, "
682 								"header->msgid"
683 								" %x, header->"
684 								"size %x)",
685 							port, count,
686 							(unsigned int)header,
687 							msgid,
688 							header->msgid,
689 							header->size);
690 						WARN(1, "bad slot use count\n");
691 					}
692 				}
693 
694 				data_found = 1;
695 			}
696 
697 			pos += calc_stride(header->size);
698 			if (pos > VCHIQ_SLOT_SIZE) {
699 				vchiq_log_error(vchiq_core_log_level,
700 					"pfq - pos %x: header %x, msgid %x, "
701 					"header->msgid %x, header->size %x",
702 					pos, (unsigned int)header, msgid,
703 					header->msgid, header->size);
704 				WARN(1, "invalid slot position\n");
705 			}
706 		}
707 
708 		if (data_found) {
709 			int count;
710 			spin_lock(&quota_spinlock);
711 			count = state->data_use_count;
712 			if (count > 0)
713 				state->data_use_count =
714 					count - 1;
715 			spin_unlock(&quota_spinlock);
716 			if (count == state->data_quota)
717 				up(&state->data_quota_event);
718 		}
719 
720 		state->slot_queue_available = slot_queue_available;
721 		up(&state->slot_available_event);
722 	}
723 }
724 
725 /* Called by the slot handler and application threads */
726 static VCHIQ_STATUS_T
727 queue_message(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service,
728 	int msgid, const VCHIQ_ELEMENT_T *elements,
729 	int count, int size, int is_blocking)
730 {
731 	VCHIQ_SHARED_STATE_T *local;
732 	VCHIQ_SERVICE_QUOTA_T *service_quota = NULL;
733 	VCHIQ_HEADER_T *header;
734 	int type = VCHIQ_MSG_TYPE(msgid);
735 
736 	unsigned int stride;
737 
738 	local = state->local;
739 
740 	stride = calc_stride(size);
741 
742 	WARN_ON(!(stride <= VCHIQ_SLOT_SIZE));
743 
744 	if ((type != VCHIQ_MSG_RESUME) &&
745 		(lmutex_lock_interruptible(&state->slot_mutex) != 0))
746 		return VCHIQ_RETRY;
747 
748 	if (type == VCHIQ_MSG_DATA) {
749 		int tx_end_index;
750 
751 		BUG_ON(!service);
752 
753 		if (service->closing) {
754 			/* The service has been closed */
755 			lmutex_unlock(&state->slot_mutex);
756 			return VCHIQ_ERROR;
757 		}
758 
759 		service_quota = &state->service_quotas[service->localport];
760 
761 		spin_lock(&quota_spinlock);
762 
763 		/* Ensure this service doesn't use more than its quota of
764 		** messages or slots */
765 		tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
766 			state->local_tx_pos + stride - 1);
767 
768 		/* Ensure data messages don't use more than their quota of
769 		** slots */
770 		while ((tx_end_index != state->previous_data_index) &&
771 			(state->data_use_count == state->data_quota)) {
772 			VCHIQ_STATS_INC(state, data_stalls);
773 			spin_unlock(&quota_spinlock);
774 			lmutex_unlock(&state->slot_mutex);
775 
776 			if (down_interruptible(&state->data_quota_event)
777 				!= 0)
778 				return VCHIQ_RETRY;
779 
780 			lmutex_lock(&state->slot_mutex);
781 			spin_lock(&quota_spinlock);
782 			tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
783 				state->local_tx_pos + stride - 1);
784 			if ((tx_end_index == state->previous_data_index) ||
785 				(state->data_use_count < state->data_quota)) {
786 				/* Pass the signal on to other waiters */
787 				up(&state->data_quota_event);
788 				break;
789 			}
790 		}
791 
792 		while ((service_quota->message_use_count ==
793 				service_quota->message_quota) ||
794 			((tx_end_index != service_quota->previous_tx_index) &&
795 			(service_quota->slot_use_count ==
796 				service_quota->slot_quota))) {
797 			spin_unlock(&quota_spinlock);
798 			vchiq_log_trace(vchiq_core_log_level,
799 				"%d: qm:%d %s,%x - quota stall "
800 				"(msg %d, slot %d)",
801 				state->id, service->localport,
802 				msg_type_str(type), size,
803 				service_quota->message_use_count,
804 				service_quota->slot_use_count);
805 			VCHIQ_SERVICE_STATS_INC(service, quota_stalls);
806 			lmutex_unlock(&state->slot_mutex);
807 			if (down_interruptible(&service_quota->quota_event)
808 				!= 0)
809 				return VCHIQ_RETRY;
810 			if (service->closing)
811 				return VCHIQ_ERROR;
812 			if (lmutex_lock_interruptible(&state->slot_mutex) != 0)
813 				return VCHIQ_RETRY;
814 			if (service->srvstate != VCHIQ_SRVSTATE_OPEN) {
815 				/* The service has been closed */
816 				lmutex_unlock(&state->slot_mutex);
817 				return VCHIQ_ERROR;
818 			}
819 			spin_lock(&quota_spinlock);
820 			tx_end_index = SLOT_QUEUE_INDEX_FROM_POS(
821 				state->local_tx_pos + stride - 1);
822 		}
823 
824 		spin_unlock(&quota_spinlock);
825 	}
826 
827 	header = reserve_space(state, stride, is_blocking);
828 
829 	if (!header) {
830 		if (service)
831 			VCHIQ_SERVICE_STATS_INC(service, slot_stalls);
832 		lmutex_unlock(&state->slot_mutex);
833 		return VCHIQ_RETRY;
834 	}
835 
836 	if (type == VCHIQ_MSG_DATA) {
837 		int i, pos;
838 		int tx_end_index;
839 		int slot_use_count;
840 
841 		vchiq_log_info(vchiq_core_log_level,
842 			"%d: qm %s@%x,%x (%d->%d)",
843 			state->id,
844 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
845 			(unsigned int)header, size,
846 			VCHIQ_MSG_SRCPORT(msgid),
847 			VCHIQ_MSG_DSTPORT(msgid));
848 
849 		BUG_ON(!service);
850 
851 		for (i = 0, pos = 0; i < (unsigned int)count;
852 			pos += elements[i++].size)
853 			if (elements[i].size) {
854 				if (vchiq_copy_from_user
855 					(header->data + pos, elements[i].data,
856 					(size_t) elements[i].size) !=
857 					VCHIQ_SUCCESS) {
858 					lmutex_unlock(&state->slot_mutex);
859 					VCHIQ_SERVICE_STATS_INC(service,
860 						error_count);
861 					return VCHIQ_ERROR;
862 				}
863 				if (i == 0) {
864 					if (vchiq_core_msg_log_level >=
865 						VCHIQ_LOG_INFO)
866 						vchiq_log_dump_mem("Sent", 0,
867 							header->data + pos,
868 							min(64,
869 							elements[0].size));
870 				}
871 			}
872 
873 		spin_lock(&quota_spinlock);
874 		service_quota->message_use_count++;
875 
876 		tx_end_index =
877 			SLOT_QUEUE_INDEX_FROM_POS(state->local_tx_pos - 1);
878 
879 		/* If this transmission can't fit in the last slot used by any
880 		** service, the data_use_count must be increased. */
881 		if (tx_end_index != state->previous_data_index) {
882 			state->previous_data_index = tx_end_index;
883 			state->data_use_count++;
884 		}
885 
886 		/* If this isn't the same slot last used by this service,
887 		** the service's slot_use_count must be increased. */
888 		if (tx_end_index != service_quota->previous_tx_index) {
889 			service_quota->previous_tx_index = tx_end_index;
890 			slot_use_count = ++service_quota->slot_use_count;
891 		} else {
892 			slot_use_count = 0;
893 		}
894 
895 		spin_unlock(&quota_spinlock);
896 
897 		if (slot_use_count)
898 			vchiq_log_trace(vchiq_core_log_level,
899 				"%d: qm:%d %s,%x - slot_use->%d (hdr %p)",
900 				state->id, service->localport,
901 				msg_type_str(VCHIQ_MSG_TYPE(msgid)), size,
902 				slot_use_count, header);
903 
904 		VCHIQ_SERVICE_STATS_INC(service, ctrl_tx_count);
905 		VCHIQ_SERVICE_STATS_ADD(service, ctrl_tx_bytes, size);
906 	} else {
907 		vchiq_log_info(vchiq_core_log_level,
908 			"%d: qm %s@%x,%x (%d->%d)", state->id,
909 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
910 			(unsigned int)header, size,
911 			VCHIQ_MSG_SRCPORT(msgid),
912 			VCHIQ_MSG_DSTPORT(msgid));
913 		if (size != 0) {
914 			WARN_ON(!((count == 1) && (size == elements[0].size)));
915 			memcpy(header->data, elements[0].data,
916 				elements[0].size);
917 		}
918 		VCHIQ_STATS_INC(state, ctrl_tx_count);
919 	}
920 
921 	header->msgid = msgid;
922 	header->size = size;
923 
924 	{
925 		int svc_fourcc;
926 
927 		svc_fourcc = service
928 			? service->base.fourcc
929 			: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
930 
931 		vchiq_log_info(vchiq_core_msg_log_level,
932 			"Sent Msg %s(%u) to %c%c%c%c s:%u d:%d len:%d",
933 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
934 			VCHIQ_MSG_TYPE(msgid),
935 			VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
936 			VCHIQ_MSG_SRCPORT(msgid),
937 			VCHIQ_MSG_DSTPORT(msgid),
938 			size);
939 	}
940 
941 	/* Make sure the new header is visible to the peer. */
942 	wmb();
943 
944 	/* Make the new tx_pos visible to the peer. */
945 	local->tx_pos = state->local_tx_pos;
946 	wmb();
947 
948 	if (service && (type == VCHIQ_MSG_CLOSE))
949 		vchiq_set_service_state(service, VCHIQ_SRVSTATE_CLOSESENT);
950 
951 	if (VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_PAUSE)
952 		lmutex_unlock(&state->slot_mutex);
953 
954 	remote_event_signal(&state->remote->trigger);
955 
956 	return VCHIQ_SUCCESS;
957 }
958 
959 /* Called by the slot handler and application threads */
960 static VCHIQ_STATUS_T
961 queue_message_sync(VCHIQ_STATE_T *state, VCHIQ_SERVICE_T *service,
962 	int msgid, const VCHIQ_ELEMENT_T *elements,
963 	int count, int size, int is_blocking)
964 {
965 	VCHIQ_SHARED_STATE_T *local;
966 	VCHIQ_HEADER_T *header;
967 
968 	local = state->local;
969 
970 	if ((VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_RESUME) &&
971 		(lmutex_lock_interruptible(&state->sync_mutex) != 0))
972 		return VCHIQ_RETRY;
973 
974 	remote_event_wait(&local->sync_release);
975 
976 	rmb();
977 
978 	header = (VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state,
979 		local->slot_sync);
980 
981 	{
982 		int oldmsgid = header->msgid;
983 		if (oldmsgid != VCHIQ_MSGID_PADDING)
984 			vchiq_log_error(vchiq_core_log_level,
985 				"%d: qms - msgid %x, not PADDING",
986 				state->id, oldmsgid);
987 	}
988 
989 	if (service) {
990 		int i, pos;
991 
992 		vchiq_log_info(vchiq_sync_log_level,
993 			"%d: qms %s@%x,%x (%d->%d)", state->id,
994 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
995 			(unsigned int)header, size,
996 			VCHIQ_MSG_SRCPORT(msgid),
997 			VCHIQ_MSG_DSTPORT(msgid));
998 
999 		for (i = 0, pos = 0; i < (unsigned int)count;
1000 			pos += elements[i++].size)
1001 			if (elements[i].size) {
1002 				if (vchiq_copy_from_user
1003 					(header->data + pos, elements[i].data,
1004 					(size_t) elements[i].size) !=
1005 					VCHIQ_SUCCESS) {
1006 					lmutex_unlock(&state->sync_mutex);
1007 					VCHIQ_SERVICE_STATS_INC(service,
1008 						error_count);
1009 					return VCHIQ_ERROR;
1010 				}
1011 				if (i == 0) {
1012 					if (vchiq_sync_log_level >=
1013 						VCHIQ_LOG_TRACE)
1014 						vchiq_log_dump_mem("Sent Sync",
1015 							0, header->data + pos,
1016 							min(64,
1017 							elements[0].size));
1018 				}
1019 			}
1020 
1021 		VCHIQ_SERVICE_STATS_INC(service, ctrl_tx_count);
1022 		VCHIQ_SERVICE_STATS_ADD(service, ctrl_tx_bytes, size);
1023 	} else {
1024 		vchiq_log_info(vchiq_sync_log_level,
1025 			"%d: qms %s@%x,%x (%d->%d)", state->id,
1026 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
1027 			(unsigned int)header, size,
1028 			VCHIQ_MSG_SRCPORT(msgid),
1029 			VCHIQ_MSG_DSTPORT(msgid));
1030 		if (size != 0) {
1031 			WARN_ON(!((count == 1) && (size == elements[0].size)));
1032 			memcpy(header->data, elements[0].data,
1033 				elements[0].size);
1034 		}
1035 		VCHIQ_STATS_INC(state, ctrl_tx_count);
1036 	}
1037 
1038 	header->size = size;
1039 	header->msgid = msgid;
1040 
1041 	if (vchiq_sync_log_level >= VCHIQ_LOG_TRACE) {
1042 		int svc_fourcc;
1043 
1044 		svc_fourcc = service
1045 			? service->base.fourcc
1046 			: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
1047 
1048 		vchiq_log_trace(vchiq_sync_log_level,
1049 			"Sent Sync Msg %s(%u) to %c%c%c%c s:%u d:%d len:%d",
1050 			msg_type_str(VCHIQ_MSG_TYPE(msgid)),
1051 			VCHIQ_MSG_TYPE(msgid),
1052 			VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
1053 			VCHIQ_MSG_SRCPORT(msgid),
1054 			VCHIQ_MSG_DSTPORT(msgid),
1055 			size);
1056 	}
1057 
1058 	/* Make sure the new header is visible to the peer. */
1059 	wmb();
1060 
1061 	remote_event_signal(&state->remote->sync_trigger);
1062 
1063 	if (VCHIQ_MSG_TYPE(msgid) != VCHIQ_MSG_PAUSE)
1064 		lmutex_unlock(&state->sync_mutex);
1065 
1066 	return VCHIQ_SUCCESS;
1067 }
1068 
1069 static inline void
1070 claim_slot(VCHIQ_SLOT_INFO_T *slot)
1071 {
1072 	slot->use_count++;
1073 }
1074 
1075 static void
1076 release_slot(VCHIQ_STATE_T *state, VCHIQ_SLOT_INFO_T *slot_info,
1077 	VCHIQ_HEADER_T *header, VCHIQ_SERVICE_T *service)
1078 {
1079 	int release_count;
1080 
1081 	lmutex_lock(&state->recycle_mutex);
1082 
1083 	if (header) {
1084 		int msgid = header->msgid;
1085 		if (((msgid & VCHIQ_MSGID_CLAIMED) == 0) ||
1086 			(service && service->closing)) {
1087 			lmutex_unlock(&state->recycle_mutex);
1088 			return;
1089 		}
1090 
1091 		/* Rewrite the message header to prevent a double
1092 		** release */
1093 		header->msgid = msgid & ~VCHIQ_MSGID_CLAIMED;
1094 	}
1095 
1096 	release_count = slot_info->release_count;
1097 	slot_info->release_count = ++release_count;
1098 
1099 	if (release_count == slot_info->use_count) {
1100 		int slot_queue_recycle;
1101 		/* Add to the freed queue */
1102 
1103 		/* A read barrier is necessary here to prevent speculative
1104 		** fetches of remote->slot_queue_recycle from overtaking the
1105 		** mutex. */
1106 		rmb();
1107 
1108 		slot_queue_recycle = state->remote->slot_queue_recycle;
1109 		state->remote->slot_queue[slot_queue_recycle &
1110 			VCHIQ_SLOT_QUEUE_MASK] =
1111 			SLOT_INDEX_FROM_INFO(state, slot_info);
1112 		state->remote->slot_queue_recycle = slot_queue_recycle + 1;
1113 		vchiq_log_info(vchiq_core_log_level,
1114 			"%d: release_slot %d - recycle->%x",
1115 			state->id, SLOT_INDEX_FROM_INFO(state, slot_info),
1116 			state->remote->slot_queue_recycle);
1117 
1118 		/* A write barrier is necessary, but remote_event_signal
1119 		** contains one. */
1120 		remote_event_signal(&state->remote->recycle);
1121 	}
1122 
1123 	lmutex_unlock(&state->recycle_mutex);
1124 }
1125 
1126 /* Called by the slot handler - don't hold the bulk mutex */
1127 static VCHIQ_STATUS_T
1128 notify_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue,
1129 	int retry_poll)
1130 {
1131 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
1132 
1133 	vchiq_log_trace(vchiq_core_log_level,
1134 		"%d: nb:%d %cx - p=%x rn=%x r=%x",
1135 		service->state->id, service->localport,
1136 		(queue == &service->bulk_tx) ? 't' : 'r',
1137 		queue->process, queue->remote_notify, queue->remove);
1138 
1139 	if (service->state->is_master) {
1140 		while (queue->remote_notify != queue->process) {
1141 			VCHIQ_BULK_T *bulk =
1142 				&queue->bulks[BULK_INDEX(queue->remote_notify)];
1143 			int msgtype = (bulk->dir == VCHIQ_BULK_TRANSMIT) ?
1144 				VCHIQ_MSG_BULK_RX_DONE : VCHIQ_MSG_BULK_TX_DONE;
1145 			int msgid = VCHIQ_MAKE_MSG(msgtype, service->localport,
1146 				service->remoteport);
1147 			VCHIQ_ELEMENT_T element = { &bulk->actual, 4 };
1148 			/* Only reply to non-dummy bulk requests */
1149 			if (bulk->remote_data) {
1150 				status = queue_message(service->state, NULL,
1151 					msgid, &element, 1, 4, 0);
1152 				if (status != VCHIQ_SUCCESS)
1153 					break;
1154 			}
1155 			queue->remote_notify++;
1156 		}
1157 	} else {
1158 		queue->remote_notify = queue->process;
1159 	}
1160 
1161 	if (status == VCHIQ_SUCCESS) {
1162 		while (queue->remove != queue->remote_notify) {
1163 			VCHIQ_BULK_T *bulk =
1164 				&queue->bulks[BULK_INDEX(queue->remove)];
1165 
1166 			/* Only generate callbacks for non-dummy bulk
1167 			** requests, and non-terminated services */
1168 			if (bulk->data && service->instance) {
1169 				if (bulk->actual != VCHIQ_BULK_ACTUAL_ABORTED) {
1170 					if (bulk->dir == VCHIQ_BULK_TRANSMIT) {
1171 						VCHIQ_SERVICE_STATS_INC(service,
1172 							bulk_tx_count);
1173 						VCHIQ_SERVICE_STATS_ADD(service,
1174 							bulk_tx_bytes,
1175 							bulk->actual);
1176 					} else {
1177 						VCHIQ_SERVICE_STATS_INC(service,
1178 							bulk_rx_count);
1179 						VCHIQ_SERVICE_STATS_ADD(service,
1180 							bulk_rx_bytes,
1181 							bulk->actual);
1182 					}
1183 				} else {
1184 					VCHIQ_SERVICE_STATS_INC(service,
1185 						bulk_aborted_count);
1186 				}
1187 				if (bulk->mode == VCHIQ_BULK_MODE_BLOCKING) {
1188 					struct bulk_waiter *waiter;
1189 					spin_lock(&bulk_waiter_spinlock);
1190 					waiter = bulk->userdata;
1191 					if (waiter) {
1192 						waiter->actual = bulk->actual;
1193 						up(&waiter->event);
1194 					}
1195 					spin_unlock(&bulk_waiter_spinlock);
1196 				} else if (bulk->mode ==
1197 					VCHIQ_BULK_MODE_CALLBACK) {
1198 					VCHIQ_REASON_T reason = (bulk->dir ==
1199 						VCHIQ_BULK_TRANSMIT) ?
1200 						((bulk->actual ==
1201 						VCHIQ_BULK_ACTUAL_ABORTED) ?
1202 						VCHIQ_BULK_TRANSMIT_ABORTED :
1203 						VCHIQ_BULK_TRANSMIT_DONE) :
1204 						((bulk->actual ==
1205 						VCHIQ_BULK_ACTUAL_ABORTED) ?
1206 						VCHIQ_BULK_RECEIVE_ABORTED :
1207 						VCHIQ_BULK_RECEIVE_DONE);
1208 					status = make_service_callback(service,
1209 						reason,	NULL, bulk->userdata);
1210 					if (status == VCHIQ_RETRY)
1211 						break;
1212 				}
1213 			}
1214 
1215 			queue->remove++;
1216 			up(&service->bulk_remove_event);
1217 		}
1218 		if (!retry_poll)
1219 			status = VCHIQ_SUCCESS;
1220 	}
1221 
1222 	if (status == VCHIQ_RETRY)
1223 		request_poll(service->state, service,
1224 			(queue == &service->bulk_tx) ?
1225 			VCHIQ_POLL_TXNOTIFY : VCHIQ_POLL_RXNOTIFY);
1226 
1227 	return status;
1228 }
1229 
1230 /* Called by the slot handler thread */
1231 static void
1232 poll_services(VCHIQ_STATE_T *state)
1233 {
1234 	int group, i;
1235 
1236 	for (group = 0; group < BITSET_SIZE(state->unused_service); group++) {
1237 		uint32_t flags;
1238 		flags = atomic_xchg(&state->poll_services[group], 0);
1239 		for (i = 0; flags; i++) {
1240 			if (flags & (1 << i)) {
1241 				VCHIQ_SERVICE_T *service =
1242 					find_service_by_port(state,
1243 						(group<<5) + i);
1244 				uint32_t service_flags;
1245 				flags &= ~(1 << i);
1246 				if (!service)
1247 					continue;
1248 				service_flags =
1249 					atomic_xchg(&service->poll_flags, 0);
1250 				if (service_flags &
1251 					(1 << VCHIQ_POLL_REMOVE)) {
1252 					vchiq_log_info(vchiq_core_log_level,
1253 						"%d: ps - remove %d<->%d",
1254 						state->id, service->localport,
1255 						service->remoteport);
1256 
1257 					/* Make it look like a client, because
1258 					   it must be removed and not left in
1259 					   the LISTENING state. */
1260 					service->public_fourcc =
1261 						VCHIQ_FOURCC_INVALID;
1262 
1263 					if (vchiq_close_service_internal(
1264 						service, 0/*!close_recvd*/) !=
1265 						VCHIQ_SUCCESS)
1266 						request_poll(state, service,
1267 							VCHIQ_POLL_REMOVE);
1268 				} else if (service_flags &
1269 					(1 << VCHIQ_POLL_TERMINATE)) {
1270 					vchiq_log_info(vchiq_core_log_level,
1271 						"%d: ps - terminate %d<->%d",
1272 						state->id, service->localport,
1273 						service->remoteport);
1274 					if (vchiq_close_service_internal(
1275 						service, 0/*!close_recvd*/) !=
1276 						VCHIQ_SUCCESS)
1277 						request_poll(state, service,
1278 							VCHIQ_POLL_TERMINATE);
1279 				}
1280 				if (service_flags & (1 << VCHIQ_POLL_TXNOTIFY))
1281 					notify_bulks(service,
1282 						&service->bulk_tx,
1283 						1/*retry_poll*/);
1284 				if (service_flags & (1 << VCHIQ_POLL_RXNOTIFY))
1285 					notify_bulks(service,
1286 						&service->bulk_rx,
1287 						1/*retry_poll*/);
1288 				unlock_service(service);
1289 			}
1290 		}
1291 	}
1292 }
1293 
1294 /* Called by the slot handler or application threads, holding the bulk mutex. */
1295 static int
1296 resolve_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue)
1297 {
1298 	VCHIQ_STATE_T *state = service->state;
1299 	int resolved = 0;
1300 	int rc;
1301 
1302 	while ((queue->process != queue->local_insert) &&
1303 		(queue->process != queue->remote_insert)) {
1304 		VCHIQ_BULK_T *bulk = &queue->bulks[BULK_INDEX(queue->process)];
1305 
1306 		vchiq_log_trace(vchiq_core_log_level,
1307 			"%d: rb:%d %cx - li=%x ri=%x p=%x",
1308 			state->id, service->localport,
1309 			(queue == &service->bulk_tx) ? 't' : 'r',
1310 			queue->local_insert, queue->remote_insert,
1311 			queue->process);
1312 
1313 		WARN_ON(!((int)(queue->local_insert - queue->process) > 0));
1314 		WARN_ON(!((int)(queue->remote_insert - queue->process) > 0));
1315 
1316 		rc = lmutex_lock_interruptible(&state->bulk_transfer_mutex);
1317 		if (rc != 0)
1318 			break;
1319 
1320 		vchiq_transfer_bulk(bulk);
1321 		lmutex_unlock(&state->bulk_transfer_mutex);
1322 
1323 		if (vchiq_core_msg_log_level >= VCHIQ_LOG_INFO) {
1324 			const char *header = (queue == &service->bulk_tx) ?
1325 				"Send Bulk to" : "Recv Bulk from";
1326 			if (bulk->actual != VCHIQ_BULK_ACTUAL_ABORTED)
1327 				vchiq_log_info(vchiq_core_msg_log_level,
1328 					"%s %c%c%c%c d:%d len:%d %x<->%x",
1329 					header,
1330 					VCHIQ_FOURCC_AS_4CHARS(
1331 						service->base.fourcc),
1332 					service->remoteport,
1333 					bulk->size,
1334 					(unsigned int)bulk->data,
1335 					(unsigned int)bulk->remote_data);
1336 			else
1337 				vchiq_log_info(vchiq_core_msg_log_level,
1338 					"%s %c%c%c%c d:%d ABORTED - tx len:%d,"
1339 					" rx len:%d %x<->%x",
1340 					header,
1341 					VCHIQ_FOURCC_AS_4CHARS(
1342 						service->base.fourcc),
1343 					service->remoteport,
1344 					bulk->size,
1345 					bulk->remote_size,
1346 					(unsigned int)bulk->data,
1347 					(unsigned int)bulk->remote_data);
1348 		}
1349 
1350 		vchiq_complete_bulk(bulk);
1351 		queue->process++;
1352 		resolved++;
1353 	}
1354 	return resolved;
1355 }
1356 
1357 /* Called with the bulk_mutex held */
1358 static void
1359 abort_outstanding_bulks(VCHIQ_SERVICE_T *service, VCHIQ_BULK_QUEUE_T *queue)
1360 {
1361 	int is_tx = (queue == &service->bulk_tx);
1362 	vchiq_log_trace(vchiq_core_log_level,
1363 		"%d: aob:%d %cx - li=%x ri=%x p=%x",
1364 		service->state->id, service->localport, is_tx ? 't' : 'r',
1365 		queue->local_insert, queue->remote_insert, queue->process);
1366 
1367 	WARN_ON(!((int)(queue->local_insert - queue->process) >= 0));
1368 	WARN_ON(!((int)(queue->remote_insert - queue->process) >= 0));
1369 
1370 	while ((queue->process != queue->local_insert) ||
1371 		(queue->process != queue->remote_insert)) {
1372 		VCHIQ_BULK_T *bulk = &queue->bulks[BULK_INDEX(queue->process)];
1373 
1374 		if (queue->process == queue->remote_insert) {
1375 			/* fabricate a matching dummy bulk */
1376 			bulk->remote_data = NULL;
1377 			bulk->remote_size = 0;
1378 			queue->remote_insert++;
1379 		}
1380 
1381 		if (queue->process != queue->local_insert) {
1382 			vchiq_complete_bulk(bulk);
1383 
1384 			vchiq_log_info(vchiq_core_msg_log_level,
1385 				"%s %c%c%c%c d:%d ABORTED - tx len:%d, "
1386 				"rx len:%d",
1387 				is_tx ? "Send Bulk to" : "Recv Bulk from",
1388 				VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
1389 				service->remoteport,
1390 				bulk->size,
1391 				bulk->remote_size);
1392 		} else {
1393 			/* fabricate a matching dummy bulk */
1394 			bulk->data = NULL;
1395 			bulk->size = 0;
1396 			bulk->actual = VCHIQ_BULK_ACTUAL_ABORTED;
1397 			bulk->dir = is_tx ? VCHIQ_BULK_TRANSMIT :
1398 				VCHIQ_BULK_RECEIVE;
1399 			queue->local_insert++;
1400 		}
1401 
1402 		queue->process++;
1403 	}
1404 }
1405 
1406 /* Called from the slot handler thread */
1407 static void
1408 pause_bulks(VCHIQ_STATE_T *state)
1409 {
1410 	if (unlikely(atomic_inc_return(&pause_bulks_count) != 1)) {
1411 		WARN_ON_ONCE(1);
1412 		atomic_set(&pause_bulks_count, 1);
1413 		return;
1414 	}
1415 
1416 	/* Block bulk transfers from all services */
1417 	lmutex_lock(&state->bulk_transfer_mutex);
1418 }
1419 
1420 /* Called from the slot handler thread */
1421 static void
1422 resume_bulks(VCHIQ_STATE_T *state)
1423 {
1424 	int i;
1425 	if (unlikely(atomic_dec_return(&pause_bulks_count) != 0)) {
1426 		WARN_ON_ONCE(1);
1427 		atomic_set(&pause_bulks_count, 0);
1428 		return;
1429 	}
1430 
1431 	/* Allow bulk transfers from all services */
1432 	lmutex_unlock(&state->bulk_transfer_mutex);
1433 
1434 	if (state->deferred_bulks == 0)
1435 		return;
1436 
1437 	/* Deal with any bulks which had to be deferred due to being in
1438 	 * paused state.  Don't try to match up to number of deferred bulks
1439 	 * in case we've had something come and close the service in the
1440 	 * interim - just process all bulk queues for all services */
1441 	vchiq_log_info(vchiq_core_log_level, "%s: processing %d deferred bulks",
1442 		__func__, state->deferred_bulks);
1443 
1444 	for (i = 0; i < state->unused_service; i++) {
1445 		VCHIQ_SERVICE_T *service = state->services[i];
1446 		int resolved_rx = 0;
1447 		int resolved_tx = 0;
1448 		if (!service || (service->srvstate != VCHIQ_SRVSTATE_OPEN))
1449 			continue;
1450 
1451 		lmutex_lock(&service->bulk_mutex);
1452 		resolved_rx = resolve_bulks(service, &service->bulk_rx);
1453 		resolved_tx = resolve_bulks(service, &service->bulk_tx);
1454 		lmutex_unlock(&service->bulk_mutex);
1455 		if (resolved_rx)
1456 			notify_bulks(service, &service->bulk_rx, 1);
1457 		if (resolved_tx)
1458 			notify_bulks(service, &service->bulk_tx, 1);
1459 	}
1460 	state->deferred_bulks = 0;
1461 }
1462 
1463 static int
1464 parse_open(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header)
1465 {
1466 	VCHIQ_SERVICE_T *service = NULL;
1467 	int msgid, size;
1468 	unsigned int localport, remoteport;
1469 
1470 	msgid = header->msgid;
1471 	size = header->size;
1472 	//int type = VCHIQ_MSG_TYPE(msgid);
1473 	localport = VCHIQ_MSG_DSTPORT(msgid);
1474 	remoteport = VCHIQ_MSG_SRCPORT(msgid);
1475 	if (size >= sizeof(struct vchiq_open_payload)) {
1476 		const struct vchiq_open_payload *payload =
1477 			(struct vchiq_open_payload *)header->data;
1478 		unsigned int fourcc;
1479 
1480 		fourcc = payload->fourcc;
1481 		vchiq_log_info(vchiq_core_log_level,
1482 			"%d: prs OPEN@%x (%d->'%c%c%c%c')",
1483 			state->id, (unsigned int)header,
1484 			localport,
1485 			VCHIQ_FOURCC_AS_4CHARS(fourcc));
1486 
1487 		service = get_listening_service(state, fourcc);
1488 
1489 		if (service) {
1490 			/* A matching service exists */
1491 			short v = payload->version;
1492 			short version_min = payload->version_min;
1493 			if ((service->version < version_min) ||
1494 				(v < service->version_min)) {
1495 				/* Version mismatch */
1496 				vchiq_loud_error_header();
1497 				vchiq_loud_error("%d: service %d (%c%c%c%c) "
1498 					"version mismatch - local (%d, min %d)"
1499 					" vs. remote (%d, min %d)",
1500 					state->id, service->localport,
1501 					VCHIQ_FOURCC_AS_4CHARS(fourcc),
1502 					service->version, service->version_min,
1503 					v, version_min);
1504 				vchiq_loud_error_footer();
1505 				unlock_service(service);
1506 				goto fail_open;
1507 			}
1508 			service->peer_version = v;
1509 
1510 			if (service->srvstate == VCHIQ_SRVSTATE_LISTENING) {
1511 				struct vchiq_openack_payload ack_payload = {
1512 					service->version
1513 				};
1514 				VCHIQ_ELEMENT_T body = {
1515 					&ack_payload,
1516 					sizeof(ack_payload)
1517 				};
1518 
1519 				/* Acknowledge the OPEN */
1520 				if (service->sync) {
1521 					if (queue_message_sync(state, NULL,
1522 						VCHIQ_MAKE_MSG(
1523 							VCHIQ_MSG_OPENACK,
1524 							service->localport,
1525 							remoteport),
1526 						&body, 1, sizeof(ack_payload),
1527 						0) == VCHIQ_RETRY)
1528 						goto bail_not_ready;
1529 				} else {
1530 					if (queue_message(state, NULL,
1531 						VCHIQ_MAKE_MSG(
1532 							VCHIQ_MSG_OPENACK,
1533 							service->localport,
1534 							remoteport),
1535 						&body, 1, sizeof(ack_payload),
1536 						0) == VCHIQ_RETRY)
1537 						goto bail_not_ready;
1538 				}
1539 
1540 				/* The service is now open */
1541 				vchiq_set_service_state(service,
1542 					service->sync ? VCHIQ_SRVSTATE_OPENSYNC
1543 					: VCHIQ_SRVSTATE_OPEN);
1544 			}
1545 
1546 			service->remoteport = remoteport;
1547 			service->client_id = ((int *)header->data)[1];
1548 			if (make_service_callback(service, VCHIQ_SERVICE_OPENED,
1549 				NULL, NULL) == VCHIQ_RETRY) {
1550 				/* Bail out if not ready */
1551 				service->remoteport = VCHIQ_PORT_FREE;
1552 				goto bail_not_ready;
1553 			}
1554 
1555 			/* Success - the message has been dealt with */
1556 			unlock_service(service);
1557 			return 1;
1558 		}
1559 	}
1560 
1561 fail_open:
1562 	/* No available service, or an invalid request - send a CLOSE */
1563 	if (queue_message(state, NULL,
1564 		VCHIQ_MAKE_MSG(VCHIQ_MSG_CLOSE, 0, VCHIQ_MSG_SRCPORT(msgid)),
1565 		NULL, 0, 0, 0) == VCHIQ_RETRY)
1566 		goto bail_not_ready;
1567 
1568 	return 1;
1569 
1570 bail_not_ready:
1571 	if (service)
1572 		unlock_service(service);
1573 
1574 	return 0;
1575 }
1576 
1577 /* Called by the slot handler thread */
1578 static void
1579 parse_rx_slots(VCHIQ_STATE_T *state)
1580 {
1581 	VCHIQ_SHARED_STATE_T *remote = state->remote;
1582 	VCHIQ_SERVICE_T *service = NULL;
1583 	int tx_pos;
1584 	DEBUG_INITIALISE(state->local)
1585 
1586 	tx_pos = remote->tx_pos;
1587 
1588 	while (state->rx_pos != tx_pos) {
1589 		VCHIQ_HEADER_T *header;
1590 		int msgid, size;
1591 		int type;
1592 		unsigned int localport, remoteport;
1593 
1594 		DEBUG_TRACE(PARSE_LINE);
1595 		if (!state->rx_data) {
1596 			int rx_index;
1597 			WARN_ON(!((state->rx_pos & VCHIQ_SLOT_MASK) == 0));
1598 			rx_index = remote->slot_queue[
1599 				SLOT_QUEUE_INDEX_FROM_POS(state->rx_pos) &
1600 				VCHIQ_SLOT_QUEUE_MASK];
1601 			state->rx_data = (char *)SLOT_DATA_FROM_INDEX(state,
1602 				rx_index);
1603 			state->rx_info = SLOT_INFO_FROM_INDEX(state, rx_index);
1604 
1605 			/* Initialise use_count to one, and increment
1606 			** release_count at the end of the slot to avoid
1607 			** releasing the slot prematurely. */
1608 			state->rx_info->use_count = 1;
1609 			state->rx_info->release_count = 0;
1610 		}
1611 
1612 		header = (VCHIQ_HEADER_T *)(state->rx_data +
1613 			(state->rx_pos & VCHIQ_SLOT_MASK));
1614 		DEBUG_VALUE(PARSE_HEADER, (int)header);
1615 		msgid = header->msgid;
1616 		DEBUG_VALUE(PARSE_MSGID, msgid);
1617 		size = header->size;
1618 		type = VCHIQ_MSG_TYPE(msgid);
1619 		localport = VCHIQ_MSG_DSTPORT(msgid);
1620 		remoteport = VCHIQ_MSG_SRCPORT(msgid);
1621 
1622 		if (type != VCHIQ_MSG_DATA)
1623 			VCHIQ_STATS_INC(state, ctrl_rx_count);
1624 
1625 		switch (type) {
1626 		case VCHIQ_MSG_OPENACK:
1627 		case VCHIQ_MSG_CLOSE:
1628 		case VCHIQ_MSG_DATA:
1629 		case VCHIQ_MSG_BULK_RX:
1630 		case VCHIQ_MSG_BULK_TX:
1631 		case VCHIQ_MSG_BULK_RX_DONE:
1632 		case VCHIQ_MSG_BULK_TX_DONE:
1633 			service = find_service_by_port(state, localport);
1634 			if ((!service || service->remoteport != remoteport) &&
1635 				(localport == 0) &&
1636 				(type == VCHIQ_MSG_CLOSE)) {
1637 				/* This could be a CLOSE from a client which
1638 				   hadn't yet received the OPENACK - look for
1639 				   the connected service */
1640 				if (service)
1641 					unlock_service(service);
1642 				service = get_connected_service(state,
1643 					remoteport);
1644 				if (service)
1645 					vchiq_log_warning(vchiq_core_log_level,
1646 						"%d: prs %s@%x (%d->%d) - "
1647 						"found connected service %d",
1648 						state->id, msg_type_str(type),
1649 						(unsigned int)header,
1650 						remoteport, localport,
1651 						service->localport);
1652 			}
1653 
1654 			if (!service) {
1655 				vchiq_log_error(vchiq_core_log_level,
1656 					"%d: prs %s@%x (%d->%d) - "
1657 					"invalid/closed service %d",
1658 					state->id, msg_type_str(type),
1659 					(unsigned int)header,
1660 					remoteport, localport, localport);
1661 				goto skip_message;
1662 			}
1663 			break;
1664 		default:
1665 			break;
1666 		}
1667 
1668 		if (vchiq_core_msg_log_level >= VCHIQ_LOG_INFO) {
1669 			int svc_fourcc;
1670 
1671 			svc_fourcc = service
1672 				? service->base.fourcc
1673 				: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
1674 			vchiq_log_info(vchiq_core_msg_log_level,
1675 				"Rcvd Msg %s(%u) from %c%c%c%c s:%d d:%d "
1676 				"len:%d",
1677 				msg_type_str(type), type,
1678 				VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
1679 				remoteport, localport, size);
1680 			if (size > 0)
1681 				vchiq_log_dump_mem("Rcvd", 0, header->data,
1682 					min(64, size));
1683 		}
1684 
1685 		if (((unsigned int)header & VCHIQ_SLOT_MASK) + calc_stride(size)
1686 			> VCHIQ_SLOT_SIZE) {
1687 			vchiq_log_error(vchiq_core_log_level,
1688 				"header %x (msgid %x) - size %x too big for "
1689 				"slot",
1690 				(unsigned int)header, (unsigned int)msgid,
1691 				(unsigned int)size);
1692 			WARN(1, "oversized for slot\n");
1693 		}
1694 
1695 		switch (type) {
1696 		case VCHIQ_MSG_OPEN:
1697 			WARN_ON(!(VCHIQ_MSG_DSTPORT(msgid) == 0));
1698 			if (!parse_open(state, header))
1699 				goto bail_not_ready;
1700 			break;
1701 		case VCHIQ_MSG_OPENACK:
1702 			if (size >= sizeof(struct vchiq_openack_payload)) {
1703 				const struct vchiq_openack_payload *payload =
1704 					(struct vchiq_openack_payload *)
1705 					header->data;
1706 				service->peer_version = payload->version;
1707 			}
1708 			vchiq_log_info(vchiq_core_log_level,
1709 				"%d: prs OPENACK@%x,%x (%d->%d) v:%d",
1710 				state->id, (unsigned int)header, size,
1711 				remoteport, localport, service->peer_version);
1712 			if (service->srvstate ==
1713 				VCHIQ_SRVSTATE_OPENING) {
1714 				service->remoteport = remoteport;
1715 				vchiq_set_service_state(service,
1716 					VCHIQ_SRVSTATE_OPEN);
1717 				up(&service->remove_event);
1718 			} else
1719 				vchiq_log_error(vchiq_core_log_level,
1720 					"OPENACK received in state %s",
1721 					srvstate_names[service->srvstate]);
1722 			break;
1723 		case VCHIQ_MSG_CLOSE:
1724 			WARN_ON(size != 0); /* There should be no data */
1725 
1726 			vchiq_log_info(vchiq_core_log_level,
1727 				"%d: prs CLOSE@%x (%d->%d)",
1728 				state->id, (unsigned int)header,
1729 				remoteport, localport);
1730 
1731 			mark_service_closing_internal(service, 1);
1732 
1733 			if (vchiq_close_service_internal(service,
1734 				1/*close_recvd*/) == VCHIQ_RETRY)
1735 				goto bail_not_ready;
1736 
1737 			vchiq_log_info(vchiq_core_log_level,
1738 				"Close Service %c%c%c%c s:%u d:%d",
1739 				VCHIQ_FOURCC_AS_4CHARS(service->base.fourcc),
1740 				service->localport,
1741 				service->remoteport);
1742 			break;
1743 		case VCHIQ_MSG_DATA:
1744 			vchiq_log_trace(vchiq_core_log_level,
1745 				"%d: prs DATA@%x,%x (%d->%d)",
1746 				state->id, (unsigned int)header, size,
1747 				remoteport, localport);
1748 
1749 			if ((service->remoteport == remoteport)
1750 				&& (service->srvstate ==
1751 				VCHIQ_SRVSTATE_OPEN)) {
1752 				header->msgid = msgid | VCHIQ_MSGID_CLAIMED;
1753 				claim_slot(state->rx_info);
1754 				DEBUG_TRACE(PARSE_LINE);
1755 				if (make_service_callback(service,
1756 					VCHIQ_MESSAGE_AVAILABLE, header,
1757 					NULL) == VCHIQ_RETRY) {
1758 					DEBUG_TRACE(PARSE_LINE);
1759 					goto bail_not_ready;
1760 				}
1761 				VCHIQ_SERVICE_STATS_INC(service, ctrl_rx_count);
1762 				VCHIQ_SERVICE_STATS_ADD(service, ctrl_rx_bytes,
1763 					size);
1764 			} else {
1765 				VCHIQ_STATS_INC(state, error_count);
1766 			}
1767 			break;
1768 		case VCHIQ_MSG_CONNECT:
1769 			vchiq_log_info(vchiq_core_log_level,
1770 				"%d: prs CONNECT@%x",
1771 				state->id, (unsigned int)header);
1772 			up(&state->connect);
1773 			break;
1774 		case VCHIQ_MSG_BULK_RX:
1775 		case VCHIQ_MSG_BULK_TX: {
1776 			VCHIQ_BULK_QUEUE_T *queue;
1777 			WARN_ON(!state->is_master);
1778 			queue = (type == VCHIQ_MSG_BULK_RX) ?
1779 				&service->bulk_tx : &service->bulk_rx;
1780 			if ((service->remoteport == remoteport)
1781 				&& (service->srvstate ==
1782 				VCHIQ_SRVSTATE_OPEN)) {
1783 				VCHIQ_BULK_T *bulk;
1784 				int resolved = 0;
1785 
1786 				DEBUG_TRACE(PARSE_LINE);
1787 				if (lmutex_lock_interruptible(
1788 					&service->bulk_mutex) != 0) {
1789 					DEBUG_TRACE(PARSE_LINE);
1790 					goto bail_not_ready;
1791 				}
1792 
1793 				WARN_ON(!(queue->remote_insert < queue->remove +
1794 					VCHIQ_NUM_SERVICE_BULKS));
1795 				bulk = &queue->bulks[
1796 					BULK_INDEX(queue->remote_insert)];
1797 				bulk->remote_data =
1798 					(void *)((int *)header->data)[0];
1799 				bulk->remote_size = ((int *)header->data)[1];
1800 				wmb();
1801 
1802 				vchiq_log_info(vchiq_core_log_level,
1803 					"%d: prs %s@%x (%d->%d) %x@%x",
1804 					state->id, msg_type_str(type),
1805 					(unsigned int)header,
1806 					remoteport, localport,
1807 					bulk->remote_size,
1808 					(unsigned int)bulk->remote_data);
1809 
1810 				queue->remote_insert++;
1811 
1812 				if (atomic_read(&pause_bulks_count)) {
1813 					state->deferred_bulks++;
1814 					vchiq_log_info(vchiq_core_log_level,
1815 						"%s: deferring bulk (%d)",
1816 						__func__,
1817 						state->deferred_bulks);
1818 					if (state->conn_state !=
1819 						VCHIQ_CONNSTATE_PAUSE_SENT)
1820 						vchiq_log_error(
1821 							vchiq_core_log_level,
1822 							"%s: bulks paused in "
1823 							"unexpected state %s",
1824 							__func__,
1825 							conn_state_names[
1826 							state->conn_state]);
1827 				} else if (state->conn_state ==
1828 					VCHIQ_CONNSTATE_CONNECTED) {
1829 					DEBUG_TRACE(PARSE_LINE);
1830 					resolved = resolve_bulks(service,
1831 						queue);
1832 				}
1833 
1834 				lmutex_unlock(&service->bulk_mutex);
1835 				if (resolved)
1836 					notify_bulks(service, queue,
1837 						1/*retry_poll*/);
1838 			}
1839 		} break;
1840 		case VCHIQ_MSG_BULK_RX_DONE:
1841 		case VCHIQ_MSG_BULK_TX_DONE:
1842 			WARN_ON(state->is_master);
1843 			if ((service->remoteport == remoteport)
1844 				&& (service->srvstate !=
1845 				VCHIQ_SRVSTATE_FREE)) {
1846 				VCHIQ_BULK_QUEUE_T *queue;
1847 				VCHIQ_BULK_T *bulk;
1848 
1849 				queue = (type == VCHIQ_MSG_BULK_RX_DONE) ?
1850 					&service->bulk_rx : &service->bulk_tx;
1851 
1852 				DEBUG_TRACE(PARSE_LINE);
1853 				if (lmutex_lock_interruptible(
1854 					&service->bulk_mutex) != 0) {
1855 					DEBUG_TRACE(PARSE_LINE);
1856 					goto bail_not_ready;
1857 				}
1858 				if ((int)(queue->remote_insert -
1859 					queue->local_insert) >= 0) {
1860 					vchiq_log_error(vchiq_core_log_level,
1861 						"%d: prs %s@%x (%d->%d) "
1862 						"unexpected (ri=%d,li=%d)",
1863 						state->id, msg_type_str(type),
1864 						(unsigned int)header,
1865 						remoteport, localport,
1866 						queue->remote_insert,
1867 						queue->local_insert);
1868 					lmutex_unlock(&service->bulk_mutex);
1869 					break;
1870 				}
1871 
1872 				BUG_ON(queue->process == queue->local_insert);
1873 				BUG_ON(queue->process != queue->remote_insert);
1874 
1875 				bulk = &queue->bulks[
1876 					BULK_INDEX(queue->remote_insert)];
1877 				bulk->actual = *(int *)header->data;
1878 				queue->remote_insert++;
1879 
1880 				vchiq_log_info(vchiq_core_log_level,
1881 					"%d: prs %s@%x (%d->%d) %x@%x",
1882 					state->id, msg_type_str(type),
1883 					(unsigned int)header,
1884 					remoteport, localport,
1885 					bulk->actual, (unsigned int)bulk->data);
1886 
1887 				vchiq_log_trace(vchiq_core_log_level,
1888 					"%d: prs:%d %cx li=%x ri=%x p=%x",
1889 					state->id, localport,
1890 					(type == VCHIQ_MSG_BULK_RX_DONE) ?
1891 						'r' : 't',
1892 					queue->local_insert,
1893 					queue->remote_insert, queue->process);
1894 
1895 				DEBUG_TRACE(PARSE_LINE);
1896 				WARN_ON(queue->process == queue->local_insert);
1897 				vchiq_complete_bulk(bulk);
1898 				queue->process++;
1899 				lmutex_unlock(&service->bulk_mutex);
1900 				DEBUG_TRACE(PARSE_LINE);
1901 				notify_bulks(service, queue, 1/*retry_poll*/);
1902 				DEBUG_TRACE(PARSE_LINE);
1903 			}
1904 			break;
1905 		case VCHIQ_MSG_PADDING:
1906 			vchiq_log_trace(vchiq_core_log_level,
1907 				"%d: prs PADDING@%x,%x",
1908 				state->id, (unsigned int)header, size);
1909 			break;
1910 		case VCHIQ_MSG_PAUSE:
1911 			/* If initiated, signal the application thread */
1912 			vchiq_log_trace(vchiq_core_log_level,
1913 				"%d: prs PAUSE@%x,%x",
1914 				state->id, (unsigned int)header, size);
1915 			if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
1916 				vchiq_log_error(vchiq_core_log_level,
1917 					"%d: PAUSE received in state PAUSED",
1918 					state->id);
1919 				break;
1920 			}
1921 			if (state->conn_state != VCHIQ_CONNSTATE_PAUSE_SENT) {
1922 				/* Send a PAUSE in response */
1923 				if (queue_message(state, NULL,
1924 					VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
1925 					NULL, 0, 0, 0) == VCHIQ_RETRY)
1926 					goto bail_not_ready;
1927 				if (state->is_master)
1928 					pause_bulks(state);
1929 			}
1930 			/* At this point slot_mutex is held */
1931 			vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSED);
1932 			vchiq_platform_paused(state);
1933 			break;
1934 		case VCHIQ_MSG_RESUME:
1935 			vchiq_log_trace(vchiq_core_log_level,
1936 				"%d: prs RESUME@%x,%x",
1937 				state->id, (unsigned int)header, size);
1938 			/* Release the slot mutex */
1939 			lmutex_unlock(&state->slot_mutex);
1940 			if (state->is_master)
1941 				resume_bulks(state);
1942 			vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
1943 			vchiq_platform_resumed(state);
1944 			break;
1945 
1946 		case VCHIQ_MSG_REMOTE_USE:
1947 			vchiq_on_remote_use(state);
1948 			break;
1949 		case VCHIQ_MSG_REMOTE_RELEASE:
1950 			vchiq_on_remote_release(state);
1951 			break;
1952 		case VCHIQ_MSG_REMOTE_USE_ACTIVE:
1953 			vchiq_on_remote_use_active(state);
1954 			break;
1955 
1956 		default:
1957 			vchiq_log_error(vchiq_core_log_level,
1958 				"%d: prs invalid msgid %x@%x,%x",
1959 				state->id, msgid, (unsigned int)header, size);
1960 			WARN(1, "invalid message\n");
1961 			break;
1962 		}
1963 
1964 skip_message:
1965 		if (service) {
1966 			unlock_service(service);
1967 			service = NULL;
1968 		}
1969 
1970 		state->rx_pos += calc_stride(size);
1971 
1972 		DEBUG_TRACE(PARSE_LINE);
1973 		/* Perform some housekeeping when the end of the slot is
1974 		** reached. */
1975 		if ((state->rx_pos & VCHIQ_SLOT_MASK) == 0) {
1976 			/* Remove the extra reference count. */
1977 			release_slot(state, state->rx_info, NULL, NULL);
1978 			state->rx_data = NULL;
1979 		}
1980 	}
1981 
1982 bail_not_ready:
1983 	if (service)
1984 		unlock_service(service);
1985 }
1986 
1987 /* Called by the slot handler thread */
1988 int slot_handler_func(void *v);
1989 int
1990 slot_handler_func(void *v)
1991 {
1992 	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
1993 	VCHIQ_SHARED_STATE_T *local = state->local;
1994 	DEBUG_INITIALISE(local)
1995 
1996 	while (1) {
1997 		DEBUG_COUNT(SLOT_HANDLER_COUNT);
1998 		DEBUG_TRACE(SLOT_HANDLER_LINE);
1999 		remote_event_wait(&local->trigger);
2000 
2001 		rmb();
2002 
2003 		DEBUG_TRACE(SLOT_HANDLER_LINE);
2004 		if (state->poll_needed) {
2005 			/* Check if we need to suspend - may change our
2006 			 * conn_state */
2007 			vchiq_platform_check_suspend(state);
2008 
2009 			state->poll_needed = 0;
2010 
2011 			/* Handle service polling and other rare conditions here
2012 			** out of the mainline code */
2013 			switch (state->conn_state) {
2014 			case VCHIQ_CONNSTATE_CONNECTED:
2015 				/* Poll the services as requested */
2016 				poll_services(state);
2017 				break;
2018 
2019 			case VCHIQ_CONNSTATE_PAUSING:
2020 				if (state->is_master)
2021 					pause_bulks(state);
2022 				if (queue_message(state, NULL,
2023 					VCHIQ_MAKE_MSG(VCHIQ_MSG_PAUSE, 0, 0),
2024 					NULL, 0, 0, 0) != VCHIQ_RETRY) {
2025 					vchiq_set_conn_state(state,
2026 						VCHIQ_CONNSTATE_PAUSE_SENT);
2027 				} else {
2028 					if (state->is_master)
2029 						resume_bulks(state);
2030 					/* Retry later */
2031 					state->poll_needed = 1;
2032 				}
2033 				break;
2034 
2035 			case VCHIQ_CONNSTATE_PAUSED:
2036 				vchiq_platform_resume(state);
2037 				break;
2038 
2039 			case VCHIQ_CONNSTATE_RESUMING:
2040 				if (queue_message(state, NULL,
2041 					VCHIQ_MAKE_MSG(VCHIQ_MSG_RESUME, 0, 0),
2042 					NULL, 0, 0, 0) != VCHIQ_RETRY) {
2043 					if (state->is_master)
2044 						resume_bulks(state);
2045 					vchiq_set_conn_state(state,
2046 						VCHIQ_CONNSTATE_CONNECTED);
2047 					vchiq_platform_resumed(state);
2048 				} else {
2049 					/* This should really be impossible,
2050 					** since the PAUSE should have flushed
2051 					** through outstanding messages. */
2052 					vchiq_log_error(vchiq_core_log_level,
2053 						"Failed to send RESUME "
2054 						"message");
2055 					BUG();
2056 				}
2057 				break;
2058 
2059 			case VCHIQ_CONNSTATE_PAUSE_TIMEOUT:
2060 			case VCHIQ_CONNSTATE_RESUME_TIMEOUT:
2061 				vchiq_platform_handle_timeout(state);
2062 				break;
2063 			default:
2064 				break;
2065 			}
2066 
2067 
2068 		}
2069 
2070 		DEBUG_TRACE(SLOT_HANDLER_LINE);
2071 		parse_rx_slots(state);
2072 	}
2073 	return 0;
2074 }
2075 
2076 
2077 /* Called by the recycle thread */
2078 int recycle_func(void *v);
2079 int
2080 recycle_func(void *v)
2081 {
2082 	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
2083 	VCHIQ_SHARED_STATE_T *local = state->local;
2084 
2085 	while (1) {
2086 		remote_event_wait(&local->recycle);
2087 
2088 		process_free_queue(state);
2089 	}
2090 	return 0;
2091 }
2092 
2093 
2094 /* Called by the sync thread */
2095 int sync_func(void *v);
2096 int
2097 sync_func(void *v)
2098 {
2099 	VCHIQ_STATE_T *state = (VCHIQ_STATE_T *) v;
2100 	VCHIQ_SHARED_STATE_T *local = state->local;
2101 	VCHIQ_HEADER_T *header = (VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state,
2102 		state->remote->slot_sync);
2103 
2104 	while (1) {
2105 		VCHIQ_SERVICE_T *service;
2106 		int msgid, size;
2107 		int type;
2108 		unsigned int localport, remoteport;
2109 
2110 		remote_event_wait(&local->sync_trigger);
2111 
2112 		rmb();
2113 
2114 		msgid = header->msgid;
2115 		size = header->size;
2116 		type = VCHIQ_MSG_TYPE(msgid);
2117 		localport = VCHIQ_MSG_DSTPORT(msgid);
2118 		remoteport = VCHIQ_MSG_SRCPORT(msgid);
2119 
2120 		service = find_service_by_port(state, localport);
2121 
2122 		if (!service) {
2123 			vchiq_log_error(vchiq_sync_log_level,
2124 				"%d: sf %s@%x (%d->%d) - "
2125 				"invalid/closed service %d",
2126 				state->id, msg_type_str(type),
2127 				(unsigned int)header,
2128 				remoteport, localport, localport);
2129 			release_message_sync(state, header);
2130 			continue;
2131 		}
2132 
2133 		if (vchiq_sync_log_level >= VCHIQ_LOG_TRACE) {
2134 			int svc_fourcc;
2135 
2136 			svc_fourcc = service
2137 				? service->base.fourcc
2138 				: VCHIQ_MAKE_FOURCC('?', '?', '?', '?');
2139 			vchiq_log_trace(vchiq_sync_log_level,
2140 				"Rcvd Msg %s from %c%c%c%c s:%d d:%d len:%d",
2141 				msg_type_str(type),
2142 				VCHIQ_FOURCC_AS_4CHARS(svc_fourcc),
2143 				remoteport, localport, size);
2144 			if (size > 0)
2145 				vchiq_log_dump_mem("Rcvd", 0, header->data,
2146 					min(64, size));
2147 		}
2148 
2149 		switch (type) {
2150 		case VCHIQ_MSG_OPENACK:
2151 			if (size >= sizeof(struct vchiq_openack_payload)) {
2152 				const struct vchiq_openack_payload *payload =
2153 					(struct vchiq_openack_payload *)
2154 					header->data;
2155 				service->peer_version = payload->version;
2156 			}
2157 			vchiq_log_info(vchiq_sync_log_level,
2158 				"%d: sf OPENACK@%x,%x (%d->%d) v:%d",
2159 				state->id, (unsigned int)header, size,
2160 				remoteport, localport, service->peer_version);
2161 			if (service->srvstate == VCHIQ_SRVSTATE_OPENING) {
2162 				service->remoteport = remoteport;
2163 				vchiq_set_service_state(service,
2164 					VCHIQ_SRVSTATE_OPENSYNC);
2165 				up(&service->remove_event);
2166 			}
2167 			release_message_sync(state, header);
2168 			break;
2169 
2170 		case VCHIQ_MSG_DATA:
2171 			vchiq_log_trace(vchiq_sync_log_level,
2172 				"%d: sf DATA@%x,%x (%d->%d)",
2173 				state->id, (unsigned int)header, size,
2174 				remoteport, localport);
2175 
2176 			if ((service->remoteport == remoteport) &&
2177 				(service->srvstate ==
2178 				VCHIQ_SRVSTATE_OPENSYNC)) {
2179 				if (make_service_callback(service,
2180 					VCHIQ_MESSAGE_AVAILABLE, header,
2181 					NULL) == VCHIQ_RETRY)
2182 					vchiq_log_error(vchiq_sync_log_level,
2183 						"synchronous callback to "
2184 						"service %d returns "
2185 						"VCHIQ_RETRY",
2186 						localport);
2187 			}
2188 			break;
2189 
2190 		default:
2191 			vchiq_log_error(vchiq_sync_log_level,
2192 				"%d: sf unexpected msgid %x@%x,%x",
2193 				state->id, msgid, (unsigned int)header, size);
2194 			release_message_sync(state, header);
2195 			break;
2196 		}
2197 
2198 		unlock_service(service);
2199 	}
2200 
2201 	return 0;
2202 }
2203 
2204 
2205 static void
2206 init_bulk_queue(VCHIQ_BULK_QUEUE_T *queue)
2207 {
2208 	queue->local_insert = 0;
2209 	queue->remote_insert = 0;
2210 	queue->process = 0;
2211 	queue->remote_notify = 0;
2212 	queue->remove = 0;
2213 }
2214 
2215 
2216 inline const char *
2217 get_conn_state_name(VCHIQ_CONNSTATE_T conn_state)
2218 {
2219 	return conn_state_names[conn_state];
2220 }
2221 
2222 
2223 VCHIQ_SLOT_ZERO_T *
2224 vchiq_init_slots(void *mem_base, int mem_size)
2225 {
2226 	int mem_align = (VCHIQ_SLOT_SIZE - (int)mem_base) & VCHIQ_SLOT_MASK;
2227 	VCHIQ_SLOT_ZERO_T *slot_zero =
2228 		(VCHIQ_SLOT_ZERO_T *)((char *)mem_base + mem_align);
2229 	int num_slots = (mem_size - mem_align)/VCHIQ_SLOT_SIZE;
2230 	int first_data_slot = VCHIQ_SLOT_ZERO_SLOTS;
2231 
2232 	/* Ensure there is enough memory to run an absolutely minimum system */
2233 	num_slots -= first_data_slot;
2234 
2235 	if (num_slots < 4) {
2236 		vchiq_log_error(vchiq_core_log_level,
2237 			"vchiq_init_slots - insufficient memory %x bytes",
2238 			mem_size);
2239 		return NULL;
2240 	}
2241 
2242 	memset(slot_zero, 0, sizeof(VCHIQ_SLOT_ZERO_T));
2243 
2244 	slot_zero->magic = VCHIQ_MAGIC;
2245 	slot_zero->version = VCHIQ_VERSION;
2246 	slot_zero->version_min = VCHIQ_VERSION_MIN;
2247 	slot_zero->slot_zero_size = sizeof(VCHIQ_SLOT_ZERO_T);
2248 	slot_zero->slot_size = VCHIQ_SLOT_SIZE;
2249 	slot_zero->max_slots = VCHIQ_MAX_SLOTS;
2250 	slot_zero->max_slots_per_side = VCHIQ_MAX_SLOTS_PER_SIDE;
2251 
2252 	slot_zero->master.slot_sync = first_data_slot;
2253 	slot_zero->master.slot_first = first_data_slot + 1;
2254 	slot_zero->master.slot_last = first_data_slot + (num_slots/2) - 1;
2255 	slot_zero->slave.slot_sync = first_data_slot + (num_slots/2);
2256 	slot_zero->slave.slot_first = first_data_slot + (num_slots/2) + 1;
2257 	slot_zero->slave.slot_last = first_data_slot + num_slots - 1;
2258 
2259 	return slot_zero;
2260 }
2261 
2262 VCHIQ_STATUS_T
2263 vchiq_init_state(VCHIQ_STATE_T *state, VCHIQ_SLOT_ZERO_T *slot_zero,
2264 		 int is_master)
2265 {
2266 	VCHIQ_SHARED_STATE_T *local;
2267 	VCHIQ_SHARED_STATE_T *remote;
2268 	VCHIQ_STATUS_T status;
2269 	char threadname[10];
2270 	static int id;
2271 	int i;
2272 
2273 	/* Check the input configuration */
2274 
2275 	if (slot_zero->magic != VCHIQ_MAGIC) {
2276 		vchiq_loud_error_header();
2277 		vchiq_loud_error("Invalid VCHIQ magic value found.");
2278 		vchiq_loud_error("slot_zero=%x: magic=%x (expected %x)",
2279 			(unsigned int)slot_zero, slot_zero->magic, VCHIQ_MAGIC);
2280 		vchiq_loud_error_footer();
2281 		return VCHIQ_ERROR;
2282 	}
2283 
2284 	vchiq_log_warning(vchiq_core_log_level,
2285 		"local ver %d (min %d), remote ver %d.",
2286 		VCHIQ_VERSION, VCHIQ_VERSION_MIN,
2287 		slot_zero->version);
2288 
2289 	if (slot_zero->version < VCHIQ_VERSION_MIN) {
2290 		vchiq_loud_error_header();
2291 		vchiq_loud_error("Incompatible VCHIQ versions found.");
2292 		vchiq_loud_error("slot_zero=%x: VideoCore version=%d "
2293 			"(minimum %d)",
2294 			(unsigned int)slot_zero, slot_zero->version,
2295 			VCHIQ_VERSION_MIN);
2296 		vchiq_loud_error("Restart with a newer VideoCore image.");
2297 		vchiq_loud_error_footer();
2298 		return VCHIQ_ERROR;
2299 	}
2300 
2301 	if (VCHIQ_VERSION < slot_zero->version_min) {
2302 		vchiq_loud_error_header();
2303 		vchiq_loud_error("Incompatible VCHIQ versions found.");
2304 		vchiq_loud_error("slot_zero=%x: version=%d (VideoCore "
2305 			"minimum %d)",
2306 			(unsigned int)slot_zero, VCHIQ_VERSION,
2307 			slot_zero->version_min);
2308 		vchiq_loud_error("Restart with a newer kernel.");
2309 		vchiq_loud_error_footer();
2310 		return VCHIQ_ERROR;
2311 	}
2312 
2313 	if ((slot_zero->slot_zero_size != sizeof(VCHIQ_SLOT_ZERO_T)) ||
2314 		 (slot_zero->slot_size != VCHIQ_SLOT_SIZE) ||
2315 		 (slot_zero->max_slots != VCHIQ_MAX_SLOTS) ||
2316 		 (slot_zero->max_slots_per_side != VCHIQ_MAX_SLOTS_PER_SIDE)) {
2317 		vchiq_loud_error_header();
2318 		if (slot_zero->slot_zero_size != sizeof(VCHIQ_SLOT_ZERO_T))
2319 			vchiq_loud_error("slot_zero=%x: slot_zero_size=%x "
2320 				"(expected %zx)",
2321 				(unsigned int)slot_zero,
2322 				slot_zero->slot_zero_size,
2323 				sizeof(VCHIQ_SLOT_ZERO_T));
2324 		if (slot_zero->slot_size != VCHIQ_SLOT_SIZE)
2325 			vchiq_loud_error("slot_zero=%x: slot_size=%d "
2326 				"(expected %d",
2327 				(unsigned int)slot_zero, slot_zero->slot_size,
2328 				VCHIQ_SLOT_SIZE);
2329 		if (slot_zero->max_slots != VCHIQ_MAX_SLOTS)
2330 			vchiq_loud_error("slot_zero=%x: max_slots=%d "
2331 				"(expected %d)",
2332 				(unsigned int)slot_zero, slot_zero->max_slots,
2333 				VCHIQ_MAX_SLOTS);
2334 		if (slot_zero->max_slots_per_side != VCHIQ_MAX_SLOTS_PER_SIDE)
2335 			vchiq_loud_error("slot_zero=%x: max_slots_per_side=%d "
2336 				"(expected %d)",
2337 				(unsigned int)slot_zero,
2338 				slot_zero->max_slots_per_side,
2339 				VCHIQ_MAX_SLOTS_PER_SIDE);
2340 		vchiq_loud_error_footer();
2341 		return VCHIQ_ERROR;
2342 	}
2343 
2344 	if (is_master) {
2345 		local = &slot_zero->master;
2346 		remote = &slot_zero->slave;
2347 	} else {
2348 		local = &slot_zero->slave;
2349 		remote = &slot_zero->master;
2350 	}
2351 
2352 	if (local->initialised) {
2353 		vchiq_loud_error_header();
2354 		if (remote->initialised)
2355 			vchiq_loud_error("local state has already been "
2356 				"initialised");
2357 		else
2358 			vchiq_loud_error("master/slave mismatch - two %ss",
2359 				is_master ? "master" : "slave");
2360 		vchiq_loud_error_footer();
2361 		return VCHIQ_ERROR;
2362 	}
2363 
2364 	memset(state, 0, sizeof(VCHIQ_STATE_T));
2365 
2366 	state->id = id++;
2367 	state->is_master = is_master;
2368 
2369 	/*
2370 		initialize shared state pointers
2371 	 */
2372 
2373 	state->local = local;
2374 	state->remote = remote;
2375 	state->slot_data = (VCHIQ_SLOT_T *)slot_zero;
2376 
2377 	/*
2378 		initialize events and mutexes
2379 	 */
2380 
2381 	_sema_init(&state->connect, 0);
2382 	lmutex_init(&state->mutex);
2383 	_sema_init(&state->trigger_event, 0);
2384 	_sema_init(&state->recycle_event, 0);
2385 	_sema_init(&state->sync_trigger_event, 0);
2386 	_sema_init(&state->sync_release_event, 0);
2387 
2388 	lmutex_init(&state->slot_mutex);
2389 	lmutex_init(&state->recycle_mutex);
2390 	lmutex_init(&state->sync_mutex);
2391 	lmutex_init(&state->bulk_transfer_mutex);
2392 
2393 	_sema_init(&state->slot_available_event, 0);
2394 	_sema_init(&state->slot_remove_event, 0);
2395 	_sema_init(&state->data_quota_event, 0);
2396 
2397 	state->slot_queue_available = 0;
2398 
2399 	for (i = 0; i < VCHIQ_MAX_SERVICES; i++) {
2400 		VCHIQ_SERVICE_QUOTA_T *service_quota =
2401 			&state->service_quotas[i];
2402 		_sema_init(&service_quota->quota_event, 0);
2403 	}
2404 
2405 	for (i = local->slot_first; i <= local->slot_last; i++) {
2406 		local->slot_queue[state->slot_queue_available++] = i;
2407 		up(&state->slot_available_event);
2408 	}
2409 
2410 	state->default_slot_quota = state->slot_queue_available/2;
2411 	state->default_message_quota =
2412 		min((unsigned short)(state->default_slot_quota * 256),
2413 		(unsigned short)~0);
2414 
2415 	state->previous_data_index = -1;
2416 	state->data_use_count = 0;
2417 	state->data_quota = state->slot_queue_available - 1;
2418 
2419 	local->trigger.event = &state->trigger_event;
2420 	remote_event_create(&local->trigger);
2421 	local->tx_pos = 0;
2422 
2423 	local->recycle.event = &state->recycle_event;
2424 	remote_event_create(&local->recycle);
2425 	local->slot_queue_recycle = state->slot_queue_available;
2426 
2427 	local->sync_trigger.event = &state->sync_trigger_event;
2428 	remote_event_create(&local->sync_trigger);
2429 
2430 	local->sync_release.event = &state->sync_release_event;
2431 	remote_event_create(&local->sync_release);
2432 
2433 	/* At start-of-day, the slot is empty and available */
2434 	((VCHIQ_HEADER_T *)SLOT_DATA_FROM_INDEX(state, local->slot_sync))->msgid
2435 		= VCHIQ_MSGID_PADDING;
2436 	remote_event_signal_local(&local->sync_release);
2437 
2438 	local->debug[DEBUG_ENTRIES] = DEBUG_MAX;
2439 
2440 	status = vchiq_platform_init_state(state);
2441 
2442 	/*
2443 		bring up slot handler thread
2444 	 */
2445 	snprintf(threadname, sizeof(threadname), "VCHIQ-%d", state->id);
2446 	state->slot_handler_thread = vchiq_thread_create(&slot_handler_func,
2447 		(void *)state,
2448 		threadname);
2449 
2450 	if (state->slot_handler_thread == NULL) {
2451 		vchiq_loud_error_header();
2452 		vchiq_loud_error("couldn't create thread %s", threadname);
2453 		vchiq_loud_error_footer();
2454 		return VCHIQ_ERROR;
2455 	}
2456 	set_user_nice(state->slot_handler_thread, -19);
2457 	wake_up_process(state->slot_handler_thread);
2458 
2459 	snprintf(threadname, sizeof(threadname), "VCHIQr-%d", state->id);
2460 	state->recycle_thread = vchiq_thread_create(&recycle_func,
2461 		(void *)state,
2462 		threadname);
2463 	if (state->recycle_thread == NULL) {
2464 		vchiq_loud_error_header();
2465 		vchiq_loud_error("couldn't create thread %s", threadname);
2466 		vchiq_loud_error_footer();
2467 		return VCHIQ_ERROR;
2468 	}
2469 	set_user_nice(state->recycle_thread, -19);
2470 	wake_up_process(state->recycle_thread);
2471 
2472 	snprintf(threadname, sizeof(threadname), "VCHIQs-%d", state->id);
2473 	state->sync_thread = vchiq_thread_create(&sync_func,
2474 		(void *)state,
2475 		threadname);
2476 	if (state->sync_thread == NULL) {
2477 		vchiq_loud_error_header();
2478 		vchiq_loud_error("couldn't create thread %s", threadname);
2479 		vchiq_loud_error_footer();
2480 		return VCHIQ_ERROR;
2481 	}
2482 	set_user_nice(state->sync_thread, -20);
2483 	wake_up_process(state->sync_thread);
2484 
2485 	BUG_ON(state->id >= VCHIQ_MAX_STATES);
2486 	vchiq_states[state->id] = state;
2487 
2488 	/* Indicate readiness to the other side */
2489 	local->initialised = 1;
2490 
2491 	return status;
2492 }
2493 
2494 /* Called from application thread when a client or server service is created. */
2495 VCHIQ_SERVICE_T *
2496 vchiq_add_service_internal(VCHIQ_STATE_T *state,
2497 	const VCHIQ_SERVICE_PARAMS_T *params, int srvstate,
2498 	VCHIQ_INSTANCE_T instance, VCHIQ_USERDATA_TERM_T userdata_term)
2499 {
2500 	VCHIQ_SERVICE_T *service;
2501 
2502 	service = kmalloc(sizeof(VCHIQ_SERVICE_T), GFP_KERNEL);
2503 	if (service) {
2504 		service->base.fourcc   = params->fourcc;
2505 		service->base.callback = params->callback;
2506 		service->base.userdata = params->userdata;
2507 		service->handle        = VCHIQ_SERVICE_HANDLE_INVALID;
2508 		service->ref_count     = 1;
2509 		service->srvstate      = VCHIQ_SRVSTATE_FREE;
2510 		service->userdata_term = userdata_term;
2511 		service->localport     = VCHIQ_PORT_FREE;
2512 		service->remoteport    = VCHIQ_PORT_FREE;
2513 
2514 		service->public_fourcc = (srvstate == VCHIQ_SRVSTATE_OPENING) ?
2515 			VCHIQ_FOURCC_INVALID : params->fourcc;
2516 		service->client_id     = 0;
2517 		service->auto_close    = 1;
2518 		service->sync          = 0;
2519 		service->closing       = 0;
2520 		atomic_set(&service->poll_flags, 0);
2521 		service->version       = params->version;
2522 		service->version_min   = params->version_min;
2523 		service->state         = state;
2524 		service->instance      = instance;
2525 		service->service_use_count = 0;
2526 		init_bulk_queue(&service->bulk_tx);
2527 		init_bulk_queue(&service->bulk_rx);
2528 		_sema_init(&service->remove_event, 0);
2529 		_sema_init(&service->bulk_remove_event, 0);
2530 		lmutex_init(&service->bulk_mutex);
2531 		memset(&service->stats, 0, sizeof(service->stats));
2532 	} else {
2533 		vchiq_log_error(vchiq_core_log_level,
2534 			"Out of memory");
2535 	}
2536 
2537 	if (service) {
2538 		VCHIQ_SERVICE_T **pservice = NULL;
2539 		int i;
2540 
2541 		/* Although it is perfectly possible to use service_spinlock
2542 		** to protect the creation of services, it is overkill as it
2543 		** disables interrupts while the array is searched.
2544 		** The only danger is of another thread trying to create a
2545 		** service - service deletion is safe.
2546 		** Therefore it is preferable to use state->mutex which,
2547 		** although slower to claim, doesn't block interrupts while
2548 		** it is held.
2549 		*/
2550 
2551 		lmutex_lock(&state->mutex);
2552 
2553 		/* Prepare to use a previously unused service */
2554 		if (state->unused_service < VCHIQ_MAX_SERVICES)
2555 			pservice = &state->services[state->unused_service];
2556 
2557 		if (srvstate == VCHIQ_SRVSTATE_OPENING) {
2558 			for (i = 0; i < state->unused_service; i++) {
2559 				VCHIQ_SERVICE_T *srv = state->services[i];
2560 				if (!srv) {
2561 					pservice = &state->services[i];
2562 					break;
2563 				}
2564 			}
2565 		} else {
2566 			for (i = (state->unused_service - 1); i >= 0; i--) {
2567 				VCHIQ_SERVICE_T *srv = state->services[i];
2568 				if (!srv)
2569 					pservice = &state->services[i];
2570 				else if ((srv->public_fourcc == params->fourcc)
2571 					&& ((srv->instance != instance) ||
2572 					(srv->base.callback !=
2573 					params->callback))) {
2574 					/* There is another server using this
2575 					** fourcc which doesn't match. */
2576 					pservice = NULL;
2577 					break;
2578 				}
2579 			}
2580 		}
2581 
2582 		if (pservice) {
2583 			service->localport = (pservice - state->services);
2584 			if (!handle_seq)
2585 				handle_seq = VCHIQ_MAX_STATES *
2586 					 VCHIQ_MAX_SERVICES;
2587 			service->handle = handle_seq |
2588 				(state->id * VCHIQ_MAX_SERVICES) |
2589 				service->localport;
2590 			handle_seq += VCHIQ_MAX_STATES * VCHIQ_MAX_SERVICES;
2591 			*pservice = service;
2592 			if (pservice == &state->services[state->unused_service])
2593 				state->unused_service++;
2594 		}
2595 
2596 		lmutex_unlock(&state->mutex);
2597 
2598 		if (!pservice) {
2599 			_sema_destroy(&service->remove_event);
2600 			_sema_destroy(&service->bulk_remove_event);
2601 			lmutex_destroy(&service->bulk_mutex);
2602 
2603 			kfree(service);
2604 			service = NULL;
2605 		}
2606 	}
2607 
2608 	if (service) {
2609 		VCHIQ_SERVICE_QUOTA_T *service_quota =
2610 			&state->service_quotas[service->localport];
2611 		service_quota->slot_quota = state->default_slot_quota;
2612 		service_quota->message_quota = state->default_message_quota;
2613 		if (service_quota->slot_use_count == 0)
2614 			service_quota->previous_tx_index =
2615 				SLOT_QUEUE_INDEX_FROM_POS(state->local_tx_pos)
2616 				- 1;
2617 
2618 		/* Bring this service online */
2619 		vchiq_set_service_state(service, srvstate);
2620 
2621 		vchiq_log_info(vchiq_core_msg_log_level,
2622 			"%s Service %c%c%c%c SrcPort:%d",
2623 			(srvstate == VCHIQ_SRVSTATE_OPENING)
2624 			? "Open" : "Add",
2625 			VCHIQ_FOURCC_AS_4CHARS(params->fourcc),
2626 			service->localport);
2627 	}
2628 
2629 	/* Don't unlock the service - leave it with a ref_count of 1. */
2630 
2631 	return service;
2632 }
2633 
2634 VCHIQ_STATUS_T
2635 vchiq_open_service_internal(VCHIQ_SERVICE_T *service, int client_id)
2636 {
2637 	struct vchiq_open_payload payload = {
2638 		service->base.fourcc,
2639 		client_id,
2640 		service->version,
2641 		service->version_min
2642 	};
2643 	VCHIQ_ELEMENT_T body = { &payload, sizeof(payload) };
2644 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
2645 
2646 	service->client_id = client_id;
2647 	vchiq_use_service_internal(service);
2648 	status = queue_message(service->state, NULL,
2649 		VCHIQ_MAKE_MSG(VCHIQ_MSG_OPEN, service->localport, 0),
2650 		&body, 1, sizeof(payload), 1);
2651 	if (status == VCHIQ_SUCCESS) {
2652 		if (down_interruptible(&service->remove_event) != 0) {
2653 			status = VCHIQ_RETRY;
2654 			vchiq_release_service_internal(service);
2655 		} else if ((service->srvstate != VCHIQ_SRVSTATE_OPEN) &&
2656 			(service->srvstate != VCHIQ_SRVSTATE_OPENSYNC)) {
2657 			if (service->srvstate != VCHIQ_SRVSTATE_CLOSEWAIT)
2658 				vchiq_log_error(vchiq_core_log_level,
2659 					"%d: osi - srvstate = %s (ref %d)",
2660 					service->state->id,
2661 					srvstate_names[service->srvstate],
2662 					service->ref_count);
2663 			status = VCHIQ_ERROR;
2664 			VCHIQ_SERVICE_STATS_INC(service, error_count);
2665 			vchiq_release_service_internal(service);
2666 		}
2667 	}
2668 	return status;
2669 }
2670 
2671 static void
2672 release_service_messages(VCHIQ_SERVICE_T *service)
2673 {
2674 	VCHIQ_STATE_T *state = service->state;
2675 	int slot_last = state->remote->slot_last;
2676 	int i;
2677 
2678 	/* Release any claimed messages */
2679 	for (i = state->remote->slot_first; i <= slot_last; i++) {
2680 		VCHIQ_SLOT_INFO_T *slot_info =
2681 			SLOT_INFO_FROM_INDEX(state, i);
2682 		if (slot_info->release_count != slot_info->use_count) {
2683 			char *data =
2684 				(char *)SLOT_DATA_FROM_INDEX(state, i);
2685 			unsigned int pos, end;
2686 
2687 			end = VCHIQ_SLOT_SIZE;
2688 			if (data == state->rx_data)
2689 				/* This buffer is still being read from - stop
2690 				** at the current read position */
2691 				end = state->rx_pos & VCHIQ_SLOT_MASK;
2692 
2693 			pos = 0;
2694 
2695 			while (pos < end) {
2696 				VCHIQ_HEADER_T *header =
2697 					(VCHIQ_HEADER_T *)(data + pos);
2698 				int msgid = header->msgid;
2699 				int port = VCHIQ_MSG_DSTPORT(msgid);
2700 				if ((port == service->localport) &&
2701 					(msgid & VCHIQ_MSGID_CLAIMED)) {
2702 					vchiq_log_info(vchiq_core_log_level,
2703 						"  fsi - hdr %x",
2704 						(unsigned int)header);
2705 					release_slot(state, slot_info, header,
2706 						NULL);
2707 				}
2708 				pos += calc_stride(header->size);
2709 				if (pos > VCHIQ_SLOT_SIZE) {
2710 					vchiq_log_error(vchiq_core_log_level,
2711 						"fsi - pos %x: header %x, "
2712 						"msgid %x, header->msgid %x, "
2713 						"header->size %x",
2714 						pos, (unsigned int)header,
2715 						msgid, header->msgid,
2716 						header->size);
2717 					WARN(1, "invalid slot position\n");
2718 				}
2719 			}
2720 		}
2721 	}
2722 }
2723 
2724 static int
2725 do_abort_bulks(VCHIQ_SERVICE_T *service)
2726 {
2727 	VCHIQ_STATUS_T status;
2728 
2729 	/* Abort any outstanding bulk transfers */
2730 	if (lmutex_lock_interruptible(&service->bulk_mutex) != 0)
2731 		return 0;
2732 	abort_outstanding_bulks(service, &service->bulk_tx);
2733 	abort_outstanding_bulks(service, &service->bulk_rx);
2734 	lmutex_unlock(&service->bulk_mutex);
2735 
2736 	status = notify_bulks(service, &service->bulk_tx, 0/*!retry_poll*/);
2737 	if (status == VCHIQ_SUCCESS)
2738 		status = notify_bulks(service, &service->bulk_rx,
2739 			0/*!retry_poll*/);
2740 	return (status == VCHIQ_SUCCESS);
2741 }
2742 
2743 static VCHIQ_STATUS_T
2744 close_service_complete(VCHIQ_SERVICE_T *service, int failstate)
2745 {
2746 	VCHIQ_STATUS_T status;
2747 	int is_server = (service->public_fourcc != VCHIQ_FOURCC_INVALID);
2748 	int newstate;
2749 
2750 	switch (service->srvstate) {
2751 	case VCHIQ_SRVSTATE_OPEN:
2752 	case VCHIQ_SRVSTATE_CLOSESENT:
2753 	case VCHIQ_SRVSTATE_CLOSERECVD:
2754 		if (is_server) {
2755 			if (service->auto_close) {
2756 				service->client_id = 0;
2757 				service->remoteport = VCHIQ_PORT_FREE;
2758 				newstate = VCHIQ_SRVSTATE_LISTENING;
2759 			} else
2760 				newstate = VCHIQ_SRVSTATE_CLOSEWAIT;
2761 		} else
2762 			newstate = VCHIQ_SRVSTATE_CLOSED;
2763 		vchiq_set_service_state(service, newstate);
2764 		break;
2765 	case VCHIQ_SRVSTATE_LISTENING:
2766 		break;
2767 	default:
2768 		vchiq_log_error(vchiq_core_log_level,
2769 			"close_service_complete(%x) called in state %s",
2770 			service->handle, srvstate_names[service->srvstate]);
2771 		WARN(1, "close_service_complete in unexpected state\n");
2772 		return VCHIQ_ERROR;
2773 	}
2774 
2775 	status = make_service_callback(service,
2776 		VCHIQ_SERVICE_CLOSED, NULL, NULL);
2777 
2778 	if (status != VCHIQ_RETRY) {
2779 		int uc = service->service_use_count;
2780 		int i;
2781 		/* Complete the close process */
2782 		for (i = 0; i < uc; i++)
2783 			/* cater for cases where close is forced and the
2784 			** client may not close all it's handles */
2785 			vchiq_release_service_internal(service);
2786 
2787 		service->client_id = 0;
2788 		service->remoteport = VCHIQ_PORT_FREE;
2789 
2790 		if (service->srvstate == VCHIQ_SRVSTATE_CLOSED)
2791 			vchiq_free_service_internal(service);
2792 		else if (service->srvstate != VCHIQ_SRVSTATE_CLOSEWAIT) {
2793 			if (is_server)
2794 				service->closing = 0;
2795 
2796 			up(&service->remove_event);
2797 		}
2798 	} else
2799 		vchiq_set_service_state(service, failstate);
2800 
2801 	return status;
2802 }
2803 
2804 /* Called by the slot handler */
2805 VCHIQ_STATUS_T
2806 vchiq_close_service_internal(VCHIQ_SERVICE_T *service, int close_recvd)
2807 {
2808 	VCHIQ_STATE_T *state = service->state;
2809 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
2810 	int is_server = (service->public_fourcc != VCHIQ_FOURCC_INVALID);
2811 
2812 	vchiq_log_info(vchiq_core_log_level, "%d: csi:%d,%d (%s)",
2813 		service->state->id, service->localport, close_recvd,
2814 		srvstate_names[service->srvstate]);
2815 
2816 	switch (service->srvstate) {
2817 	case VCHIQ_SRVSTATE_CLOSED:
2818 	case VCHIQ_SRVSTATE_HIDDEN:
2819 	case VCHIQ_SRVSTATE_LISTENING:
2820 	case VCHIQ_SRVSTATE_CLOSEWAIT:
2821 		if (close_recvd)
2822 			vchiq_log_error(vchiq_core_log_level,
2823 				"vchiq_close_service_internal(1) called "
2824 				"in state %s",
2825 				srvstate_names[service->srvstate]);
2826 		else if (is_server) {
2827 			if (service->srvstate == VCHIQ_SRVSTATE_LISTENING) {
2828 				status = VCHIQ_ERROR;
2829 			} else {
2830 				service->client_id = 0;
2831 				service->remoteport = VCHIQ_PORT_FREE;
2832 				if (service->srvstate ==
2833 					VCHIQ_SRVSTATE_CLOSEWAIT)
2834 					vchiq_set_service_state(service,
2835 						VCHIQ_SRVSTATE_LISTENING);
2836 			}
2837 			up(&service->remove_event);
2838 		} else
2839 			vchiq_free_service_internal(service);
2840 		break;
2841 	case VCHIQ_SRVSTATE_OPENING:
2842 		if (close_recvd) {
2843 			/* The open was rejected - tell the user */
2844 			vchiq_set_service_state(service,
2845 				VCHIQ_SRVSTATE_CLOSEWAIT);
2846 			up(&service->remove_event);
2847 		} else {
2848 			/* Shutdown mid-open - let the other side know */
2849 			status = queue_message(state, service,
2850 				VCHIQ_MAKE_MSG
2851 				(VCHIQ_MSG_CLOSE,
2852 				service->localport,
2853 				VCHIQ_MSG_DSTPORT(service->remoteport)),
2854 				NULL, 0, 0, 0);
2855 		}
2856 		break;
2857 
2858 	case VCHIQ_SRVSTATE_OPENSYNC:
2859 		lmutex_lock(&state->sync_mutex);
2860 		/* Drop through */
2861 
2862 	case VCHIQ_SRVSTATE_OPEN:
2863 		if (state->is_master || close_recvd) {
2864 			if (!do_abort_bulks(service))
2865 				status = VCHIQ_RETRY;
2866 		}
2867 
2868 		release_service_messages(service);
2869 
2870 		if (status == VCHIQ_SUCCESS)
2871 			status = queue_message(state, service,
2872 				VCHIQ_MAKE_MSG
2873 				(VCHIQ_MSG_CLOSE,
2874 				service->localport,
2875 				VCHIQ_MSG_DSTPORT(service->remoteport)),
2876 				NULL, 0, 0, 0);
2877 
2878 		if (status == VCHIQ_SUCCESS) {
2879 			if (!close_recvd)
2880 				break;
2881 		} else if (service->srvstate == VCHIQ_SRVSTATE_OPENSYNC) {
2882 			lmutex_unlock(&state->sync_mutex);
2883 			break;
2884 		} else
2885 			break;
2886 
2887 		status = close_service_complete(service,
2888 				VCHIQ_SRVSTATE_CLOSERECVD);
2889 		break;
2890 
2891 	case VCHIQ_SRVSTATE_CLOSESENT:
2892 		if (!close_recvd)
2893 			/* This happens when a process is killed mid-close */
2894 			break;
2895 
2896 		if (!state->is_master) {
2897 			if (!do_abort_bulks(service)) {
2898 				status = VCHIQ_RETRY;
2899 				break;
2900 			}
2901 		}
2902 
2903 		if (status == VCHIQ_SUCCESS)
2904 			status = close_service_complete(service,
2905 				VCHIQ_SRVSTATE_CLOSERECVD);
2906 		break;
2907 
2908 	case VCHIQ_SRVSTATE_CLOSERECVD:
2909 		if (!close_recvd && is_server)
2910 			/* Force into LISTENING mode */
2911 			vchiq_set_service_state(service,
2912 				VCHIQ_SRVSTATE_LISTENING);
2913 		status = close_service_complete(service,
2914 			VCHIQ_SRVSTATE_CLOSERECVD);
2915 		break;
2916 
2917 	default:
2918 		vchiq_log_error(vchiq_core_log_level,
2919 			"vchiq_close_service_internal(%d) called in state %s",
2920 			close_recvd, srvstate_names[service->srvstate]);
2921 		break;
2922 	}
2923 
2924 	return status;
2925 }
2926 
2927 /* Called from the application process upon process death */
2928 void
2929 vchiq_terminate_service_internal(VCHIQ_SERVICE_T *service)
2930 {
2931 	VCHIQ_STATE_T *state = service->state;
2932 
2933 	vchiq_log_info(vchiq_core_log_level, "%d: tsi - (%d<->%d)",
2934 		state->id, service->localport, service->remoteport);
2935 
2936 	mark_service_closing(service);
2937 
2938 	/* Mark the service for removal by the slot handler */
2939 	request_poll(state, service, VCHIQ_POLL_REMOVE);
2940 }
2941 
2942 /* Called from the slot handler */
2943 void
2944 vchiq_free_service_internal(VCHIQ_SERVICE_T *service)
2945 {
2946 	VCHIQ_STATE_T *state = service->state;
2947 
2948 	vchiq_log_info(vchiq_core_log_level, "%d: fsi - (%d)",
2949 		state->id, service->localport);
2950 
2951 	switch (service->srvstate) {
2952 	case VCHIQ_SRVSTATE_OPENING:
2953 	case VCHIQ_SRVSTATE_CLOSED:
2954 	case VCHIQ_SRVSTATE_HIDDEN:
2955 	case VCHIQ_SRVSTATE_LISTENING:
2956 	case VCHIQ_SRVSTATE_CLOSEWAIT:
2957 		break;
2958 	default:
2959 		vchiq_log_error(vchiq_core_log_level,
2960 			"%d: fsi - (%d) in state %s",
2961 			state->id, service->localport,
2962 			srvstate_names[service->srvstate]);
2963 		return;
2964 	}
2965 
2966 	vchiq_set_service_state(service, VCHIQ_SRVSTATE_FREE);
2967 
2968 	up(&service->remove_event);
2969 
2970 	/* Release the initial lock */
2971 	unlock_service(service);
2972 }
2973 
2974 VCHIQ_STATUS_T
2975 vchiq_connect_internal(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance)
2976 {
2977 	VCHIQ_SERVICE_T *service;
2978 	int i;
2979 
2980 	/* Find all services registered to this client and enable them. */
2981 	i = 0;
2982 	while ((service = next_service_by_instance(state, instance,
2983 		&i)) !=	NULL) {
2984 		if (service->srvstate == VCHIQ_SRVSTATE_HIDDEN)
2985 			vchiq_set_service_state(service,
2986 				VCHIQ_SRVSTATE_LISTENING);
2987 		unlock_service(service);
2988 	}
2989 
2990 	if (state->conn_state == VCHIQ_CONNSTATE_DISCONNECTED) {
2991 		if (queue_message(state, NULL,
2992 			VCHIQ_MAKE_MSG(VCHIQ_MSG_CONNECT, 0, 0), NULL, 0,
2993 			0, 1) == VCHIQ_RETRY)
2994 			return VCHIQ_RETRY;
2995 
2996 		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTING);
2997 	}
2998 
2999 	if (state->conn_state == VCHIQ_CONNSTATE_CONNECTING) {
3000 		if (down_interruptible(&state->connect) != 0)
3001 			return VCHIQ_RETRY;
3002 
3003 		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_CONNECTED);
3004 		up(&state->connect);
3005 	}
3006 
3007 	return VCHIQ_SUCCESS;
3008 }
3009 
3010 VCHIQ_STATUS_T
3011 vchiq_shutdown_internal(VCHIQ_STATE_T *state, VCHIQ_INSTANCE_T instance)
3012 {
3013 	VCHIQ_SERVICE_T *service;
3014 	int i;
3015 
3016 	/* Find all services registered to this client and enable them. */
3017 	i = 0;
3018 	while ((service = next_service_by_instance(state, instance,
3019 		&i)) !=	NULL) {
3020 		(void)vchiq_remove_service(service->handle);
3021 		unlock_service(service);
3022 	}
3023 
3024 	return VCHIQ_SUCCESS;
3025 }
3026 
3027 VCHIQ_STATUS_T
3028 vchiq_pause_internal(VCHIQ_STATE_T *state)
3029 {
3030 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3031 
3032 	switch (state->conn_state) {
3033 	case VCHIQ_CONNSTATE_CONNECTED:
3034 		/* Request a pause */
3035 		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_PAUSING);
3036 		request_poll(state, NULL, 0);
3037 		break;
3038 	default:
3039 		vchiq_log_error(vchiq_core_log_level,
3040 			"vchiq_pause_internal in state %s\n",
3041 			conn_state_names[state->conn_state]);
3042 		status = VCHIQ_ERROR;
3043 		VCHIQ_STATS_INC(state, error_count);
3044 		break;
3045 	}
3046 
3047 	return status;
3048 }
3049 
3050 VCHIQ_STATUS_T
3051 vchiq_resume_internal(VCHIQ_STATE_T *state)
3052 {
3053 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3054 
3055 	if (state->conn_state == VCHIQ_CONNSTATE_PAUSED) {
3056 		vchiq_set_conn_state(state, VCHIQ_CONNSTATE_RESUMING);
3057 		request_poll(state, NULL, 0);
3058 	} else {
3059 		status = VCHIQ_ERROR;
3060 		VCHIQ_STATS_INC(state, error_count);
3061 	}
3062 
3063 	return status;
3064 }
3065 
3066 VCHIQ_STATUS_T
3067 vchiq_close_service(VCHIQ_SERVICE_HANDLE_T handle)
3068 {
3069 	/* Unregister the service */
3070 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3071 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3072 
3073 	if (!service)
3074 		return VCHIQ_ERROR;
3075 
3076 	vchiq_log_info(vchiq_core_log_level,
3077 		"%d: close_service:%d",
3078 		service->state->id, service->localport);
3079 
3080 	if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3081 		(service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
3082 		(service->srvstate == VCHIQ_SRVSTATE_HIDDEN)) {
3083 		unlock_service(service);
3084 		return VCHIQ_ERROR;
3085 	}
3086 
3087 	mark_service_closing(service);
3088 
3089 	if (current == service->state->slot_handler_thread) {
3090 		status = vchiq_close_service_internal(service,
3091 			0/*!close_recvd*/);
3092 		BUG_ON(status == VCHIQ_RETRY);
3093 	} else {
3094 	/* Mark the service for termination by the slot handler */
3095 		request_poll(service->state, service, VCHIQ_POLL_TERMINATE);
3096 	}
3097 
3098 	while (1) {
3099 		if (down_interruptible(&service->remove_event) != 0) {
3100 			status = VCHIQ_RETRY;
3101 			break;
3102 		}
3103 
3104 		if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3105 			(service->srvstate == VCHIQ_SRVSTATE_LISTENING) ||
3106 			(service->srvstate == VCHIQ_SRVSTATE_OPEN))
3107 			break;
3108 
3109 		vchiq_log_warning(vchiq_core_log_level,
3110 			"%d: close_service:%d - waiting in state %s",
3111 			service->state->id, service->localport,
3112 			srvstate_names[service->srvstate]);
3113 	}
3114 
3115 	if ((status == VCHIQ_SUCCESS) &&
3116 		(service->srvstate != VCHIQ_SRVSTATE_FREE) &&
3117 		(service->srvstate != VCHIQ_SRVSTATE_LISTENING))
3118 		status = VCHIQ_ERROR;
3119 
3120 	unlock_service(service);
3121 
3122 	return status;
3123 }
3124 
3125 VCHIQ_STATUS_T
3126 vchiq_remove_service(VCHIQ_SERVICE_HANDLE_T handle)
3127 {
3128 	/* Unregister the service */
3129 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3130 	VCHIQ_STATUS_T status = VCHIQ_SUCCESS;
3131 
3132 	if (!service)
3133 		return VCHIQ_ERROR;
3134 
3135 	vchiq_log_info(vchiq_core_log_level,
3136 		"%d: remove_service:%d",
3137 		service->state->id, service->localport);
3138 
3139 	if (service->srvstate == VCHIQ_SRVSTATE_FREE) {
3140 		unlock_service(service);
3141 		return VCHIQ_ERROR;
3142 	}
3143 
3144 	mark_service_closing(service);
3145 
3146 	if ((service->srvstate == VCHIQ_SRVSTATE_HIDDEN) ||
3147 		(current == service->state->slot_handler_thread)) {
3148 		/* Make it look like a client, because it must be removed and
3149 		   not left in the LISTENING state. */
3150 		service->public_fourcc = VCHIQ_FOURCC_INVALID;
3151 
3152 		status = vchiq_close_service_internal(service,
3153 			0/*!close_recvd*/);
3154 		BUG_ON(status == VCHIQ_RETRY);
3155 	} else {
3156 		/* Mark the service for removal by the slot handler */
3157 		request_poll(service->state, service, VCHIQ_POLL_REMOVE);
3158 	}
3159 	while (1) {
3160 		if (down_interruptible(&service->remove_event) != 0) {
3161 			status = VCHIQ_RETRY;
3162 			break;
3163 		}
3164 
3165 		if ((service->srvstate == VCHIQ_SRVSTATE_FREE) ||
3166 			(service->srvstate == VCHIQ_SRVSTATE_OPEN))
3167 			break;
3168 
3169 		vchiq_log_warning(vchiq_core_log_level,
3170 			"%d: remove_service:%d - waiting in state %s",
3171 			service->state->id, service->localport,
3172 			srvstate_names[service->srvstate]);
3173 	}
3174 
3175 	if ((status == VCHIQ_SUCCESS) &&
3176 		(service->srvstate != VCHIQ_SRVSTATE_FREE))
3177 		status = VCHIQ_ERROR;
3178 
3179 	unlock_service(service);
3180 
3181 	return status;
3182 }
3183 
3184 
3185 /* This function may be called by kernel threads or user threads.
3186  * User threads may receive VCHIQ_RETRY to indicate that a signal has been
3187  * received and the call should be retried after being returned to user
3188  * context.
3189  * When called in blocking mode, the userdata field points to a bulk_waiter
3190  * structure.
3191  */
3192 VCHIQ_STATUS_T
3193 vchiq_bulk_transfer(VCHIQ_SERVICE_HANDLE_T handle,
3194 	VCHI_MEM_HANDLE_T memhandle, void *offset, int size, void *userdata,
3195 	VCHIQ_BULK_MODE_T mode, VCHIQ_BULK_DIR_T dir)
3196 {
3197 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3198 	VCHIQ_BULK_QUEUE_T *queue;
3199 	VCHIQ_BULK_T *bulk;
3200 	VCHIQ_STATE_T *state;
3201 	struct bulk_waiter *bulk_waiter = NULL;
3202 	const char dir_char = (dir == VCHIQ_BULK_TRANSMIT) ? 't' : 'r';
3203 	const int dir_msgtype = (dir == VCHIQ_BULK_TRANSMIT) ?
3204 		VCHIQ_MSG_BULK_TX : VCHIQ_MSG_BULK_RX;
3205 	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3206 
3207 	if (!service ||
3208 		 (service->srvstate != VCHIQ_SRVSTATE_OPEN) ||
3209 		 ((memhandle == VCHI_MEM_HANDLE_INVALID) && (offset == NULL)) ||
3210 		 (vchiq_check_service(service) != VCHIQ_SUCCESS))
3211 		goto error_exit;
3212 
3213 	switch (mode) {
3214 	case VCHIQ_BULK_MODE_NOCALLBACK:
3215 	case VCHIQ_BULK_MODE_CALLBACK:
3216 		break;
3217 	case VCHIQ_BULK_MODE_BLOCKING:
3218 		bulk_waiter = (struct bulk_waiter *)userdata;
3219 		_sema_init(&bulk_waiter->event, 0);
3220 		bulk_waiter->actual = 0;
3221 		bulk_waiter->bulk = NULL;
3222 		break;
3223 	case VCHIQ_BULK_MODE_WAITING:
3224 		bulk_waiter = (struct bulk_waiter *)userdata;
3225 		bulk = bulk_waiter->bulk;
3226 		goto waiting;
3227 	default:
3228 		goto error_exit;
3229 	}
3230 
3231 	state = service->state;
3232 
3233 	queue = (dir == VCHIQ_BULK_TRANSMIT) ?
3234 		&service->bulk_tx : &service->bulk_rx;
3235 
3236 	if (lmutex_lock_interruptible(&service->bulk_mutex) != 0) {
3237 		status = VCHIQ_RETRY;
3238 		goto error_exit;
3239 	}
3240 
3241 	if (queue->local_insert == queue->remove + VCHIQ_NUM_SERVICE_BULKS) {
3242 		VCHIQ_SERVICE_STATS_INC(service, bulk_stalls);
3243 		do {
3244 			lmutex_unlock(&service->bulk_mutex);
3245 			if (down_interruptible(&service->bulk_remove_event)
3246 				!= 0) {
3247 				status = VCHIQ_RETRY;
3248 				goto error_exit;
3249 			}
3250 			if (lmutex_lock_interruptible(&service->bulk_mutex)
3251 				!= 0) {
3252 				status = VCHIQ_RETRY;
3253 				goto error_exit;
3254 			}
3255 		} while (queue->local_insert == queue->remove +
3256 				VCHIQ_NUM_SERVICE_BULKS);
3257 	}
3258 
3259 	bulk = &queue->bulks[BULK_INDEX(queue->local_insert)];
3260 
3261 	bulk->mode = mode;
3262 	bulk->dir = dir;
3263 	bulk->userdata = userdata;
3264 	bulk->size = size;
3265 	bulk->actual = VCHIQ_BULK_ACTUAL_ABORTED;
3266 
3267 	if (vchiq_prepare_bulk_data(bulk, memhandle, offset, size, dir) !=
3268 		VCHIQ_SUCCESS)
3269 		goto unlock_error_exit;
3270 
3271 	wmb();
3272 
3273 	vchiq_log_info(vchiq_core_log_level,
3274 		"%d: bt (%d->%d) %cx %x@%x %x",
3275 		state->id,
3276 		service->localport, service->remoteport, dir_char,
3277 		size, (unsigned int)bulk->data, (unsigned int)userdata);
3278 
3279 	if (state->is_master) {
3280 		queue->local_insert++;
3281 		if (resolve_bulks(service, queue))
3282 			request_poll(state, service,
3283 				(dir == VCHIQ_BULK_TRANSMIT) ?
3284 				VCHIQ_POLL_TXNOTIFY : VCHIQ_POLL_RXNOTIFY);
3285 	} else {
3286 		int payload[2] = { (int)bulk->data, bulk->size };
3287 		VCHIQ_ELEMENT_T element = { payload, sizeof(payload) };
3288 
3289 		status = queue_message(state, NULL,
3290 			VCHIQ_MAKE_MSG(dir_msgtype,
3291 				service->localport, service->remoteport),
3292 			&element, 1, sizeof(payload), 1);
3293 		if (status != VCHIQ_SUCCESS) {
3294 			vchiq_complete_bulk(bulk);
3295 			goto unlock_error_exit;
3296 		}
3297 		queue->local_insert++;
3298 	}
3299 
3300 	lmutex_unlock(&service->bulk_mutex);
3301 
3302 	vchiq_log_trace(vchiq_core_log_level,
3303 		"%d: bt:%d %cx li=%x ri=%x p=%x",
3304 		state->id,
3305 		service->localport, dir_char,
3306 		queue->local_insert, queue->remote_insert, queue->process);
3307 
3308 waiting:
3309 	unlock_service(service);
3310 
3311 	status = VCHIQ_SUCCESS;
3312 
3313 	if (bulk_waiter) {
3314 		bulk_waiter->bulk = bulk;
3315 		if (down_interruptible(&bulk_waiter->event) != 0)
3316 			status = VCHIQ_RETRY;
3317 		else if (bulk_waiter->actual == VCHIQ_BULK_ACTUAL_ABORTED)
3318 			status = VCHIQ_ERROR;
3319 	}
3320 
3321 	return status;
3322 
3323 unlock_error_exit:
3324 	lmutex_unlock(&service->bulk_mutex);
3325 
3326 error_exit:
3327 	if (service)
3328 		unlock_service(service);
3329 	return status;
3330 }
3331 
3332 VCHIQ_STATUS_T
3333 vchiq_queue_message(VCHIQ_SERVICE_HANDLE_T handle,
3334 	const VCHIQ_ELEMENT_T *elements, unsigned int count)
3335 {
3336 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3337 	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3338 
3339 	unsigned int size = 0;
3340 	unsigned int i;
3341 
3342 	if (!service ||
3343 		(vchiq_check_service(service) != VCHIQ_SUCCESS))
3344 		goto error_exit;
3345 
3346 	for (i = 0; i < (unsigned int)count; i++) {
3347 		if (elements[i].size) {
3348 			if (elements[i].data == NULL) {
3349 				VCHIQ_SERVICE_STATS_INC(service, error_count);
3350 				goto error_exit;
3351 			}
3352 			size += elements[i].size;
3353 		}
3354 	}
3355 
3356 	if (size > VCHIQ_MAX_MSG_SIZE) {
3357 		VCHIQ_SERVICE_STATS_INC(service, error_count);
3358 		goto error_exit;
3359 	}
3360 
3361 	switch (service->srvstate) {
3362 	case VCHIQ_SRVSTATE_OPEN:
3363 		status = queue_message(service->state, service,
3364 				VCHIQ_MAKE_MSG(VCHIQ_MSG_DATA,
3365 					service->localport,
3366 					service->remoteport),
3367 				elements, count, size, 1);
3368 		break;
3369 	case VCHIQ_SRVSTATE_OPENSYNC:
3370 		status = queue_message_sync(service->state, service,
3371 				VCHIQ_MAKE_MSG(VCHIQ_MSG_DATA,
3372 					service->localport,
3373 					service->remoteport),
3374 				elements, count, size, 1);
3375 		break;
3376 	default:
3377 		status = VCHIQ_ERROR;
3378 		break;
3379 	}
3380 
3381 error_exit:
3382 	if (service)
3383 		unlock_service(service);
3384 
3385 	return status;
3386 }
3387 
3388 void
3389 vchiq_release_message(VCHIQ_SERVICE_HANDLE_T handle, VCHIQ_HEADER_T *header)
3390 {
3391 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3392 	VCHIQ_SHARED_STATE_T *remote;
3393 	VCHIQ_STATE_T *state;
3394 	int slot_index;
3395 
3396 	if (!service)
3397 		return;
3398 
3399 	state = service->state;
3400 	remote = state->remote;
3401 
3402 	slot_index = SLOT_INDEX_FROM_DATA(state, (void *)header);
3403 
3404 	if ((slot_index >= remote->slot_first) &&
3405 		(slot_index <= remote->slot_last)) {
3406 		int msgid = header->msgid;
3407 		if (msgid & VCHIQ_MSGID_CLAIMED) {
3408 			VCHIQ_SLOT_INFO_T *slot_info =
3409 				SLOT_INFO_FROM_INDEX(state, slot_index);
3410 
3411 			release_slot(state, slot_info, header, service);
3412 		}
3413 	} else if (slot_index == remote->slot_sync)
3414 		release_message_sync(state, header);
3415 
3416 	unlock_service(service);
3417 }
3418 
3419 static void
3420 release_message_sync(VCHIQ_STATE_T *state, VCHIQ_HEADER_T *header)
3421 {
3422 	header->msgid = VCHIQ_MSGID_PADDING;
3423 	wmb();
3424 	remote_event_signal(&state->remote->sync_release);
3425 }
3426 
3427 VCHIQ_STATUS_T
3428 vchiq_get_peer_version(VCHIQ_SERVICE_HANDLE_T handle, short *peer_version)
3429 {
3430    VCHIQ_STATUS_T status = VCHIQ_ERROR;
3431    VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3432 
3433    if (!service ||
3434       (vchiq_check_service(service) != VCHIQ_SUCCESS) ||
3435       !peer_version)
3436       goto exit;
3437    *peer_version = service->peer_version;
3438    status = VCHIQ_SUCCESS;
3439 
3440 exit:
3441    if (service)
3442       unlock_service(service);
3443    return status;
3444 }
3445 
3446 VCHIQ_STATUS_T
3447 vchiq_get_config(VCHIQ_INSTANCE_T instance,
3448 	int config_size, VCHIQ_CONFIG_T *pconfig)
3449 {
3450 	VCHIQ_CONFIG_T config;
3451 
3452 	(void)instance;
3453 
3454 	config.max_msg_size           = VCHIQ_MAX_MSG_SIZE;
3455 	config.bulk_threshold         = VCHIQ_MAX_MSG_SIZE;
3456 	config.max_outstanding_bulks  = VCHIQ_NUM_SERVICE_BULKS;
3457 	config.max_services           = VCHIQ_MAX_SERVICES;
3458 	config.version                = VCHIQ_VERSION;
3459 	config.version_min            = VCHIQ_VERSION_MIN;
3460 
3461 	if (config_size > sizeof(VCHIQ_CONFIG_T))
3462 		return VCHIQ_ERROR;
3463 
3464 	memcpy(pconfig, &config,
3465 		min(config_size, (int)(sizeof(VCHIQ_CONFIG_T))));
3466 
3467 	return VCHIQ_SUCCESS;
3468 }
3469 
3470 VCHIQ_STATUS_T
3471 vchiq_set_service_option(VCHIQ_SERVICE_HANDLE_T handle,
3472 	VCHIQ_SERVICE_OPTION_T option, int value)
3473 {
3474 	VCHIQ_SERVICE_T *service = find_service_by_handle(handle);
3475 	VCHIQ_STATUS_T status = VCHIQ_ERROR;
3476 
3477 	if (service) {
3478 		switch (option) {
3479 		case VCHIQ_SERVICE_OPTION_AUTOCLOSE:
3480 			service->auto_close = value;
3481 			status = VCHIQ_SUCCESS;
3482 			break;
3483 
3484 		case VCHIQ_SERVICE_OPTION_SLOT_QUOTA: {
3485 			VCHIQ_SERVICE_QUOTA_T *service_quota =
3486 				&service->state->service_quotas[
3487 					service->localport];
3488 			if (value == 0)
3489 				value = service->state->default_slot_quota;
3490 			if ((value >= service_quota->slot_use_count) &&
3491 				 (value < (unsigned short)~0)) {
3492 				service_quota->slot_quota = value;
3493 				if ((value >= service_quota->slot_use_count) &&
3494 					(service_quota->message_quota >=
3495 					 service_quota->message_use_count)) {
3496 					/* Signal the service that it may have
3497 					** dropped below its quota */
3498 					up(&service_quota->quota_event);
3499 				}
3500 				status = VCHIQ_SUCCESS;
3501 			}
3502 		} break;
3503 
3504 		case VCHIQ_SERVICE_OPTION_MESSAGE_QUOTA: {
3505 			VCHIQ_SERVICE_QUOTA_T *service_quota =
3506 				&service->state->service_quotas[
3507 					service->localport];
3508 			if (value == 0)
3509 				value = service->state->default_message_quota;
3510 			if ((value >= service_quota->message_use_count) &&
3511 				 (value < (unsigned short)~0)) {
3512 				service_quota->message_quota = value;
3513 				if ((value >=
3514 					service_quota->message_use_count) &&
3515 					(service_quota->slot_quota >=
3516 					service_quota->slot_use_count))
3517 					/* Signal the service that it may have
3518 					** dropped below its quota */
3519 					up(&service_quota->quota_event);
3520 				status = VCHIQ_SUCCESS;
3521 			}
3522 		} break;
3523 
3524 		case VCHIQ_SERVICE_OPTION_SYNCHRONOUS:
3525 			if ((service->srvstate == VCHIQ_SRVSTATE_HIDDEN) ||
3526 				(service->srvstate ==
3527 				VCHIQ_SRVSTATE_LISTENING)) {
3528 				service->sync = value;
3529 				status = VCHIQ_SUCCESS;
3530 			}
3531 			break;
3532 
3533 		default:
3534 			break;
3535 		}
3536 		unlock_service(service);
3537 	}
3538 
3539 	return status;
3540 }
3541 
3542 static void
3543 vchiq_dump_shared_state(void *dump_context, VCHIQ_STATE_T *state,
3544 	VCHIQ_SHARED_STATE_T *shared, const char *label)
3545 {
3546 	static const char *const debug_names[] = {
3547 		"<entries>",
3548 		"SLOT_HANDLER_COUNT",
3549 		"SLOT_HANDLER_LINE",
3550 		"PARSE_LINE",
3551 		"PARSE_HEADER",
3552 		"PARSE_MSGID",
3553 		"AWAIT_COMPLETION_LINE",
3554 		"DEQUEUE_MESSAGE_LINE",
3555 		"SERVICE_CALLBACK_LINE",
3556 		"MSG_QUEUE_FULL_COUNT",
3557 		"COMPLETION_QUEUE_FULL_COUNT"
3558 	};
3559 	int i;
3560 
3561 	char buf[80];
3562 	int len;
3563 	len = snprintf(buf, sizeof(buf),
3564 		"  %s: slots %d-%d tx_pos=%x recycle=%x",
3565 		label, shared->slot_first, shared->slot_last,
3566 		shared->tx_pos, shared->slot_queue_recycle);
3567 	vchiq_dump(dump_context, buf, len + 1);
3568 
3569 	len = snprintf(buf, sizeof(buf),
3570 		"    Slots claimed:");
3571 	vchiq_dump(dump_context, buf, len + 1);
3572 
3573 	for (i = shared->slot_first; i <= shared->slot_last; i++) {
3574 		VCHIQ_SLOT_INFO_T slot_info = *SLOT_INFO_FROM_INDEX(state, i);
3575 		if (slot_info.use_count != slot_info.release_count) {
3576 			len = snprintf(buf, sizeof(buf),
3577 				"      %d: %d/%d", i, slot_info.use_count,
3578 				slot_info.release_count);
3579 			vchiq_dump(dump_context, buf, len + 1);
3580 		}
3581 	}
3582 
3583 	for (i = 1; i < shared->debug[DEBUG_ENTRIES]; i++) {
3584 		len = snprintf(buf, sizeof(buf), "    DEBUG: %s = %d(%x)",
3585 			debug_names[i], shared->debug[i], shared->debug[i]);
3586 		vchiq_dump(dump_context, buf, len + 1);
3587 	}
3588 }
3589 
3590 void
3591 vchiq_dump_state(void *dump_context, VCHIQ_STATE_T *state)
3592 {
3593 	char buf[80];
3594 	int len;
3595 	int i;
3596 
3597 	len = snprintf(buf, sizeof(buf), "State %d: %s", state->id,
3598 		conn_state_names[state->conn_state]);
3599 	vchiq_dump(dump_context, buf, len + 1);
3600 
3601 	len = snprintf(buf, sizeof(buf),
3602 		"  tx_pos=%x(@%x), rx_pos=%x(@%x)",
3603 		state->local->tx_pos,
3604 		(uint32_t)state->tx_data +
3605 			(state->local_tx_pos & VCHIQ_SLOT_MASK),
3606 		state->rx_pos,
3607 		(uint32_t)state->rx_data +
3608 			(state->rx_pos & VCHIQ_SLOT_MASK));
3609 	vchiq_dump(dump_context, buf, len + 1);
3610 
3611 	len = snprintf(buf, sizeof(buf),
3612 		"  Version: %d (min %d)",
3613 		VCHIQ_VERSION, VCHIQ_VERSION_MIN);
3614 	vchiq_dump(dump_context, buf, len + 1);
3615 
3616 	if (VCHIQ_ENABLE_STATS) {
3617 		len = snprintf(buf, sizeof(buf),
3618 			"  Stats: ctrl_tx_count=%d, ctrl_rx_count=%d, "
3619 			"error_count=%d",
3620 			state->stats.ctrl_tx_count, state->stats.ctrl_rx_count,
3621 			state->stats.error_count);
3622 		vchiq_dump(dump_context, buf, len + 1);
3623 	}
3624 
3625 	len = snprintf(buf, sizeof(buf),
3626 		"  Slots: %d available (%d data), %d recyclable, %d stalls "
3627 		"(%d data)",
3628 		((state->slot_queue_available * VCHIQ_SLOT_SIZE) -
3629 			state->local_tx_pos) / VCHIQ_SLOT_SIZE,
3630 		state->data_quota - state->data_use_count,
3631 		state->local->slot_queue_recycle - state->slot_queue_available,
3632 		state->stats.slot_stalls, state->stats.data_stalls);
3633 	vchiq_dump(dump_context, buf, len + 1);
3634 
3635 	vchiq_dump_platform_state(dump_context);
3636 
3637 	vchiq_dump_shared_state(dump_context, state, state->local, "Local");
3638 	vchiq_dump_shared_state(dump_context, state, state->remote, "Remote");
3639 
3640 	vchiq_dump_platform_instances(dump_context);
3641 
3642 	for (i = 0; i < state->unused_service; i++) {
3643 		VCHIQ_SERVICE_T *service = find_service_by_port(state, i);
3644 
3645 		if (service) {
3646 			vchiq_dump_service_state(dump_context, service);
3647 			unlock_service(service);
3648 		}
3649 	}
3650 }
3651 
3652 void
3653 vchiq_dump_service_state(void *dump_context, VCHIQ_SERVICE_T *service)
3654 {
3655 	char buf[120];
3656 	int len;
3657 
3658 	len = snprintf(buf, sizeof(buf), "Service %d: %s (ref %u)",
3659 		service->localport, srvstate_names[service->srvstate],
3660 		service->ref_count - 1); /*Don't include the lock just taken*/
3661 
3662 	if (service->srvstate != VCHIQ_SRVSTATE_FREE) {
3663 		char remoteport[30];
3664 		VCHIQ_SERVICE_QUOTA_T *service_quota =
3665 			&service->state->service_quotas[service->localport];
3666 		int fourcc = service->base.fourcc;
3667 		int tx_pending, rx_pending;
3668 		if (service->remoteport != VCHIQ_PORT_FREE) {
3669 			int len2 = snprintf(remoteport, sizeof(remoteport),
3670 				"%d", service->remoteport);
3671 			if (service->public_fourcc != VCHIQ_FOURCC_INVALID)
3672 				snprintf(remoteport + len2,
3673 					sizeof(remoteport) - len2,
3674 					" (client %8x)", service->client_id);
3675 		} else
3676 			strcpy(remoteport, "n/a");
3677 
3678 		len += snprintf(buf + len, sizeof(buf) - len,
3679 			" '%c%c%c%c' remote %s (msg use %d/%d, slot use %d/%d)",
3680 			VCHIQ_FOURCC_AS_4CHARS(fourcc),
3681 			remoteport,
3682 			service_quota->message_use_count,
3683 			service_quota->message_quota,
3684 			service_quota->slot_use_count,
3685 			service_quota->slot_quota);
3686 
3687 		vchiq_dump(dump_context, buf, len + 1);
3688 
3689 		tx_pending = service->bulk_tx.local_insert -
3690 			service->bulk_tx.remote_insert;
3691 
3692 		rx_pending = service->bulk_rx.local_insert -
3693 			service->bulk_rx.remote_insert;
3694 
3695 		len = snprintf(buf, sizeof(buf),
3696 			"  Bulk: tx_pending=%d (size %d),"
3697 			" rx_pending=%d (size %d)",
3698 			tx_pending,
3699 			tx_pending ? service->bulk_tx.bulks[
3700 			BULK_INDEX(service->bulk_tx.remove)].size : 0,
3701 			rx_pending,
3702 			rx_pending ? service->bulk_rx.bulks[
3703 			BULK_INDEX(service->bulk_rx.remove)].size : 0);
3704 
3705 		if (VCHIQ_ENABLE_STATS) {
3706 			vchiq_dump(dump_context, buf, len + 1);
3707 
3708 			len = snprintf(buf, sizeof(buf),
3709 				"  Ctrl: tx_count=%d, tx_bytes=%llu, "
3710 				"rx_count=%d, rx_bytes=%llu",
3711 				service->stats.ctrl_tx_count,
3712 				service->stats.ctrl_tx_bytes,
3713 				service->stats.ctrl_rx_count,
3714 				service->stats.ctrl_rx_bytes);
3715 			vchiq_dump(dump_context, buf, len + 1);
3716 
3717 			len = snprintf(buf, sizeof(buf),
3718 				"  Bulk: tx_count=%d, tx_bytes=%llu, "
3719 				"rx_count=%d, rx_bytes=%llu",
3720 				service->stats.bulk_tx_count,
3721 				service->stats.bulk_tx_bytes,
3722 				service->stats.bulk_rx_count,
3723 				service->stats.bulk_rx_bytes);
3724 			vchiq_dump(dump_context, buf, len + 1);
3725 
3726 			len = snprintf(buf, sizeof(buf),
3727 				"  %d quota stalls, %d slot stalls, "
3728 				"%d bulk stalls, %d aborted, %d errors",
3729 				service->stats.quota_stalls,
3730 				service->stats.slot_stalls,
3731 				service->stats.bulk_stalls,
3732 				service->stats.bulk_aborted_count,
3733 				service->stats.error_count);
3734 		 }
3735 	}
3736 
3737 	vchiq_dump(dump_context, buf, len + 1);
3738 
3739 	if (service->srvstate != VCHIQ_SRVSTATE_FREE)
3740 		vchiq_dump_platform_service_state(dump_context, service);
3741 }
3742 
3743 
3744 void
3745 vchiq_loud_error_header(void)
3746 {
3747 	vchiq_log_error(vchiq_core_log_level,
3748 		"============================================================"
3749 		"================");
3750 	vchiq_log_error(vchiq_core_log_level,
3751 		"============================================================"
3752 		"================");
3753 	vchiq_log_error(vchiq_core_log_level, "=====");
3754 }
3755 
3756 void
3757 vchiq_loud_error_footer(void)
3758 {
3759 	vchiq_log_error(vchiq_core_log_level, "=====");
3760 	vchiq_log_error(vchiq_core_log_level,
3761 		"============================================================"
3762 		"================");
3763 	vchiq_log_error(vchiq_core_log_level,
3764 		"============================================================"
3765 		"================");
3766 }
3767 
3768 
3769 VCHIQ_STATUS_T vchiq_send_remote_use(VCHIQ_STATE_T *state)
3770 {
3771 	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3772 	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3773 		status = queue_message(state, NULL,
3774 			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_USE, 0, 0),
3775 			NULL, 0, 0, 0);
3776 	return status;
3777 }
3778 
3779 VCHIQ_STATUS_T vchiq_send_remote_release(VCHIQ_STATE_T *state)
3780 {
3781 	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3782 	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3783 		status = queue_message(state, NULL,
3784 			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_RELEASE, 0, 0),
3785 			NULL, 0, 0, 0);
3786 	return status;
3787 }
3788 
3789 VCHIQ_STATUS_T vchiq_send_remote_use_active(VCHIQ_STATE_T *state)
3790 {
3791 	VCHIQ_STATUS_T status = VCHIQ_RETRY;
3792 	if (state->conn_state != VCHIQ_CONNSTATE_DISCONNECTED)
3793 		status = queue_message(state, NULL,
3794 			VCHIQ_MAKE_MSG(VCHIQ_MSG_REMOTE_USE_ACTIVE, 0, 0),
3795 			NULL, 0, 0, 0);
3796 	return status;
3797 }
3798 
3799 void vchiq_log_dump_mem(const char *label, uint32_t addr, const void *voidMem,
3800 	size_t numBytes)
3801 {
3802 	const uint8_t  *mem = (const uint8_t *)voidMem;
3803 	size_t          offset;
3804 	char            lineBuf[100];
3805 	char           *s;
3806 
3807 	while (numBytes > 0) {
3808 		s = lineBuf;
3809 
3810 		for (offset = 0; offset < 16; offset++) {
3811 			if (offset < numBytes)
3812 				s += snprintf(s, 4, "%02x ", mem[offset]);
3813 			else
3814 				s += snprintf(s, 4, "   ");
3815 		}
3816 
3817 		for (offset = 0; offset < 16; offset++) {
3818 			if (offset < numBytes) {
3819 				uint8_t ch = mem[offset];
3820 
3821 				if ((ch < ' ') || (ch > '~'))
3822 					ch = '.';
3823 				*s++ = (char)ch;
3824 			}
3825 		}
3826 		*s++ = '\0';
3827 
3828 		if ((label != NULL) && (*label != '\0'))
3829 			vchiq_log_trace(VCHIQ_LOG_TRACE,
3830 				"%s: %08x: %s", label, addr, lineBuf);
3831 		else
3832 			vchiq_log_trace(VCHIQ_LOG_TRACE,
3833 				"%08x: %s", addr, lineBuf);
3834 
3835 		addr += 16;
3836 		mem += 16;
3837 		if (numBytes > 16)
3838 			numBytes -= 16;
3839 		else
3840 			numBytes = 0;
3841 	}
3842 }
3843