1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "mcreq.h"
19 #ifdef __cplusplus
20 extern "C" {
21 #endif
22
23 typedef struct {
24 mc_PIPELINE *pl;
25 hrtime_t now;
26 } mc__FLUSHINFO;
27
28 /**
29 * Inline operations for flush. To use this, include this file into your
30 * own source code.
31 */
32
33 /**
34 * Fill a series of IOVs with data to flush
35 * @param pipeline the pipeline to flush
36 * @param iov the iov array to fill
37 * @param niov the number of input items
38 * @param nused set to the number of IOVs actually used
39 * @return the number of data inside all the IOVs
40 */
41 static unsigned int
mcreq_flush_iov_fill(mc_PIPELINE * pipeline,nb_IOV * iov,int niov,int * nused)42 mcreq_flush_iov_fill(mc_PIPELINE *pipeline, nb_IOV *iov, int niov, int *nused)
43 {
44 return netbuf_start_flush(&pipeline->nbmgr, iov, niov, nused);
45 }
46
47 static nb_SIZE
mcreq__pktflush_callback(void * p,nb_SIZE hint,void * arg)48 mcreq__pktflush_callback(void *p, nb_SIZE hint, void *arg)
49 {
50 nb_SIZE pktsize;
51 mc_PACKET *pkt = (mc_PACKET *)p;
52 mc__FLUSHINFO *info = (mc__FLUSHINFO *)arg;
53
54 pktsize = mcreq_get_size(pkt);
55
56 if (info->now && hint) {
57 MCREQ_PKT_RDATA(pkt)->start = info->now;
58 }
59
60 if (hint < pktsize) {
61 return pktsize;
62 }
63
64 /** Packet is flushed */
65 pkt->flags |= MCREQ_F_FLUSHED;
66
67 if (pkt->flags & MCREQ_F_INVOKED) {
68 mcreq_packet_done(info->pl, pkt);
69 }
70 if (info->pl->metrics) {
71 info->pl->metrics->packets_sent++;
72 info->pl->metrics->packets_queued--;
73 info->pl->metrics->bytes_queued -= pktsize;
74 }
75 return pktsize;
76 }
77
78 /**
79 * Called when a chunk of data has been flushed from the network.
80 * @param pl the pipeline which was to be flushed
81 * @param nflushed how much data was actually flushed
82 * @param expected how much data was expected to be flushed (i.e. the return
83 * value from the corresponding iov_fill).
84 *
85 * @param now if present, will reset the start time of each traversed packet
86 * to the value passed.
87 *
88 * This is a thin wrapper around netbuf_end_flush (and optionally
89 * nebtuf_reset_flush())
90 */
91 static void
mcreq_flush_done_ex(mc_PIPELINE * pl,unsigned nflushed,unsigned expected,lcb_U64 now)92 mcreq_flush_done_ex(mc_PIPELINE *pl,
93 unsigned nflushed, unsigned expected, lcb_U64 now)
94 {
95 if (nflushed) {
96 mc__FLUSHINFO info = { pl, now };
97 netbuf_end_flush2(&pl->nbmgr, nflushed,
98 mcreq__pktflush_callback,
99 offsetof(mc_PACKET, sl_flushq), &info);
100 }
101 if (nflushed < expected) {
102 netbuf_reset_flush(&pl->nbmgr);
103 }
104 }
105
106 /* Mainly for tests */
107 static void
mcreq_flush_done(mc_PIPELINE * pl,unsigned nflushed,unsigned expected)108 mcreq_flush_done(mc_PIPELINE *pl, unsigned nflushed, unsigned expected)
109 {
110 mcreq_flush_done_ex(pl, nflushed, expected, 0);
111 }
112
113 #ifdef __cplusplus
114 }
115 #endif
116