1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #include <errno.h>
20 #include <getopt.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <ts/ts.h>
25 #include <ts/remap.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 
// Tag used for TSDebug output from this plugin.
static const char *PLUGIN_NAME = "fq_pacing";

// Upper bound used to sanity-check the configured pacing rate. The value is
// compared against, and printed next to, an unsigned long, so it carries a UL
// suffix to keep the constant unsigned.
// NOTE(review): the original comment described this limit as "100Gbps", but
// the rate is logged in "Bps" elsewhere in this file - confirm the intended
// unit before relying on either figure.
#define MAX_PACING_RATE 100000000000UL

// Per remap rule configuration.
typedef struct fq_pacing_config {
  unsigned long pacing_rate; // rate handed to SO_MAX_PACING_RATE
} fq_pacing_cfg_t;

// State attached to the continuation that runs at transaction close.
typedef struct fq_pacing_cont {
  int client_fd; // client socket whose pacing is switched off again
} fq_pacing_cont_t;
41 
// Local copy of the helper from ts/ink_sock.cc, which is not exposed to
// plugins.
//
// Calls setsockopt() and repeats the call for as long as it fails with errno
// set to EAGAIN or EINTR. Note that optlevel is passed to setsockopt() as the
// option length. Returns the result of the last setsockopt() call.
int
safe_setsockopt(int s, int level, int optname, char *optval, int optlevel)
{
  for (;;) {
    int rc = setsockopt(s, level, optname, optval, optlevel);
    if (rc >= 0) {
      return rc;
    }
    // Only transient failures are retried; anything else goes to the caller.
    if (errno != EAGAIN && errno != EINTR) {
      return rc;
    }
  }
}
52 
53 static int
fq_is_default_qdisc()54 fq_is_default_qdisc()
55 {
56   TSFile f       = 0;
57   ssize_t s      = 0;
58   char buffer[5] = {};
59   int rc         = 0;
60 
61   f = TSfopen("/proc/sys/net/core/default_qdisc", "r");
62   if (!f) {
63     return 0;
64   }
65 
66   s = TSfread(f, buffer, sizeof(buffer));
67   if (s > 0) {
68     buffer[s] = 0;
69   } else {
70     TSfclose(f);
71     return 0;
72   }
73 
74   if (buffer[2] == '\n') {
75     buffer[2] = 0;
76   }
77 
78   rc = (strncmp(buffer, "fq", sizeof(buffer)) == 0);
79   TSfclose(f);
80   return (rc);
81 }
82 
83 void
TSPluginInit(int argc,const char * argv[])84 TSPluginInit(int argc, const char *argv[])
85 {
86   TSPluginRegistrationInfo info;
87 
88   info.plugin_name   = (char *)"fq_pacing";
89   info.vendor_name   = (char *)"Cisco Systems";
90   info.support_email = (char *)"omdbuild@cisco.com";
91 
92   if (TSPluginRegister(&info) != TS_SUCCESS) {
93     TSError("[fq_pacing] plugin registration failed");
94   }
95 }
96 
97 TSReturnCode
TSRemapInit(TSRemapInterface * api_info,char * errbuf,int errbuf_size)98 TSRemapInit(TSRemapInterface *api_info, char *errbuf, int errbuf_size)
99 {
100   if (!api_info) {
101     strncpy(errbuf, "[fq_pacing] - Invalid TSRemapInterface argument", (size_t)(errbuf_size - 1));
102     return TS_ERROR;
103   }
104 
105   if (api_info->size < sizeof(TSRemapInterface)) {
106     strncpy(errbuf, "[TSRemapInit] - Incorrect size of TSRemapInterface structure", errbuf_size - 1);
107     return TS_ERROR;
108   }
109 
110   if (api_info->tsremap_version < TSREMAP_VERSION) {
111     snprintf(errbuf, errbuf_size - 1, "[TSRemapInit] - Incorrect API version %ld.%ld", api_info->tsremap_version >> 16,
112              (api_info->tsremap_version & 0xffff));
113     return TS_ERROR;
114   }
115 
116   if (!fq_is_default_qdisc()) {
117     snprintf(errbuf, errbuf_size - 1, "[TSRemapInit] - fq qdisc is not active");
118     return TS_ERROR;
119   }
120 
121   TSDebug(PLUGIN_NAME, "plugin is successfully initialized");
122   return TS_SUCCESS;
123 }
124 
125 TSReturnCode
TSRemapNewInstance(int argc,char * argv[],void ** ih,char * errbuf,int errbuf_size)126 TSRemapNewInstance(int argc, char *argv[], void **ih, char *errbuf, int errbuf_size)
127 {
128   fq_pacing_cfg_t *cfg      = NULL;
129   unsigned long pacing_rate = 0;
130 
131   TSDebug(PLUGIN_NAME, "Instantiating a new remap.config plugin rule");
132 
133   if (argc > 1) {
134     int c;
135     static const struct option longopts[] = {{"rate", required_argument, NULL, 'r'}, {NULL, 0, NULL, 0}};
136 
137     // The "-" in optstring is required to prevent permutation of argv, which
138     // makes the plugin loader crashy
139     while ((c = getopt_long(argc, (char *const *)argv, "-r:", longopts, NULL)) != -1) {
140       switch (c) {
141       case 'r':
142         errno       = 0;
143         pacing_rate = strtoul(optarg, NULL, 0);
144         if (errno != 0) {
145           snprintf(errbuf, errbuf_size - 1, "[TsRemapNewInstance] input pacing value is not a valid positive integer");
146           return TS_ERROR;
147         }
148 
149         break;
150       }
151     }
152   }
153 
154   if (pacing_rate > MAX_PACING_RATE) {
155     snprintf(errbuf, errbuf_size - 1, "[TsRemapNewInstance] input pacing value is too large (%lu), max(%lu)", pacing_rate,
156              MAX_PACING_RATE);
157     return TS_ERROR;
158   }
159 
160   cfg = TSmalloc(sizeof(fq_pacing_cfg_t));
161   memset(cfg, 0, sizeof(*cfg));
162   cfg->pacing_rate = pacing_rate;
163   *ih              = (void *)cfg;
164   TSDebug(PLUGIN_NAME, "Setting pacing rate to %lu", pacing_rate);
165 
166   return TS_SUCCESS;
167 }
168 
169 void
TSRemapDeleteInstance(void * instance)170 TSRemapDeleteInstance(void *instance)
171 {
172   TSError("[fq_pacing] Cleaning up...");
173 
174   if (instance != NULL) {
175     TSfree((fq_pacing_cfg_t *)instance);
176   }
177 }
178 
179 static int
reset_pacing_cont(TSCont contp,TSEvent event,void * edata)180 reset_pacing_cont(TSCont contp, TSEvent event, void *edata)
181 {
182   TSHttpTxn txnp             = (TSHttpTxn)edata;
183   fq_pacing_cont_t *txn_data = TSContDataGet(contp);
184 
185 #ifdef SO_MAX_PACING_RATE
186   unsigned int pacing_off = ~0U;
187   if (txn_data->client_fd > 0) {
188     TSDebug(PLUGIN_NAME, "Disabling SO_MAX_PACING_RATE for client_fd=%d", txn_data->client_fd);
189     int res = 0;
190     res     = safe_setsockopt(txn_data->client_fd, SOL_SOCKET, SO_MAX_PACING_RATE, (char *)&pacing_off, sizeof(pacing_off));
191     // EBADF indicates possible client abort
192     if ((res < 0) && (errno != EBADF)) {
193       TSError("[fq_pacing] Error disabling SO_MAX_PACING_RATE, errno=%d", errno);
194     }
195   }
196 #endif
197 
198   TSfree(txn_data);
199   TSContDestroy(contp);
200   TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
201   return 0;
202 }
203 
204 TSRemapStatus
TSRemapDoRemap(void * instance,TSHttpTxn txnp,TSRemapRequestInfo * rri)205 TSRemapDoRemap(void *instance, TSHttpTxn txnp, TSRemapRequestInfo *rri)
206 {
207   if (TSHttpTxnClientProtocolStackContains(txnp, TS_PROTO_TAG_HTTP_2_0) != NULL) {
208     TSDebug(PLUGIN_NAME, "Skipping plugin execution for HTTP/2 requests");
209     return TSREMAP_NO_REMAP;
210   }
211 
212   int client_fd = 0;
213   if (TSHttpTxnClientFdGet(txnp, &client_fd) != TS_SUCCESS) {
214     TSError("[fq_pacing] Error getting client fd");
215   }
216 
217 #ifdef SO_MAX_PACING_RATE
218   fq_pacing_cfg_t *cfg = (fq_pacing_cfg_t *)instance;
219   int res              = 0;
220 
221   res = safe_setsockopt(client_fd, SOL_SOCKET, SO_MAX_PACING_RATE, (char *)&cfg->pacing_rate, sizeof(cfg->pacing_rate));
222   if ((res < 0)) {
223     TSError("[fq_pacing] Error setting SO_MAX_PACING_RATE, errno=%d", errno);
224   }
225   TSDebug(PLUGIN_NAME, "Setting SO_MAX_PACING_RATE for client_fd=%d to %lu Bps", client_fd, cfg->pacing_rate);
226 #endif
227 
228   // Reset pacing at end of transaction in case session is
229   // reused for another delivery service w/o pacing
230   TSCont cont = TSContCreate(reset_pacing_cont, NULL);
231 
232   fq_pacing_cont_t *txn_data = TSmalloc(sizeof(fq_pacing_cont_t));
233   txn_data->client_fd        = client_fd;
234   TSContDataSet(cont, txn_data);
235 
236   TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, cont);
237   return TSREMAP_NO_REMAP;
238 }
239