1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "mcreq.h"
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22 
23 typedef struct {
24     mc_PIPELINE *pl;
25     hrtime_t now;
26 } mc__FLUSHINFO;
27 
/**
 * Inline operations for flush. To use them, include this file directly in
 * your own source file.
 */
32 
33 /**
34  * Fill a series of IOVs with data to flush
35  * @param pipeline the pipeline to flush
36  * @param iov the iov array to fill
37  * @param niov the number of input items
38  * @param nused set to the number of IOVs actually used
39  * @return the number of data inside all the IOVs
40  */
41 static unsigned int
mcreq_flush_iov_fill(mc_PIPELINE * pipeline,nb_IOV * iov,int niov,int * nused)42 mcreq_flush_iov_fill(mc_PIPELINE *pipeline, nb_IOV *iov, int niov, int *nused)
43 {
44     return netbuf_start_flush(&pipeline->nbmgr, iov, niov, nused);
45 }
46 
47 static nb_SIZE
mcreq__pktflush_callback(void * p,nb_SIZE hint,void * arg)48 mcreq__pktflush_callback(void *p, nb_SIZE hint, void *arg)
49 {
50     nb_SIZE pktsize;
51     mc_PACKET *pkt = (mc_PACKET *)p;
52     mc__FLUSHINFO *info = (mc__FLUSHINFO *)arg;
53 
54     pktsize = mcreq_get_size(pkt);
55 
56     if (info->now && hint) {
57         MCREQ_PKT_RDATA(pkt)->start = info->now;
58     }
59 
60     if (hint < pktsize) {
61         return pktsize;
62     }
63 
64     /** Packet is flushed */
65     pkt->flags |= MCREQ_F_FLUSHED;
66 
67     if (pkt->flags & MCREQ_F_INVOKED) {
68         mcreq_packet_done(info->pl, pkt);
69     }
70     if (info->pl->metrics) {
71         info->pl->metrics->packets_sent++;
72         info->pl->metrics->packets_queued--;
73         info->pl->metrics->bytes_queued -= pktsize;
74     }
75     return pktsize;
76 }
77 
78 /**
79  * Called when a chunk of data has been flushed from the network.
80  * @param pl the pipeline which was to be flushed
81  * @param nflushed how much data was actually flushed
82  * @param expected how much data was expected to be flushed (i.e. the return
83  *        value from the corresponding iov_fill).
84  *
85  * @param now if present, will reset the start time of each traversed packet
86  *        to the value passed.
87  *
88  * This is a thin wrapper around netbuf_end_flush (and optionally
89  * nebtuf_reset_flush())
90  */
91 static void
mcreq_flush_done_ex(mc_PIPELINE * pl,unsigned nflushed,unsigned expected,lcb_U64 now)92 mcreq_flush_done_ex(mc_PIPELINE *pl,
93     unsigned nflushed, unsigned expected, lcb_U64 now)
94 {
95     if (nflushed) {
96         mc__FLUSHINFO info = { pl, now };
97         netbuf_end_flush2(&pl->nbmgr, nflushed,
98                           mcreq__pktflush_callback,
99                           offsetof(mc_PACKET, sl_flushq), &info);
100     }
101     if (nflushed < expected) {
102         netbuf_reset_flush(&pl->nbmgr);
103     }
104 }
105 
106 /* Mainly for tests */
107 static void
mcreq_flush_done(mc_PIPELINE * pl,unsigned nflushed,unsigned expected)108 mcreq_flush_done(mc_PIPELINE *pl, unsigned nflushed, unsigned expected)
109 {
110     mcreq_flush_done_ex(pl, nflushed, expected, 0);
111 }
112 
113 #ifdef __cplusplus
114 }
115 #endif
116