xref: /freebsd/sys/dev/hyperv/vmbus/vmbus_br.c (revision bdd1243d)
1 /*-
2  * Copyright (c) 2009-2012,2016 Microsoft Corp.
3  * Copyright (c) 2012 NetApp Inc.
4  * Copyright (c) 2012 Citrix Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice unmodified, this list of conditions, and the following
12  *    disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 __FBSDID("$FreeBSD$");
31 
32 #include <sys/param.h>
33 #include <sys/lock.h>
34 #include <sys/mutex.h>
35 #include <sys/sysctl.h>
36 
37 #include <dev/hyperv/vmbus/vmbus_reg.h>
38 #include <dev/hyperv/vmbus/vmbus_brvar.h>
39 
40 /* Amount of space available for write */
41 #define	VMBUS_BR_WAVAIL(r, w, z)	\
42 	(((w) >= (r)) ? ((z) - ((w) - (r))) : ((r) - (w)))
43 
44 /* Increase bufing index */
45 #define VMBUS_BR_IDXINC(idx, inc, sz)	(((idx) + (inc)) % (sz))
46 
47 static int			vmbus_br_sysctl_state(SYSCTL_HANDLER_ARGS);
48 static int			vmbus_br_sysctl_state_bin(SYSCTL_HANDLER_ARGS);
49 static void			vmbus_br_setup(struct vmbus_br *, void *, int);
50 
51 static int
52 vmbus_br_sysctl_state(SYSCTL_HANDLER_ARGS)
53 {
54 	const struct vmbus_br *br = arg1;
55 	uint32_t rindex, windex, imask, psndsz, fvalue, ravail, wavail;
56 	uint64_t intrcnt;
57 	char state[256];
58 
59 	intrcnt = br->vbr_intrcnt;
60 	rindex = br->vbr_rindex;
61 	windex = br->vbr_windex;
62 	imask = br->vbr_imask;
63 	psndsz = br->vbr_psndsz;
64 	fvalue = br->vbr_fvalue;
65 	wavail = VMBUS_BR_WAVAIL(rindex, windex, br->vbr_dsize);
66 	ravail = br->vbr_dsize - wavail;
67 
68 	snprintf(state, sizeof(state),
69 	    "intrcnt:%ju rindex:%u windex:%u imask:%u psndsz:%u fvalue:%u "
70 	    "ravail:%u wavail:%u",
71 	    (uintmax_t)intrcnt, rindex, windex, imask, psndsz, fvalue,
72 	    ravail, wavail);
73 	return sysctl_handle_string(oidp, state, sizeof(state), req);
74 }
75 
76 /*
77  * Binary bufring states.
78  */
79 static int
80 vmbus_br_sysctl_state_bin(SYSCTL_HANDLER_ARGS)
81 {
82 #define BR_STATE_RIDX	0
83 #define BR_STATE_WIDX	1
84 #define BR_STATE_IMSK	2
85 #define BR_STATE_PSSZ	3
86 #define BR_STATE_FVAL	4
87 #define BR_STATE_RSPC	5
88 #define BR_STATE_WSPC	6
89 #define BR_STATE_MAX	7
90 
91 	const struct vmbus_br *br = arg1;
92 	uint32_t rindex, windex, wavail, state[BR_STATE_MAX];
93 
94 	rindex = br->vbr_rindex;
95 	windex = br->vbr_windex;
96 	wavail = VMBUS_BR_WAVAIL(rindex, windex, br->vbr_dsize);
97 
98 	state[BR_STATE_RIDX] = rindex;
99 	state[BR_STATE_WIDX] = windex;
100 	state[BR_STATE_IMSK] = br->vbr_imask;
101 	state[BR_STATE_PSSZ] = br->vbr_psndsz;
102 	state[BR_STATE_FVAL] = br->vbr_fvalue;
103 	state[BR_STATE_WSPC] = wavail;
104 	state[BR_STATE_RSPC] = br->vbr_dsize - wavail;
105 
106 	return sysctl_handle_opaque(oidp, state, sizeof(state), req);
107 }
108 
109 void
110 vmbus_br_sysctl_create(struct sysctl_ctx_list *ctx, struct sysctl_oid *br_tree,
111     struct vmbus_br *br, const char *name)
112 {
113 	struct sysctl_oid *tree;
114 	char desc[64];
115 
116 	tree = SYSCTL_ADD_NODE(ctx, SYSCTL_CHILDREN(br_tree), OID_AUTO,
117 	    name, CTLFLAG_RD | CTLFLAG_MPSAFE, 0, "");
118 	if (tree == NULL)
119 		return;
120 
121 	snprintf(desc, sizeof(desc), "%s state", name);
122 	SYSCTL_ADD_PROC(ctx, SYSCTL_CHILDREN(tree), OID_AUTO, "state",
123 	    CTLTYPE_STRING | CTLFLAG_RD | CTLFLAG_MPSAFE,
124 	    br, 0, vmbus_br_sysctl_state, "A", desc);
125 
126 	snprintf(desc, sizeof(desc), "%s binary state", name);
127 	SYSCTL_ADD_PROC(ctx, SYSCTL_CHILDREN(tree), OID_AUTO, "state_bin",
128 	    CTLTYPE_OPAQUE | CTLFLAG_RD | CTLFLAG_MPSAFE,
129 	    br, 0, vmbus_br_sysctl_state_bin, "IU", desc);
130 }
131 
132 void
133 vmbus_rxbr_intr_mask(struct vmbus_rxbr *rbr)
134 {
135 	rbr->rxbr_imask = 1;
136 	mb();
137 }
138 
139 static __inline uint32_t
140 vmbus_rxbr_avail(const struct vmbus_rxbr *rbr)
141 {
142 	uint32_t rindex, windex;
143 
144 	/* Get snapshot */
145 	rindex = atomic_load_acq_32(&rbr->rxbr_rindex);
146 	windex = atomic_load_acq_32(&rbr->rxbr_windex);
147 
148 	return (rbr->rxbr_dsize -
149 	    VMBUS_BR_WAVAIL(rindex, windex, rbr->rxbr_dsize));
150 }
151 
152 uint32_t
153 vmbus_rxbr_available(const struct vmbus_rxbr *rbr)
154 {
155 	return (vmbus_rxbr_avail(rbr));
156 }
157 
158 uint32_t
159 vmbus_rxbr_intr_unmask(struct vmbus_rxbr *rbr)
160 {
161 	rbr->rxbr_imask = 0;
162 	mb();
163 
164 	/*
165 	 * Now check to see if the ring buffer is still empty.
166 	 * If it is not, we raced and we need to process new
167 	 * incoming channel packets.
168 	 */
169 	return vmbus_rxbr_avail(rbr);
170 }
171 
172 static void
173 vmbus_br_setup(struct vmbus_br *br, void *buf, int blen)
174 {
175 	br->vbr = buf;
176 	br->vbr_dsize = blen - sizeof(struct vmbus_bufring);
177 }
178 
179 void
180 vmbus_rxbr_init(struct vmbus_rxbr *rbr)
181 {
182 	mtx_init(&rbr->rxbr_lock, "vmbus_rxbr", NULL, MTX_SPIN);
183 }
184 
185 void
186 vmbus_rxbr_deinit(struct vmbus_rxbr *rbr)
187 {
188 	mtx_destroy(&rbr->rxbr_lock);
189 }
190 
191 void
192 vmbus_rxbr_setup(struct vmbus_rxbr *rbr, void *buf, int blen)
193 {
194 	vmbus_br_setup(&rbr->rxbr, buf, blen);
195 }
196 
197 static __inline boolean_t
198 vmbus_rxbr_need_signal(const struct vmbus_rxbr *rbr, uint32_t bytes_read)
199 {
200 	uint32_t pending_snd_sz, canwrite_size;
201 
202 	/* No need to signal if host doesn't want us to */
203 	if (!rbr->rxbr_fpsndsz)
204 		return false;
205 
206 	mb();
207 
208 	pending_snd_sz = rbr->rxbr_psndsz;
209 	/* No need to signal if host sets pending_snd_sz to 0 */
210 	if (!pending_snd_sz)
211 		return false;
212 
213 	mb();
214 
215 	canwrite_size = rbr->rxbr_dsize - vmbus_rxbr_avail(rbr);
216 
217 	/* No need to signal if br already has enough space before read */
218 	if (canwrite_size - bytes_read > pending_snd_sz)
219 		return false;
220 
221 	/*
222 	 * No need to signal if still doesn't have enough space
223 	 * asked by host
224 	 */
225 	if (canwrite_size <= pending_snd_sz)
226 		return false;
227 
228 	return true;
229 }
230 
231 void
232 vmbus_txbr_init(struct vmbus_txbr *tbr)
233 {
234 	mtx_init(&tbr->txbr_lock, "vmbus_txbr", NULL, MTX_SPIN);
235 }
236 
237 void
238 vmbus_txbr_deinit(struct vmbus_txbr *tbr)
239 {
240 	mtx_destroy(&tbr->txbr_lock);
241 }
242 
243 void
244 vmbus_txbr_setup(struct vmbus_txbr *tbr, void *buf, int blen)
245 {
246 	vmbus_br_setup(&tbr->txbr, buf, blen);
247 
248 	/* Set feature bit enabling flow control */
249 	tbr->txbr_fpsndsz = 1;
250 }
251 
252 uint32_t
253 vmbus_txbr_get_imask(const struct vmbus_txbr *tbr)
254 {
255 	mb();
256 
257 	return(tbr->txbr_imask);
258 }
259 
260 void
261 vmbus_txbr_set_pending_snd_sz(struct vmbus_txbr *tbr, uint32_t size)
262 {
263 	tbr->txbr_psndsz = size;
264 }
265 
266 /*
267  * When we write to the ring buffer, check if the host needs to be
268  * signaled.
269  *
270  * The contract:
271  * - The host guarantees that while it is draining the TX bufring,
272  *   it will set the br_imask to indicate it does not need to be
273  *   interrupted when new data are added.
274  * - The host guarantees that it will completely drain the TX bufring
275  *   before exiting the read loop.  Further, once the TX bufring is
276  *   empty, it will clear the br_imask and re-check to see if new
277  *   data have arrived.
278  */
279 static __inline boolean_t
280 vmbus_txbr_need_signal(const struct vmbus_txbr *tbr, uint32_t old_windex)
281 {
282 	mb();
283 	if (tbr->txbr_imask)
284 		return (FALSE);
285 
286 	__compiler_membar();
287 
288 	/*
289 	 * This is the only case we need to signal when the
290 	 * ring transitions from being empty to non-empty.
291 	 */
292 	if (old_windex == atomic_load_acq_32(&tbr->txbr_rindex))
293 		return (TRUE);
294 
295 	return (FALSE);
296 }
297 
298 static __inline uint32_t
299 vmbus_txbr_avail(const struct vmbus_txbr *tbr)
300 {
301 	uint32_t rindex, windex;
302 
303 	/* Get snapshot */
304 	rindex = atomic_load_acq_32(&tbr->txbr_rindex);
305 	windex = atomic_load_acq_32(&tbr->txbr_windex);
306 
307 	return VMBUS_BR_WAVAIL(rindex, windex, tbr->txbr_dsize);
308 }
309 
310 static __inline uint32_t
311 vmbus_txbr_copyto(const struct vmbus_txbr *tbr, uint32_t windex,
312     const void *src0, uint32_t cplen)
313 {
314 	const uint8_t *src = src0;
315 	uint8_t *br_data = tbr->txbr_data;
316 	uint32_t br_dsize = tbr->txbr_dsize;
317 
318 	if (cplen > br_dsize - windex) {
319 		uint32_t fraglen = br_dsize - windex;
320 
321 		/* Wrap-around detected */
322 		memcpy(br_data + windex, src, fraglen);
323 		memcpy(br_data, src + fraglen, cplen - fraglen);
324 	} else {
325 		memcpy(br_data + windex, src, cplen);
326 	}
327 	return VMBUS_BR_IDXINC(windex, cplen, br_dsize);
328 }
329 
330 static __inline uint32_t
331 vmbus_txbr_copyto_call(const struct vmbus_txbr *tbr, uint32_t windex,
332     uint32_t cplen, vmbus_br_copy_callback_t cb, void *cbarg, int *ret)
333 {
334 	uint8_t *br_data = tbr->txbr_data;
335 	uint32_t br_dsize = tbr->txbr_dsize;
336 	int err = 0;
337 
338 	if (cplen > br_dsize - windex) {
339 		uint32_t fraglen = br_dsize - windex;
340 
341 		/* Wrap-around detected */
342 		err = cb((void *)(br_data + windex), fraglen, cbarg);
343 		if (!err)
344 			err = cb((void *)br_data, cplen - fraglen, cbarg);
345 	} else {
346 		err = cb((void *)(br_data + windex), cplen, cbarg);
347 	}
348 
349 	*ret = err;
350 
351 	return VMBUS_BR_IDXINC(windex, cplen, br_dsize);
352 }
353 
354 uint32_t
355 vmbus_txbr_available(const struct vmbus_txbr *tbr)
356 {
357 	return (vmbus_txbr_avail(tbr));
358 }
359 
360 /*
361  * NOTE:
362  * Not holding lock when calling user provided callback routine.
363  * Caller should hold lock to serialize ring buffer accesses.
364  */
365 int
366 vmbus_txbr_write_call(struct vmbus_txbr *tbr,
367     const struct iovec iov[], int iovlen,
368     vmbus_br_copy_callback_t cb, void *cbarg,
369     boolean_t *need_sig)
370 {
371 	uint32_t old_windex, windex, total;
372 	uint64_t save_windex;
373 	int i;
374 	int cb_ret = 0;
375 
376 	total = 0;
377 	for (i = 0; i < iovlen; i++)
378 		total += iov[i].iov_len;
379 	total += sizeof(save_windex);
380 
381 
382 	/*
383 	 * NOTE:
384 	 * If this write is going to make br_windex same as br_rindex,
385 	 * i.e. the available space for write is same as the write size,
386 	 * we can't do it then, since br_windex == br_rindex means that
387 	 * the bufring is empty.
388 	 */
389 	if (vmbus_txbr_avail(tbr) <= total) {
390 		return (EAGAIN);
391 	}
392 
393 	/* Save br_windex for later use */
394 	old_windex = tbr->txbr_windex;
395 
396 	/*
397 	 * Copy the scattered channel packet to the TX bufring.
398 	 */
399 	windex = old_windex;
400 	for (i = 0; i < iovlen; i++) {
401 		if (iov[i].iov_base != NULL) {
402 			windex = vmbus_txbr_copyto(tbr, windex,
403 			    iov[i].iov_base, iov[i].iov_len);
404 		} else if (cb != NULL) {
405 			windex = vmbus_txbr_copyto_call(tbr, windex,
406 			    iov[i].iov_len, cb, cbarg, &cb_ret);
407 			/*
408 			 * If callback fails, return without updating
409 			 * write index.
410 			 */
411 			if (cb_ret)
412 				return (cb_ret);
413 		}
414 	}
415 
416 	mtx_lock_spin(&tbr->txbr_lock);
417 
418 	/*
419 	 * Set the offset of the current channel packet.
420 	 */
421 	save_windex = ((uint64_t)old_windex) << 32;
422 	windex = vmbus_txbr_copyto(tbr, windex, &save_windex,
423 	    sizeof(save_windex));
424 
425 	/*
426 	 * Update the write index _after_ the channel packet
427 	 * is copied.
428 	 */
429 	__compiler_membar();
430 	atomic_store_rel_32(&tbr->txbr_windex, windex);
431 
432 	mtx_unlock_spin(&tbr->txbr_lock);
433 
434 	if (need_sig)
435 		*need_sig = vmbus_txbr_need_signal(tbr, old_windex);
436 
437 	return (0);
438 }
439 
440 /*
441  * Write scattered channel packet to TX bufring.
442  *
443  * The offset of this channel packet is written as a 64bits value
444  * immediately after this channel packet.
445  */
446 int
447 vmbus_txbr_write(struct vmbus_txbr *tbr, const struct iovec iov[], int iovlen,
448     boolean_t *need_sig)
449 {
450 	uint32_t old_windex, windex, total;
451 	uint64_t save_windex;
452 	int i;
453 
454 	total = 0;
455 	for (i = 0; i < iovlen; i++)
456 		total += iov[i].iov_len;
457 	total += sizeof(save_windex);
458 
459 	mtx_lock_spin(&tbr->txbr_lock);
460 
461 	/*
462 	 * NOTE:
463 	 * If this write is going to make br_windex same as br_rindex,
464 	 * i.e. the available space for write is same as the write size,
465 	 * we can't do it then, since br_windex == br_rindex means that
466 	 * the bufring is empty.
467 	 */
468 	if (vmbus_txbr_avail(tbr) <= total) {
469 		mtx_unlock_spin(&tbr->txbr_lock);
470 		return (EAGAIN);
471 	}
472 
473 	/* Save br_windex for later use */
474 	old_windex = atomic_load_acq_32(&tbr->txbr_windex);
475 
476 	/*
477 	 * Copy the scattered channel packet to the TX bufring.
478 	 */
479 	windex = old_windex;
480 	for (i = 0; i < iovlen; i++) {
481 		windex = vmbus_txbr_copyto(tbr, windex,
482 		    iov[i].iov_base, iov[i].iov_len);
483 	}
484 
485 	/*
486 	 * Set the offset of the current channel packet.
487 	 */
488 	save_windex = ((uint64_t)old_windex) << 32;
489 	windex = vmbus_txbr_copyto(tbr, windex, &save_windex,
490 	    sizeof(save_windex));
491 
492 	/*
493 	 * Update the write index _after_ the channel packet
494 	 * is copied.
495 	 */
496 	__compiler_membar();
497 	atomic_store_rel_32(&tbr->txbr_windex, windex);
498 
499 	mtx_unlock_spin(&tbr->txbr_lock);
500 
501 	*need_sig = vmbus_txbr_need_signal(tbr, old_windex);
502 
503 	return (0);
504 }
505 
506 static __inline uint32_t
507 vmbus_rxbr_copyfrom(const struct vmbus_rxbr *rbr, uint32_t rindex,
508     void *dst0, int cplen)
509 {
510 	uint8_t *dst = dst0;
511 	const uint8_t *br_data = rbr->rxbr_data;
512 	uint32_t br_dsize = rbr->rxbr_dsize;
513 
514 	if (cplen > br_dsize - rindex) {
515 		uint32_t fraglen = br_dsize - rindex;
516 
517 		/* Wrap-around detected. */
518 		memcpy(dst, br_data + rindex, fraglen);
519 		memcpy(dst + fraglen, br_data, cplen - fraglen);
520 	} else {
521 		memcpy(dst, br_data + rindex, cplen);
522 	}
523 	return VMBUS_BR_IDXINC(rindex, cplen, br_dsize);
524 }
525 
526 static __inline uint32_t
527 vmbus_rxbr_copyfrom_call(const struct vmbus_rxbr *rbr, uint32_t rindex,
528     int cplen, vmbus_br_copy_callback_t cb, void *cbarg)
529 {
530 	uint8_t *br_data = rbr->rxbr_data;
531 	uint32_t br_dsize = rbr->rxbr_dsize;
532 	int error = 0;
533 
534 	if (cplen > br_dsize - rindex) {
535 		uint32_t fraglen = br_dsize - rindex;
536 
537 		/* Wrap-around detected. */
538 		error = cb((void *)(br_data + rindex), fraglen, cbarg);
539 		if (!error)
540 			error = cb((void *)br_data, cplen - fraglen, cbarg);
541 	} else {
542 		error = cb((void *)(br_data + rindex), cplen, cbarg);
543 	}
544 	return (error);
545 }
546 
547 int
548 vmbus_rxbr_peek(struct vmbus_rxbr *rbr, void *data, int dlen)
549 {
550 	mtx_lock_spin(&rbr->rxbr_lock);
551 
552 	/*
553 	 * The requested data and the 64bits channel packet
554 	 * offset should be there at least.
555 	 */
556 	if (vmbus_rxbr_avail(rbr) < dlen + sizeof(uint64_t)) {
557 		mtx_unlock_spin(&rbr->rxbr_lock);
558 		return (EAGAIN);
559 	}
560 	vmbus_rxbr_copyfrom(rbr,
561 	    atomic_load_acq_32(&rbr->rxbr_rindex), data, dlen);
562 
563 	mtx_unlock_spin(&rbr->rxbr_lock);
564 
565 	return (0);
566 }
567 
568 /*
569  * NOTE:
570  * We only hold spin lock to check the ring buffer space. It is
571  * released before calling user provided callback routine.
572  * Caller should hold lock to serialize ring buffer accesses.
573  */
574 int
575 vmbus_rxbr_peek_call(struct vmbus_rxbr *rbr, int dlen, uint32_t skip,
576     vmbus_br_copy_callback_t cb, void *cbarg)
577 {
578 	uint32_t rindex, br_dsize0 = rbr->rxbr_dsize;
579 	int ret;
580 
581 	mtx_lock_spin(&rbr->rxbr_lock);
582 	/*
583 	 * The requested data + skip and the 64bits channel packet
584 	 * offset should be there at least.
585 	 */
586 	if (vmbus_rxbr_avail(rbr) < skip + dlen + sizeof(uint64_t)) {
587 		mtx_unlock_spin(&rbr->rxbr_lock);
588 		return (EAGAIN);
589 	}
590 
591 	rindex = VMBUS_BR_IDXINC(rbr->rxbr_rindex, skip, br_dsize0);
592 	mtx_unlock_spin(&rbr->rxbr_lock);
593 
594 	ret = vmbus_rxbr_copyfrom_call(rbr, rindex, dlen, cb, cbarg);
595 
596 	return (ret);
597 }
598 
599 /*
600  * NOTE:
601  * We assume idx_adv == sizeof(channel packet).
602  */
603 int
604 vmbus_rxbr_idxadv_peek(struct vmbus_rxbr *rbr, void *data, int dlen,
605     uint32_t idx_adv, boolean_t *need_sig)
606 {
607 	uint32_t rindex, br_dsize = rbr->rxbr_dsize;
608 
609 	mtx_lock_spin(&rbr->rxbr_lock);
610 	/*
611 	 * Make sure it has enough data to read.
612 	 */
613 	if (vmbus_rxbr_avail(rbr) < idx_adv + sizeof(uint64_t) + dlen) {
614 		mtx_unlock_spin(&rbr->rxbr_lock);
615 		return (EAGAIN);
616 	}
617 
618 	if (idx_adv > 0) {
619 		/*
620 		 * Advance the read index first, including the channel's 64bit
621 		 * previous write offset.
622 		 */
623 		rindex = VMBUS_BR_IDXINC(rbr->rxbr_rindex,
624 		    idx_adv + sizeof(uint64_t), br_dsize);
625 		__compiler_membar();
626 		atomic_store_rel_32(&rbr->rxbr_rindex, rindex);
627 	}
628 
629 	vmbus_rxbr_copyfrom(rbr,
630 	    atomic_load_acq_32(&rbr->rxbr_rindex), data, dlen);
631 
632 	mtx_unlock_spin(&rbr->rxbr_lock);
633 
634 	if (need_sig) {
635 		if (idx_adv > 0)
636 			*need_sig =
637 			    vmbus_rxbr_need_signal(rbr, idx_adv +
638 			    sizeof(uint64_t));
639 		else
640 			*need_sig = false;
641 	}
642 
643 	return (0);
644 }
645 
646 /*
647  * NOTE:
648  * Just update the RX rb index.
649  */
650 int
651 vmbus_rxbr_idxadv(struct vmbus_rxbr *rbr, uint32_t idx_adv,
652     boolean_t *need_sig)
653 {
654 	uint32_t rindex, br_dsize = rbr->rxbr_dsize;
655 
656 	mtx_lock_spin(&rbr->rxbr_lock);
657 	/*
658 	 * Make sure it has enough space to advance.
659 	 */
660 	if (vmbus_rxbr_avail(rbr) < idx_adv + sizeof(uint64_t)) {
661 		mtx_unlock_spin(&rbr->rxbr_lock);
662 		return (EAGAIN);
663 	}
664 
665 	/*
666 	 * Advance the read index, including the channel's 64bit
667 	 * previous write offset.
668 	 */
669 	rindex = VMBUS_BR_IDXINC(rbr->rxbr_rindex,
670 	    idx_adv + sizeof(uint64_t), br_dsize);
671 	__compiler_membar();
672 	atomic_store_rel_32(&rbr->rxbr_rindex, rindex);
673 
674 	mtx_unlock_spin(&rbr->rxbr_lock);
675 
676 	if (need_sig) {
677 		*need_sig =
678 		    vmbus_rxbr_need_signal(rbr, idx_adv + sizeof(uint64_t));
679 	}
680 
681 	return (0);
682 }
683 
684 /*
685  * NOTE:
686  * We assume (dlen + skip) == sizeof(channel packet).
687  */
688 int
689 vmbus_rxbr_read(struct vmbus_rxbr *rbr, void *data, int dlen, uint32_t skip)
690 {
691 	uint32_t rindex, br_dsize = rbr->rxbr_dsize;
692 
693 	KASSERT(dlen + skip > 0, ("invalid dlen %d, offset %u", dlen, skip));
694 
695 	mtx_lock_spin(&rbr->rxbr_lock);
696 
697 	if (vmbus_rxbr_avail(rbr) < dlen + skip + sizeof(uint64_t)) {
698 		mtx_unlock_spin(&rbr->rxbr_lock);
699 		return (EAGAIN);
700 	}
701 
702 	/*
703 	 * Copy channel packet from RX bufring.
704 	 */
705 	rindex = VMBUS_BR_IDXINC(atomic_load_acq_32(&rbr->rxbr_rindex),
706 	    skip, br_dsize);
707 	rindex = vmbus_rxbr_copyfrom(rbr, rindex, data, dlen);
708 
709 	/*
710 	 * Discard this channel packet's 64bits offset, which is useless to us.
711 	 */
712 	rindex = VMBUS_BR_IDXINC(rindex, sizeof(uint64_t), br_dsize);
713 
714 	/*
715 	 * Update the read index _after_ the channel packet is fetched.
716 	 */
717 	__compiler_membar();
718 	atomic_store_rel_32(&rbr->rxbr_rindex, rindex);
719 
720 	mtx_unlock_spin(&rbr->rxbr_lock);
721 
722 	return (0);
723 }
724