xref: /freebsd/sys/dev/liquidio/base/lio_droq.c (revision 069ac184)
1 /*
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2017 Cavium, Inc.. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Cavium, Inc. nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER(S) OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "lio_bsd.h"
35 #include "lio_common.h"
36 #include "lio_droq.h"
37 #include "lio_iq.h"
38 #include "lio_response_manager.h"
39 #include "lio_device.h"
40 #include "lio_main.h"
41 #include "cn23xx_pf_device.h"
42 #include "lio_network.h"
43 
44 struct __dispatch {
45 	struct lio_stailq_node	node;
46 	struct lio_recv_info	*rinfo;
47 	lio_dispatch_fn_t	disp_fn;
48 };
49 
50 void	*lio_get_dispatch_arg(struct octeon_device *oct,
51 			      uint16_t opcode, uint16_t subcode);
52 
53 /*
54  *  Get the argument that the user set when registering dispatch
55  *  function for a given opcode/subcode.
56  *  @param  octeon_dev - the octeon device pointer.
57  *  @param  opcode     - the opcode for which the dispatch argument
58  *                       is to be checked.
59  *  @param  subcode    - the subcode for which the dispatch argument
60  *                       is to be checked.
61  *  @return  Success: void * (argument to the dispatch function)
62  *  @return  Failure: NULL
63  *
64  */
65 void   *
66 lio_get_dispatch_arg(struct octeon_device *octeon_dev,
67 		     uint16_t opcode, uint16_t subcode)
68 {
69 	struct lio_stailq_node	*dispatch;
70 	void			*fn_arg = NULL;
71 	int			idx;
72 	uint16_t		combined_opcode;
73 
74 	combined_opcode = LIO_OPCODE_SUBCODE(opcode, subcode);
75 
76 	idx = combined_opcode & LIO_OPCODE_MASK;
77 
78 	mtx_lock(&octeon_dev->dispatch.lock);
79 
80 	if (octeon_dev->dispatch.count == 0) {
81 		mtx_unlock(&octeon_dev->dispatch.lock);
82 		return (NULL);
83 	}
84 
85 	if (octeon_dev->dispatch.dlist[idx].opcode == combined_opcode) {
86 		fn_arg = octeon_dev->dispatch.dlist[idx].arg;
87 	} else {
88 		STAILQ_FOREACH(dispatch,
89 			       &octeon_dev->dispatch.dlist[idx].head, entries) {
90 			if (((struct lio_dispatch *)dispatch)->opcode ==
91 			    combined_opcode) {
92 				fn_arg = ((struct lio_dispatch *)dispatch)->arg;
93 				break;
94 			}
95 		}
96 	}
97 
98 	mtx_unlock(&octeon_dev->dispatch.lock);
99 	return (fn_arg);
100 }
101 
102 /*
103  *  Check for packets on Droq. This function should be called with lock held.
104  *  @param  droq - Droq on which count is checked.
105  *  @return Returns packet count.
106  */
107 uint32_t
108 lio_droq_check_hw_for_pkts(struct lio_droq *droq)
109 {
110 	struct octeon_device	*oct = droq->oct_dev;
111 	uint32_t		last_count;
112 	uint32_t		pkt_count = 0;
113 
114 	pkt_count = lio_read_csr32(oct, droq->pkts_sent_reg);
115 
116 	last_count = pkt_count - droq->pkt_count;
117 	droq->pkt_count = pkt_count;
118 
119 	/* we shall write to cnts at the end of processing */
120 	if (last_count)
121 		atomic_add_int(&droq->pkts_pending, last_count);
122 
123 	return (last_count);
124 }
125 
126 static void
127 lio_droq_compute_max_packet_bufs(struct lio_droq *droq)
128 {
129 	uint32_t	count = 0;
130 
131 	/*
132 	 * max_empty_descs is the max. no. of descs that can have no buffers.
133 	 * If the empty desc count goes beyond this value, we cannot safely
134 	 * read in a 64K packet sent by Octeon
135 	 * (64K is max pkt size from Octeon)
136 	 */
137 	droq->max_empty_descs = 0;
138 
139 	do {
140 		droq->max_empty_descs++;
141 		count += droq->buffer_size;
142 	} while (count < (64 * 1024));
143 
144 	droq->max_empty_descs = droq->max_count - droq->max_empty_descs;
145 }
146 
147 static void
148 lio_droq_reset_indices(struct lio_droq *droq)
149 {
150 
151 	droq->read_idx = 0;
152 	droq->refill_idx = 0;
153 	droq->refill_count = 0;
154 	atomic_store_rel_int(&droq->pkts_pending, 0);
155 }
156 
157 static void
158 lio_droq_destroy_ring_buffers(struct octeon_device *oct,
159 			      struct lio_droq *droq)
160 {
161 	uint32_t	i;
162 
163 	for (i = 0; i < droq->max_count; i++) {
164 		if (droq->recv_buf_list[i].buffer != NULL) {
165 			lio_recv_buffer_free(droq->recv_buf_list[i].buffer);
166 			droq->recv_buf_list[i].buffer = NULL;
167 		}
168 	}
169 
170 	lio_droq_reset_indices(droq);
171 }
172 
173 static int
174 lio_droq_setup_ring_buffers(struct octeon_device *oct,
175 			    struct lio_droq *droq)
176 {
177 	struct lio_droq_desc	*desc_ring = droq->desc_ring;
178 	void			*buf;
179 	uint32_t		i;
180 
181 	for (i = 0; i < droq->max_count; i++) {
182 		buf = lio_recv_buffer_alloc(droq->buffer_size);
183 
184 		if (buf == NULL) {
185 			lio_dev_err(oct, "%s buffer alloc failed\n",
186 				    __func__);
187 			droq->stats.rx_alloc_failure++;
188 			return (-ENOMEM);
189 		}
190 
191 		droq->recv_buf_list[i].buffer = buf;
192 		droq->recv_buf_list[i].data = ((struct mbuf *)buf)->m_data;
193 		desc_ring[i].info_ptr = 0;
194 		desc_ring[i].buffer_ptr =
195 			lio_map_ring(oct->device, droq->recv_buf_list[i].buffer,
196 				     droq->buffer_size);
197 	}
198 
199 	lio_droq_reset_indices(droq);
200 
201 	lio_droq_compute_max_packet_bufs(droq);
202 
203 	return (0);
204 }
205 
206 int
207 lio_delete_droq(struct octeon_device *oct, uint32_t q_no)
208 {
209 	struct lio_droq	*droq = oct->droq[q_no];
210 
211 	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);
212 
213 	while (taskqueue_cancel(droq->droq_taskqueue, &droq->droq_task, NULL))
214 		taskqueue_drain(droq->droq_taskqueue, &droq->droq_task);
215 
216 	taskqueue_free(droq->droq_taskqueue);
217 	droq->droq_taskqueue = NULL;
218 
219 	lio_droq_destroy_ring_buffers(oct, droq);
220 	free(droq->recv_buf_list, M_DEVBUF);
221 
222 	if (droq->desc_ring != NULL)
223 		lio_dma_free((droq->max_count * LIO_DROQ_DESC_SIZE),
224 			     droq->desc_ring);
225 
226 	oct->io_qmask.oq &= ~(1ULL << q_no);
227 	bzero(oct->droq[q_no], sizeof(struct lio_droq));
228 	oct->num_oqs--;
229 
230 	return (0);
231 }
232 
233 void
234 lio_droq_bh(void *ptr, int pending __unused)
235 {
236 	struct lio_droq		*droq = ptr;
237 	struct octeon_device	*oct = droq->oct_dev;
238 	struct lio_instr_queue	*iq = oct->instr_queue[droq->q_no];
239 	int	reschedule, tx_done = 1;
240 
241 	reschedule = lio_droq_process_packets(oct, droq, oct->rx_budget);
242 
243 	if (atomic_load_acq_int(&iq->instr_pending))
244 		tx_done = lio_flush_iq(oct, iq, oct->tx_budget);
245 
246 	if (reschedule || !tx_done)
247 		taskqueue_enqueue(droq->droq_taskqueue, &droq->droq_task);
248 	else
249 		lio_enable_irq(droq, iq);
250 }
251 
252 int
253 lio_init_droq(struct octeon_device *oct, uint32_t q_no,
254 	      uint32_t num_descs, uint32_t desc_size, void *app_ctx)
255 {
256 	struct lio_droq	*droq;
257 	unsigned long	size;
258 	uint32_t	c_buf_size = 0, c_num_descs = 0, c_pkts_per_intr = 0;
259 	uint32_t	c_refill_threshold = 0, desc_ring_size = 0;
260 
261 	lio_dev_dbg(oct, "%s[%d]\n", __func__, q_no);
262 
263 	droq = oct->droq[q_no];
264 	bzero(droq, LIO_DROQ_SIZE);
265 
266 	droq->oct_dev = oct;
267 	droq->q_no = q_no;
268 	if (app_ctx != NULL)
269 		droq->app_ctx = app_ctx;
270 	else
271 		droq->app_ctx = (void *)(size_t)q_no;
272 
273 	c_num_descs = num_descs;
274 	c_buf_size = desc_size;
275 	if (LIO_CN23XX_PF(oct)) {
276 		struct lio_config *conf23 = LIO_CHIP_CONF(oct, cn23xx_pf);
277 
278 		c_pkts_per_intr =
279 			(uint32_t)LIO_GET_OQ_PKTS_PER_INTR_CFG(conf23);
280 		c_refill_threshold =
281 			(uint32_t)LIO_GET_OQ_REFILL_THRESHOLD_CFG(conf23);
282 	} else {
283 		return (1);
284 	}
285 
286 	droq->max_count = c_num_descs;
287 	droq->buffer_size = c_buf_size;
288 
289 	desc_ring_size = droq->max_count * LIO_DROQ_DESC_SIZE;
290 	droq->desc_ring = lio_dma_alloc(desc_ring_size, &droq->desc_ring_dma);
291 	if (droq->desc_ring == NULL) {
292 		lio_dev_err(oct, "Output queue %d ring alloc failed\n", q_no);
293 		return (1);
294 	}
295 
296 	lio_dev_dbg(oct, "droq[%d]: desc_ring: virt: 0x%p, dma: %llx\n", q_no,
297 		    droq->desc_ring, LIO_CAST64(droq->desc_ring_dma));
298 	lio_dev_dbg(oct, "droq[%d]: num_desc: %d\n", q_no, droq->max_count);
299 
300 	size = droq->max_count * LIO_DROQ_RECVBUF_SIZE;
301 	droq->recv_buf_list =
302 		(struct lio_recv_buffer *)malloc(size, M_DEVBUF,
303 						 M_NOWAIT | M_ZERO);
304 	if (droq->recv_buf_list == NULL) {
305 		lio_dev_err(oct, "Output queue recv buf list alloc failed\n");
306 		goto init_droq_fail;
307 	}
308 
309 	if (lio_droq_setup_ring_buffers(oct, droq))
310 		goto init_droq_fail;
311 
312 	droq->pkts_per_intr = c_pkts_per_intr;
313 	droq->refill_threshold = c_refill_threshold;
314 
315 	lio_dev_dbg(oct, "DROQ INIT: max_empty_descs: %d\n",
316 		    droq->max_empty_descs);
317 
318 	mtx_init(&droq->lock, "droq_lock", NULL, MTX_DEF);
319 
320 	STAILQ_INIT(&droq->dispatch_stq_head);
321 
322 	oct->fn_list.setup_oq_regs(oct, q_no);
323 
324 	oct->io_qmask.oq |= BIT_ULL(q_no);
325 
326 	/*
327 	 * Initialize the taskqueue that handles
328 	 * output queue packet processing.
329 	 */
330 	lio_dev_dbg(oct, "Initializing droq%d taskqueue\n", q_no);
331 	NET_TASK_INIT(&droq->droq_task, 0, lio_droq_bh, (void *)droq);
332 
333 	droq->droq_taskqueue = taskqueue_create_fast("lio_droq_task", M_NOWAIT,
334 						     taskqueue_thread_enqueue,
335 						     &droq->droq_taskqueue);
336 	taskqueue_start_threads_cpuset(&droq->droq_taskqueue, 1, PI_NET,
337 				       &oct->ioq_vector[q_no].affinity_mask,
338 				       "lio%d_droq%d_task", oct->octeon_id,
339 				       q_no);
340 
341 	return (0);
342 
343 init_droq_fail:
344 	lio_delete_droq(oct, q_no);
345 	return (1);
346 }
347 
348 /*
349  * lio_create_recv_info
350  * Parameters:
351  *  octeon_dev - pointer to the octeon device structure
352  *  droq       - droq in which the packet arrived.
353  *  buf_cnt    - no. of buffers used by the packet.
354  *  idx        - index in the descriptor for the first buffer in the packet.
355  * Description:
356  *  Allocates a recv_info_t and copies the buffer addresses for packet data
357  *  into the recv_pkt space which starts at an 8B offset from recv_info_t.
358  *  Flags the descriptors for refill later. If available descriptors go
359  *  below the threshold to receive a 64K pkt, new buffers are first allocated
360  *  before the recv_pkt_t is created.
361  *  This routine will be called in interrupt context.
362  * Returns:
363  *  Success: Pointer to recv_info_t
364  *  Failure: NULL.
365  * Locks:
366  *  The droq->lock is held when this routine is called.
367  */
368 static inline struct lio_recv_info *
369 lio_create_recv_info(struct octeon_device *octeon_dev, struct lio_droq *droq,
370 		     uint32_t buf_cnt, uint32_t idx)
371 {
372 	struct lio_droq_info	*info;
373 	struct lio_recv_pkt	*recv_pkt;
374 	struct lio_recv_info	*recv_info;
375 	uint32_t		bytes_left, i;
376 
377 	info = (struct lio_droq_info *)droq->recv_buf_list[idx].data;
378 
379 	recv_info = lio_alloc_recv_info(sizeof(struct __dispatch));
380 	if (recv_info == NULL)
381 		return (NULL);
382 
383 	recv_pkt = recv_info->recv_pkt;
384 	recv_pkt->rh = info->rh;
385 	recv_pkt->length = (uint32_t)info->length;
386 	recv_pkt->buffer_count = (uint16_t)buf_cnt;
387 	recv_pkt->octeon_id = (uint16_t)octeon_dev->octeon_id;
388 
389 	i = 0;
390 	bytes_left = (uint32_t)info->length;
391 
392 	while (buf_cnt) {
393 		recv_pkt->buffer_size[i] = (bytes_left >= droq->buffer_size) ?
394 			droq->buffer_size : bytes_left;
395 
396 		recv_pkt->buffer_ptr[i] = droq->recv_buf_list[idx].buffer;
397 		droq->recv_buf_list[idx].buffer = NULL;
398 
399 		idx = lio_incr_index(idx, 1, droq->max_count);
400 		bytes_left -= droq->buffer_size;
401 		i++;
402 		buf_cnt--;
403 	}
404 
405 	return (recv_info);
406 }
407 
408 /*
409  * If we were not able to refill all buffers, try to move around
410  * the buffers that were not dispatched.
411  */
412 static inline uint32_t
413 lio_droq_refill_pullup_descs(struct lio_droq *droq,
414 			     struct lio_droq_desc *desc_ring)
415 {
416 	uint32_t	desc_refilled = 0;
417 	uint32_t	refill_index = droq->refill_idx;
418 
419 	while (refill_index != droq->read_idx) {
420 		if (droq->recv_buf_list[refill_index].buffer != NULL) {
421 			droq->recv_buf_list[droq->refill_idx].buffer =
422 				droq->recv_buf_list[refill_index].buffer;
423 			droq->recv_buf_list[droq->refill_idx].data =
424 				droq->recv_buf_list[refill_index].data;
425 			desc_ring[droq->refill_idx].buffer_ptr =
426 				desc_ring[refill_index].buffer_ptr;
427 			droq->recv_buf_list[refill_index].buffer = NULL;
428 			desc_ring[refill_index].buffer_ptr = 0;
429 			do {
430 				droq->refill_idx =
431 					lio_incr_index(droq->refill_idx, 1,
432 						       droq->max_count);
433 				desc_refilled++;
434 				droq->refill_count--;
435 			} while (droq->recv_buf_list[droq->refill_idx].buffer !=
436 				 NULL);
437 		}
438 		refill_index = lio_incr_index(refill_index, 1, droq->max_count);
439 	}	/* while */
440 	return (desc_refilled);
441 }
442 
443 /*
444  * lio_droq_refill
445  * Parameters:
446  *  droq       - droq in which descriptors require new buffers.
447  * Description:
448  *  Called during normal DROQ processing in interrupt mode or by the poll
449  *  thread to refill the descriptors from which buffers were dispatched
450  *  to upper layers. Attempts to allocate new buffers. If that fails, moves
451  *  up buffers (that were not dispatched) to form a contiguous ring.
452  * Returns:
453  *  No of descriptors refilled.
454  * Locks:
455  *  This routine is called with droq->lock held.
456  */
457 uint32_t
458 lio_droq_refill(struct octeon_device *octeon_dev, struct lio_droq *droq)
459 {
460 	struct lio_droq_desc	*desc_ring;
461 	void			*buf = NULL;
462 	uint32_t		desc_refilled = 0;
463 	uint8_t			*data;
464 
465 	desc_ring = droq->desc_ring;
466 
467 	while (droq->refill_count && (desc_refilled < droq->max_count)) {
468 		/*
469 		 * If a valid buffer exists (happens if there is no dispatch),
470 		 * reuse
471 		 * the buffer, else allocate.
472 		 */
473 		if (droq->recv_buf_list[droq->refill_idx].buffer == NULL) {
474 			buf = lio_recv_buffer_alloc(droq->buffer_size);
475 			/*
476 			 * If a buffer could not be allocated, no point in
477 			 * continuing
478 			 */
479 			if (buf == NULL) {
480 				droq->stats.rx_alloc_failure++;
481 				break;
482 			}
483 
484 			droq->recv_buf_list[droq->refill_idx].buffer = buf;
485 			data = ((struct mbuf *)buf)->m_data;
486 		} else {
487 			data = ((struct mbuf *)droq->recv_buf_list
488 				[droq->refill_idx].buffer)->m_data;
489 		}
490 
491 		droq->recv_buf_list[droq->refill_idx].data = data;
492 
493 		desc_ring[droq->refill_idx].buffer_ptr =
494 		    lio_map_ring(octeon_dev->device,
495 				 droq->recv_buf_list[droq->refill_idx].buffer,
496 				 droq->buffer_size);
497 
498 		droq->refill_idx = lio_incr_index(droq->refill_idx, 1,
499 						  droq->max_count);
500 		desc_refilled++;
501 		droq->refill_count--;
502 	}
503 
504 	if (droq->refill_count)
505 		desc_refilled += lio_droq_refill_pullup_descs(droq, desc_ring);
506 
507 	/*
508 	 * if droq->refill_count
509 	 * The refill count would not change in pass two. We only moved buffers
510 	 * to close the gap in the ring, but we would still have the same no. of
511 	 * buffers to refill.
512 	 */
513 	return (desc_refilled);
514 }
515 
/*
 * Number of buf_size-byte buffers needed to hold total_len bytes, i.e.
 * total_len divided by buf_size rounded up.  buf_size must be non-zero.
 */
static inline uint32_t
lio_droq_get_bufcount(uint32_t buf_size, uint32_t total_len)
{
	uint32_t	rounded_up = total_len + (buf_size - 1);

	return (rounded_up / buf_size);
}
522 
523 static int
524 lio_droq_dispatch_pkt(struct octeon_device *oct, struct lio_droq *droq,
525 		      union octeon_rh *rh, struct lio_droq_info *info)
526 {
527 	struct lio_recv_info	*rinfo;
528 	lio_dispatch_fn_t	disp_fn;
529 	uint32_t		cnt;
530 
531 	cnt = lio_droq_get_bufcount(droq->buffer_size, (uint32_t)info->length);
532 
533 	disp_fn = lio_get_dispatch(oct, (uint16_t)rh->r.opcode,
534 				   (uint16_t)rh->r.subcode);
535 	if (disp_fn) {
536 		rinfo = lio_create_recv_info(oct, droq, cnt, droq->read_idx);
537 		if (rinfo != NULL) {
538 			struct __dispatch *rdisp = rinfo->rsvd;
539 
540 			rdisp->rinfo = rinfo;
541 			rdisp->disp_fn = disp_fn;
542 			rinfo->recv_pkt->rh = *rh;
543 			STAILQ_INSERT_TAIL(&droq->dispatch_stq_head,
544 					   &rdisp->node, entries);
545 		} else {
546 			droq->stats.dropped_nomem++;
547 		}
548 	} else {
549 		lio_dev_err(oct, "DROQ: No dispatch function (opcode %u/%u)\n",
550 			    (unsigned int)rh->r.opcode,
551 			    (unsigned int)rh->r.subcode);
552 		droq->stats.dropped_nodispatch++;
553 	}
554 
555 	return (cnt);
556 }
557 
558 static inline void
559 lio_droq_drop_packets(struct octeon_device *oct, struct lio_droq *droq,
560 		      uint32_t cnt)
561 {
562 	struct lio_droq_info	*info;
563 	uint32_t		i = 0, buf_cnt;
564 
565 	for (i = 0; i < cnt; i++) {
566 		info = (struct lio_droq_info *)
567 			droq->recv_buf_list[droq->read_idx].data;
568 
569 		lio_swap_8B_data((uint64_t *)info, 2);
570 
571 		if (info->length) {
572 			info->length += 8;
573 			droq->stats.bytes_received += info->length;
574 			buf_cnt = lio_droq_get_bufcount(droq->buffer_size,
575 							(uint32_t)info->length);
576 		} else {
577 			lio_dev_err(oct, "DROQ: In drop: pkt with len 0\n");
578 			buf_cnt = 1;
579 		}
580 
581 		droq->read_idx = lio_incr_index(droq->read_idx, buf_cnt,
582 						droq->max_count);
583 		droq->refill_count += buf_cnt;
584 	}
585 }
586 
587 static uint32_t
588 lio_droq_fast_process_packets(struct octeon_device *oct, struct lio_droq *droq,
589 			      uint32_t pkts_to_process)
590 {
591 	struct lio_droq_info	*info;
592 	union			octeon_rh *rh;
593 	uint32_t		pkt, pkt_count, total_len = 0;
594 
595 	pkt_count = pkts_to_process;
596 
597 	for (pkt = 0; pkt < pkt_count; pkt++) {
598 		struct mbuf	*nicbuf = NULL;
599 		uint32_t	pkt_len = 0;
600 
601 		info = (struct lio_droq_info *)
602 		    droq->recv_buf_list[droq->read_idx].data;
603 
604 		lio_swap_8B_data((uint64_t *)info, 2);
605 
606 		if (!info->length) {
607 			lio_dev_err(oct,
608 				    "DROQ[%d] idx: %d len:0, pkt_cnt: %d\n",
609 				    droq->q_no, droq->read_idx, pkt_count);
610 			hexdump((uint8_t *)info, LIO_DROQ_INFO_SIZE, NULL,
611 				HD_OMIT_CHARS);
612 			pkt++;
613 			lio_incr_index(droq->read_idx, 1, droq->max_count);
614 			droq->refill_count++;
615 			break;
616 		}
617 
618 		rh = &info->rh;
619 
620 		info->length += 8;
621 		rh->r_dh.len += (LIO_DROQ_INFO_SIZE + 7) / 8;
622 
623 		total_len += (uint32_t)info->length;
624 		if (lio_opcode_slow_path(rh)) {
625 			uint32_t	buf_cnt;
626 
627 			buf_cnt = lio_droq_dispatch_pkt(oct, droq, rh, info);
628 			droq->read_idx = lio_incr_index(droq->read_idx,	buf_cnt,
629 							droq->max_count);
630 			droq->refill_count += buf_cnt;
631 		} else {
632 			if (info->length <= droq->buffer_size) {
633 				pkt_len = (uint32_t)info->length;
634 				nicbuf = droq->recv_buf_list[
635 						       droq->read_idx].buffer;
636 				nicbuf->m_len = pkt_len;
637 				droq->recv_buf_list[droq->read_idx].buffer =
638 					NULL;
639 
640 				droq->read_idx =
641 					lio_incr_index(droq->read_idx,
642 						       1, droq->max_count);
643 				droq->refill_count++;
644 			} else {
645 				bool	secondary_frag = false;
646 
647 				pkt_len = 0;
648 
649 				while (pkt_len < info->length) {
650 					int	frag_len, idx = droq->read_idx;
651 					struct mbuf	*buffer;
652 
653 					frag_len =
654 						((pkt_len + droq->buffer_size) >
655 						 info->length) ?
656 						((uint32_t)info->length -
657 						 pkt_len) : droq->buffer_size;
658 
659 					buffer = ((struct mbuf *)
660 						  droq->recv_buf_list[idx].
661 						  buffer);
662 					buffer->m_len = frag_len;
663 					if (__predict_true(secondary_frag)) {
664 						m_cat(nicbuf, buffer);
665 					} else {
666 						nicbuf = buffer;
667 						secondary_frag = true;
668 					}
669 
670 					droq->recv_buf_list[droq->read_idx].
671 						buffer = NULL;
672 
673 					pkt_len += frag_len;
674 					droq->read_idx =
675 						lio_incr_index(droq->read_idx,
676 							       1,
677 							       droq->max_count);
678 					droq->refill_count++;
679 				}
680 			}
681 
682 			if (nicbuf != NULL) {
683 				if (droq->ops.fptr != NULL) {
684 					droq->ops.fptr(nicbuf, pkt_len, rh,
685 						       droq, droq->ops.farg);
686 				} else {
687 					lio_recv_buffer_free(nicbuf);
688 				}
689 			}
690 		}
691 
692 		if (droq->refill_count >= droq->refill_threshold) {
693 			int desc_refilled = lio_droq_refill(oct, droq);
694 
695 			/*
696 			 * Flush the droq descriptor data to memory to be sure
697 			 * that when we update the credits the data in memory
698 			 * is accurate.
699 			 */
700 			wmb();
701 			lio_write_csr32(oct, droq->pkts_credit_reg,
702 					desc_refilled);
703 			/* make sure mmio write completes */
704 			__compiler_membar();
705 		}
706 	}	/* for (each packet)... */
707 
708 	/* Increment refill_count by the number of buffers processed. */
709 	droq->stats.pkts_received += pkt;
710 	droq->stats.bytes_received += total_len;
711 
712 	tcp_lro_flush_all(&droq->lro);
713 
714 	if ((droq->ops.drop_on_max) && (pkts_to_process - pkt)) {
715 		lio_droq_drop_packets(oct, droq, (pkts_to_process - pkt));
716 
717 		droq->stats.dropped_toomany += (pkts_to_process - pkt);
718 		return (pkts_to_process);
719 	}
720 
721 	return (pkt);
722 }
723 
724 int
725 lio_droq_process_packets(struct octeon_device *oct, struct lio_droq *droq,
726 			 uint32_t budget)
727 {
728 	struct lio_stailq_node	*tmp, *tmp2;
729 	uint32_t		pkt_count = 0, pkts_processed = 0;
730 
731 	/* Grab the droq lock */
732 	mtx_lock(&droq->lock);
733 
734 	lio_droq_check_hw_for_pkts(droq);
735 	pkt_count = atomic_load_acq_int(&droq->pkts_pending);
736 
737 	if (!pkt_count) {
738 		mtx_unlock(&droq->lock);
739 		return (0);
740 	}
741 	if (pkt_count > budget)
742 		pkt_count = budget;
743 
744 	pkts_processed = lio_droq_fast_process_packets(oct, droq, pkt_count);
745 
746 	atomic_subtract_int(&droq->pkts_pending, pkts_processed);
747 
748 	/* Release the lock */
749 	mtx_unlock(&droq->lock);
750 
751 	STAILQ_FOREACH_SAFE(tmp, &droq->dispatch_stq_head, entries, tmp2) {
752 		struct __dispatch *rdisp = (struct __dispatch *)tmp;
753 
754 		STAILQ_REMOVE_HEAD(&droq->dispatch_stq_head, entries);
755 		rdisp->disp_fn(rdisp->rinfo, lio_get_dispatch_arg(oct,
756 			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.opcode,
757 			(uint16_t)rdisp->rinfo->recv_pkt->rh.r.subcode));
758 	}
759 
760 	/* If there are packets pending. schedule tasklet again */
761 	if (atomic_load_acq_int(&droq->pkts_pending))
762 		return (1);
763 
764 	return (0);
765 }
766 
767 int
768 lio_register_droq_ops(struct octeon_device *oct, uint32_t q_no,
769 		      struct lio_droq_ops *ops)
770 {
771 	struct lio_droq		*droq;
772 	struct lio_config	*lio_cfg = NULL;
773 
774 	lio_cfg = lio_get_conf(oct);
775 
776 	if (lio_cfg == NULL)
777 		return (-EINVAL);
778 
779 	if (ops == NULL) {
780 		lio_dev_err(oct, "%s: droq_ops pointer is NULL\n", __func__);
781 		return (-EINVAL);
782 	}
783 
784 	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
785 		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
786 			    __func__, q_no, (oct->num_oqs - 1));
787 		return (-EINVAL);
788 	}
789 	droq = oct->droq[q_no];
790 
791 	mtx_lock(&droq->lock);
792 
793 	memcpy(&droq->ops, ops, sizeof(struct lio_droq_ops));
794 
795 	mtx_unlock(&droq->lock);
796 
797 	return (0);
798 }
799 
800 int
801 lio_unregister_droq_ops(struct octeon_device *oct, uint32_t q_no)
802 {
803 	struct lio_droq		*droq;
804 	struct lio_config	*lio_cfg = NULL;
805 
806 	lio_cfg = lio_get_conf(oct);
807 
808 	if (lio_cfg == NULL)
809 		return (-EINVAL);
810 
811 	if (q_no >= LIO_GET_OQ_MAX_Q_CFG(lio_cfg)) {
812 		lio_dev_err(oct, "%s: droq id (%d) exceeds MAX (%d)\n",
813 			    __func__, q_no, oct->num_oqs - 1);
814 		return (-EINVAL);
815 	}
816 
817 	droq = oct->droq[q_no];
818 
819 	if (droq == NULL) {
820 		lio_dev_info(oct, "Droq id (%d) not available.\n", q_no);
821 		return (0);
822 	}
823 
824 	mtx_lock(&droq->lock);
825 
826 	droq->ops.fptr = NULL;
827 	droq->ops.farg = NULL;
828 	droq->ops.drop_on_max = 0;
829 
830 	mtx_unlock(&droq->lock);
831 
832 	return (0);
833 }
834 
835 int
836 lio_create_droq(struct octeon_device *oct, uint32_t q_no, uint32_t num_descs,
837 		uint32_t desc_size, void *app_ctx)
838 {
839 
840 	if (oct->droq[q_no]->oct_dev != NULL) {
841 		lio_dev_dbg(oct, "Droq already in use. Cannot create droq %d again\n",
842 			    q_no);
843 		return (1);
844 	}
845 
846 	/* Initialize the Droq */
847 	if (lio_init_droq(oct, q_no, num_descs, desc_size, app_ctx)) {
848 		bzero(oct->droq[q_no], sizeof(struct lio_droq));
849 		goto create_droq_fail;
850 	}
851 
852 	oct->num_oqs++;
853 
854 	lio_dev_dbg(oct, "%s: Total number of OQ: %d\n", __func__,
855 		    oct->num_oqs);
856 
857 	/* Global Droq register settings */
858 
859 	/*
860 	 * As of now not required, as setting are done for all 32 Droqs at
861 	 * the same time.
862 	 */
863 	return (0);
864 
865 create_droq_fail:
866 	return (-ENOMEM);
867 }
868