1 /* spice-server char device flow control code
2 
3    Copyright (C) 2012-2015 Red Hat, Inc.
4 
5    Red Hat Authors:
6    Yonit Halperin <yhalperi@redhat.com>
7 
8    This library is free software; you can redistribute it and/or
9    modify it under the terms of the GNU Lesser General Public
10    License as published by the Free Software Foundation; either
11    version 2.1 of the License, or (at your option) any later version.
12 
13    This library is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16    Lesser General Public License for more details.
17 
18    You should have received a copy of the GNU Lesser General Public
19    License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 */
21 
22 
23 #include <config.h>
24 #include <inttypes.h>
25 #include <list>
26 
27 #include "char-device.h"
28 #include "reds.h"
29 #include "safe-list.hpp"
30 
31 #define CHAR_DEVICE_WRITE_TO_TIMEOUT 100
32 #define RED_CHAR_DEVICE_WAIT_TOKENS_TIMEOUT 30000
33 
/* Who produced a write buffer.  The origin decides which token pool (if any)
 * is charged when the buffer is handed out and credited when it is released. */
enum WriteBufferOrigin {
    /* no owner; assigned to the in-flight buffer when its client is freed */
    WRITE_BUFFER_ORIGIN_NONE = 0,
    /* requested on behalf of a client; costs a client token under flow control */
    WRITE_BUFFER_ORIGIN_CLIENT,
    /* created by the server; costs one of the device's self tokens */
    WRITE_BUFFER_ORIGIN_SERVER,
    /* created by the server without consuming a self token */
    WRITE_BUFFER_ORIGIN_SERVER_NO_TOKEN,
};
40 
41 struct RedCharDeviceWriteBufferPrivate {
42     RedCharDeviceClientOpaque *client; /* The client that sent the message to the device.
43                           NULL if the server created the message */
44     WriteBufferOrigin origin;
45     uint32_t token_price;
46     uint32_t refs;
47 };
48 
49 struct RedCharDeviceClient {
50     SPICE_CXX_GLIB_ALLOCATOR
51     using Queue = std::list<RedPipeItemPtr, red::Mallocator<RedPipeItemPtr> >;
52 
53     RedCharDeviceClient(RedCharDevice *dev,
54                         RedsState *reds,
55                         RedCharDeviceClientOpaque *client,
56                         bool do_flow_control,
57                         uint32_t max_send_queue_size,
58                         uint32_t num_client_tokens,
59                         uint32_t num_send_tokens);
60     ~RedCharDeviceClient();
61 
62     RedCharDevice *const dev;
63     RedCharDeviceClientOpaque *const client;
64     const bool do_flow_control;
65     uint64_t num_client_tokens;
66     uint64_t num_client_tokens_free; /* client messages that were consumed by the device */
67     uint64_t num_send_tokens; /* send to client */
68     SpiceTimer *wait_for_tokens_timer;
69     int wait_for_tokens_started;
70     Queue send_queue;
71     const uint32_t max_send_queue_size;
72 };
73 
74 struct RedCharDevicePrivate {
75     SPICE_CXX_GLIB_ALLOCATOR
76 
77     int running;
78     int active; /* has read/write been performed since the device was started */
79     int wait_for_migrate_data;
80 
81     GQueue write_queue;
82     RedCharDeviceWriteBuffer *cur_write_buf;
83     uint8_t *cur_write_buf_pos;
84     SpiceTimer *write_to_dev_timer;
85     uint64_t num_self_tokens;
86 
87     GList *clients; /* list of RedCharDeviceClient */
88 
89     uint64_t client_tokens_interval; /* frequency of returning tokens to the client */
90     SpiceCharDeviceInstance *sin;
91 
92     int during_read_from_device;
93     int during_write_to_device;
94 
95     SpiceServer *reds;
96 };
97 
98 static void red_char_device_write_buffer_unref(RedCharDeviceWriteBuffer *write_buf);
99 
100 void
send_tokens_to_client(RedCharDeviceClientOpaque * client,uint32_t tokens)101 RedCharDevice::send_tokens_to_client(RedCharDeviceClientOpaque *client, uint32_t tokens)
102 {
103     g_warn_if_reached();
104 }
105 
red_char_device_write_buffer_free(RedCharDeviceWriteBuffer * buf)106 static void red_char_device_write_buffer_free(RedCharDeviceWriteBuffer *buf)
107 {
108     if (buf) {
109         g_free(buf->priv);
110     }
111     /* NOTE: do not free buf. buf was contained into a larger structure
112      * which contained both private and public part and was freed above */
113 }
114 
write_buffers_queue_free(GQueue * write_queue)115 static void write_buffers_queue_free(GQueue *write_queue)
116 {
117     RedCharDeviceWriteBuffer *buf;
118     while ((buf = (RedCharDeviceWriteBuffer *) g_queue_pop_tail(write_queue)))
119         red_char_device_write_buffer_free(buf);
120 }
121 
red_char_device_client_free(RedCharDevice * dev,RedCharDeviceClient * dev_client)122 static void red_char_device_client_free(RedCharDevice *dev,
123                                         RedCharDeviceClient *dev_client)
124 {
125     GList *l, *next;
126 
127     red_timer_remove(dev_client->wait_for_tokens_timer);
128     dev_client->wait_for_tokens_timer = nullptr;
129 
130     dev_client->send_queue.clear();
131 
132     /* remove write buffers that are associated with the client */
133     spice_debug("write_queue_is_empty %d", g_queue_is_empty(&dev->priv->write_queue) && !dev->priv->cur_write_buf);
134     l = g_queue_peek_head_link(&dev->priv->write_queue);
135     while (l) {
136         auto write_buf = (RedCharDeviceWriteBuffer *) l->data;
137         next = l->next;
138 
139         if (write_buf->priv->origin == WRITE_BUFFER_ORIGIN_CLIENT &&
140             write_buf->priv->client == dev_client->client) {
141             g_queue_delete_link(&dev->priv->write_queue, l);
142             red_char_device_write_buffer_unref(write_buf);
143         }
144         l = next;
145     }
146 
147     if (dev->priv->cur_write_buf && dev->priv->cur_write_buf->priv->origin == WRITE_BUFFER_ORIGIN_CLIENT &&
148         dev->priv->cur_write_buf->priv->client == dev_client->client) {
149         dev->priv->cur_write_buf->priv->origin = WRITE_BUFFER_ORIGIN_NONE;
150         dev->priv->cur_write_buf->priv->client = nullptr;
151     }
152 
153     dev->priv->clients = g_list_remove(dev->priv->clients, dev_client);
154     delete dev_client;
155 }
156 
red_char_device_handle_client_overflow(RedCharDeviceClient * dev_client)157 static void red_char_device_handle_client_overflow(RedCharDeviceClient *dev_client)
158 {
159     RedCharDevice *dev = dev_client->dev;
160     dev->remove_client(dev_client->client);
161 }
162 
red_char_device_client_find(RedCharDevice * dev,RedCharDeviceClientOpaque * client)163 static RedCharDeviceClient *red_char_device_client_find(RedCharDevice *dev,
164                                                         RedCharDeviceClientOpaque *client)
165 {
166     RedCharDeviceClient *dev_client;
167 
168     GLIST_FOREACH(dev->priv->clients, RedCharDeviceClient, dev_client) {
169         if (dev_client->client == client) {
170             return dev_client;
171         }
172     }
173     return nullptr;
174 }
175 
176 /***************************
177  * Reading from the device *
178  **************************/
179 
device_client_wait_for_tokens_timeout(RedCharDeviceClient * dev_client)180 static void device_client_wait_for_tokens_timeout(RedCharDeviceClient *dev_client)
181 {
182     red_char_device_handle_client_overflow(dev_client);
183 }
184 
red_char_device_can_send_to_client(RedCharDeviceClient * dev_client)185 static int red_char_device_can_send_to_client(RedCharDeviceClient *dev_client)
186 {
187     return !dev_client->do_flow_control || dev_client->num_send_tokens;
188 }
189 
red_char_device_max_send_tokens(RedCharDevice * dev)190 static uint64_t red_char_device_max_send_tokens(RedCharDevice *dev)
191 {
192     RedCharDeviceClient *dev_client;
193     uint64_t max = 0;
194 
195     GLIST_FOREACH(dev->priv->clients, RedCharDeviceClient, dev_client) {
196         if (!dev_client->do_flow_control) {
197             max = ~0;
198             break;
199         }
200 
201         if (dev_client->num_send_tokens > max) {
202             max = dev_client->num_send_tokens;
203         }
204     }
205     return max;
206 }
207 
red_char_device_add_msg_to_client_queue(RedCharDeviceClient * dev_client,RedPipeItem * msg)208 static void red_char_device_add_msg_to_client_queue(RedCharDeviceClient *dev_client,
209                                                     RedPipeItem *msg)
210 {
211     if (dev_client->send_queue.size() >= dev_client->max_send_queue_size) {
212         red_char_device_handle_client_overflow(dev_client);
213         return;
214     }
215 
216     dev_client->send_queue.push_front(RedPipeItemPtr(msg));
217     if (!dev_client->wait_for_tokens_started) {
218         red_timer_start(dev_client->wait_for_tokens_timer,
219                         RED_CHAR_DEVICE_WAIT_TOKENS_TIMEOUT);
220         dev_client->wait_for_tokens_started = TRUE;
221     }
222 }
223 
red_char_device_send_msg_to_clients(RedCharDevice * dev,RedPipeItem * msg)224 static void red_char_device_send_msg_to_clients(RedCharDevice *dev,
225                                                 RedPipeItem *msg)
226 {
227     RedCharDeviceClient *dev_client;
228 
229     GLIST_FOREACH(dev->priv->clients, RedCharDeviceClient, dev_client) {
230         if (red_char_device_can_send_to_client(dev_client)) {
231             dev_client->num_send_tokens--;
232             spice_assert(dev_client->send_queue.empty());
233             dev->send_msg_to_client(msg, dev_client->client);
234 
235             /* don't refer to dev_client anymore, it may have been released */
236         } else {
237             red_char_device_add_msg_to_client_queue(dev_client, msg);
238         }
239     }
240 }
241 
red_char_device_read_from_device(RedCharDevice * dev)242 static bool red_char_device_read_from_device(RedCharDevice *dev)
243 {
244     uint64_t max_send_tokens;
245     int did_read = FALSE;
246 
247     if (!dev->priv->running || dev->priv->wait_for_migrate_data || !dev->priv->sin) {
248         return FALSE;
249     }
250 
251     /* There are 2 scenarios where we can get called recursively:
252      * 1) spice-vmc vmc_read triggering flush of throttled data, recalling wakeup
253      * (virtio)
254      * 2) in case of sending messages to the client, and unreferencing the
255      * msg, we trigger another read.
256      */
257     if (dev->priv->during_read_from_device++ > 0) {
258         return FALSE;
259     }
260 
261     max_send_tokens = red_char_device_max_send_tokens(dev);
262     red::shared_ptr<RedCharDevice> hold_dev(dev);
263     /*
264      * Reading from the device only in case at least one of the clients have a free token.
265      * All messages will be discarded if no client is attached to the device
266      */
267     while ((max_send_tokens || (dev->priv->clients == nullptr)) && dev->priv->running) {
268         auto msg = dev->read_one_msg_from_device();
269         if (!msg) {
270             if (dev->priv->during_read_from_device > 1) {
271                 dev->priv->during_read_from_device = 1;
272                 continue; /* a wakeup might have been called during the read -
273                              make sure it doesn't get lost */
274             }
275             break;
276         }
277         did_read = TRUE;
278         red_char_device_send_msg_to_clients(dev, msg.get());
279         max_send_tokens--;
280     }
281     dev->priv->during_read_from_device = 0;
282     if (dev->priv->running) {
283         dev->priv->active = dev->priv->active || did_read;
284     }
285     return did_read;
286 }
287 
red_char_device_client_send_queue_push(RedCharDeviceClient * dev_client)288 static void red_char_device_client_send_queue_push(RedCharDeviceClient *dev_client)
289 {
290     while (!dev_client->send_queue.empty() &&
291            red_char_device_can_send_to_client(dev_client)) {
292         RedPipeItemPtr msg = std::move(dev_client->send_queue.back());
293         dev_client->send_queue.pop_back();
294         g_assert(msg);
295         dev_client->num_send_tokens--;
296         dev_client->dev->send_msg_to_client(msg.get(), dev_client->client);
297     }
298 }
299 
300 static void
red_char_device_send_to_client_tokens_absorb(RedCharDevice * dev,RedCharDeviceClientOpaque * client,uint32_t tokens,bool reset)301 red_char_device_send_to_client_tokens_absorb(RedCharDevice *dev,
302                                              RedCharDeviceClientOpaque *client,
303                                              uint32_t tokens,
304                                              bool reset)
305 {
306     RedCharDeviceClient *dev_client;
307 
308     dev_client = red_char_device_client_find(dev, client);
309 
310     if (!dev_client) {
311         spice_error("client wasn't found dev %p client %p", dev, client);
312         return;
313     }
314 
315     if (reset) {
316         dev_client->num_send_tokens = 0;
317     }
318     dev_client->num_send_tokens += tokens;
319 
320     if (!dev_client->send_queue.empty()) {
321         spice_assert(dev_client->num_send_tokens == tokens);
322         red_char_device_client_send_queue_push(dev_client);
323     }
324 
325     if (red_char_device_can_send_to_client(dev_client)) {
326         red_timer_cancel(dev_client->wait_for_tokens_timer);
327         dev_client->wait_for_tokens_started = FALSE;
328         red_char_device_read_from_device(dev_client->dev);
329     } else if (!dev_client->send_queue.empty()) {
330         red_timer_start(dev_client->wait_for_tokens_timer,
331                         RED_CHAR_DEVICE_WAIT_TOKENS_TIMEOUT);
332         dev_client->wait_for_tokens_started = TRUE;
333     }
334 }
335 
send_to_client_tokens_add(RedCharDeviceClientOpaque * client,uint32_t tokens)336 void RedCharDevice::send_to_client_tokens_add(RedCharDeviceClientOpaque *client,
337                                               uint32_t tokens)
338 {
339     red_char_device_send_to_client_tokens_absorb(this, client, tokens, false);
340 }
341 
send_to_client_tokens_set(RedCharDeviceClientOpaque * client,uint32_t tokens)342 void RedCharDevice::send_to_client_tokens_set(RedCharDeviceClientOpaque *client,
343                                               uint32_t tokens)
344 {
345     red_char_device_send_to_client_tokens_absorb(this, client, tokens, true);
346 }
347 
348 /**************************
349  * Writing to the device  *
350 ***************************/
351 
red_char_device_client_tokens_add(RedCharDevice * dev,RedCharDeviceClient * dev_client,uint32_t num_tokens)352 static void red_char_device_client_tokens_add(RedCharDevice *dev,
353                                               RedCharDeviceClient *dev_client,
354                                               uint32_t num_tokens)
355 {
356     if (!dev_client->do_flow_control) {
357         return;
358     }
359     if (num_tokens > 1) {
360         spice_debug("#tokens > 1 (=%u)", num_tokens);
361     }
362     dev_client->num_client_tokens_free += num_tokens;
363     if (dev_client->num_client_tokens_free >= dev->priv->client_tokens_interval) {
364         uint32_t tokens = dev_client->num_client_tokens_free;
365 
366         dev_client->num_client_tokens += dev_client->num_client_tokens_free;
367         dev_client->num_client_tokens_free = 0;
368         dev->send_tokens_to_client(dev_client->client, tokens);
369     }
370 }
371 
write_to_device()372 int RedCharDevice::write_to_device()
373 {
374     SpiceCharDeviceInterface *sif;
375     int total = 0;
376     int n;
377 
378     if (!priv->running || priv->wait_for_migrate_data || !priv->sin) {
379         return 0;
380     }
381 
382     /* protect against recursion with red_char_device_wakeup */
383     if (priv->during_write_to_device++ > 0) {
384         return 0;
385     }
386 
387     red::shared_ptr<RedCharDevice> hold_dev(this);
388 
389     if (priv->write_to_dev_timer) {
390         red_timer_cancel(priv->write_to_dev_timer);
391     }
392 
393     sif = spice_char_device_get_interface(priv->sin);
394     while (priv->running) {
395         uint32_t write_len;
396 
397         if (!priv->cur_write_buf) {
398             priv->cur_write_buf = (RedCharDeviceWriteBuffer *) g_queue_pop_tail(&priv->write_queue);
399             if (!priv->cur_write_buf)
400                 break;
401             priv->cur_write_buf_pos = priv->cur_write_buf->buf;
402         }
403 
404         write_len = priv->cur_write_buf->buf + priv->cur_write_buf->buf_used -
405                     priv->cur_write_buf_pos;
406         n = sif->write(priv->sin, priv->cur_write_buf_pos, write_len);
407         if (n <= 0) {
408             if (priv->during_write_to_device > 1) {
409                 priv->during_write_to_device = 1;
410                 continue; /* a wakeup might have been called during the write -
411                              make sure it doesn't get lost */
412             }
413             break;
414         }
415         total += n;
416         write_len -= n;
417         if (!write_len) {
418             write_buffer_release(&priv->cur_write_buf);
419             continue;
420         }
421         priv->cur_write_buf_pos += n;
422     }
423     /* retry writing as long as the write queue is not empty */
424     if (priv->running) {
425         if (priv->cur_write_buf) {
426             if (priv->write_to_dev_timer) {
427                 red_timer_start(priv->write_to_dev_timer,
428                                 CHAR_DEVICE_WRITE_TO_TIMEOUT);
429             }
430         } else {
431             spice_assert(g_queue_is_empty(&priv->write_queue));
432         }
433         priv->active = priv->active || total;
434     }
435     priv->during_write_to_device = 0;
436     return total;
437 }
438 
write_retry(RedCharDevice * dev)439 void RedCharDevice::write_retry(RedCharDevice *dev)
440 {
441     if (dev->priv->write_to_dev_timer) {
442         red_timer_cancel(dev->priv->write_to_dev_timer);
443     }
444     dev->write_to_device();
445 }
446 
447 static RedCharDeviceWriteBuffer *
red_char_device_write_buffer_get(RedCharDevice * dev,RedCharDeviceClientOpaque * client,int size,WriteBufferOrigin origin,int migrated_data_tokens)448 red_char_device_write_buffer_get(RedCharDevice *dev, RedCharDeviceClientOpaque *client, int size,
449                                  WriteBufferOrigin origin, int migrated_data_tokens)
450 {
451     RedCharDeviceWriteBuffer *ret;
452 
453     if (origin == WRITE_BUFFER_ORIGIN_SERVER && !dev->priv->num_self_tokens) {
454         return nullptr;
455     }
456 
457     struct RedCharDeviceWriteBufferFull {
458         RedCharDeviceWriteBufferPrivate priv;
459         RedCharDeviceWriteBuffer buffer;
460     } *write_buf;
461     write_buf = (struct RedCharDeviceWriteBufferFull* )
462         g_malloc(sizeof(struct RedCharDeviceWriteBufferFull) + size);
463     memset(write_buf, 0, sizeof(*write_buf));
464     write_buf->priv.refs = 1;
465     ret = &write_buf->buffer;
466     ret->buf_size = size;
467     ret->priv = &write_buf->priv;
468 
469     spice_assert(!ret->buf_used);
470 
471     ret->priv->origin = origin;
472 
473     if (origin == WRITE_BUFFER_ORIGIN_CLIENT) {
474        spice_assert(client);
475        RedCharDeviceClient *dev_client = red_char_device_client_find(dev, client);
476        if (dev_client) {
477             if (!migrated_data_tokens &&
478                 dev_client->do_flow_control && !dev_client->num_client_tokens) {
479                 g_warning("token violation: dev %p client %p", dev, client);
480                 red_char_device_handle_client_overflow(dev_client);
481                 goto error;
482             }
483             ret->priv->client = client;
484             if (!migrated_data_tokens && dev_client->do_flow_control) {
485                 dev_client->num_client_tokens--;
486             }
487         } else {
488             /* it is possible that the client was removed due to send tokens underflow, but
489              * the caller still receive messages from the client */
490             g_warning("client not found: dev %p client %p", dev, client);
491             goto error;
492         }
493     } else if (origin == WRITE_BUFFER_ORIGIN_SERVER) {
494         dev->priv->num_self_tokens--;
495     }
496 
497     ret->priv->token_price = migrated_data_tokens ? migrated_data_tokens : 1;
498     ret->priv->refs = 1;
499     return ret;
500 error:
501     red_char_device_write_buffer_free(ret);
502     return nullptr;
503 }
504 
write_buffer_get_client(RedCharDeviceClientOpaque * client,int size)505 RedCharDeviceWriteBuffer *RedCharDevice::write_buffer_get_client(RedCharDeviceClientOpaque *client,
506                                                                  int size)
507 {
508     spice_assert(client);
509     return  red_char_device_write_buffer_get(this, client, size, WRITE_BUFFER_ORIGIN_CLIENT, 0);
510 }
511 
write_buffer_get_server(int size,bool use_token)512 RedCharDeviceWriteBuffer *RedCharDevice::write_buffer_get_server(int size,
513                                                                  bool use_token)
514 {
515     WriteBufferOrigin origin =
516         use_token ? WRITE_BUFFER_ORIGIN_SERVER : WRITE_BUFFER_ORIGIN_SERVER_NO_TOKEN;
517     return  red_char_device_write_buffer_get(this, nullptr, size, origin, 0);
518 }
519 
red_char_device_write_buffer_ref(RedCharDeviceWriteBuffer * write_buf)520 static RedCharDeviceWriteBuffer *red_char_device_write_buffer_ref(RedCharDeviceWriteBuffer *write_buf)
521 {
522     spice_assert(write_buf);
523 
524     write_buf->priv->refs++;
525     return write_buf;
526 }
527 
red_char_device_write_buffer_unref(RedCharDeviceWriteBuffer * write_buf)528 static void red_char_device_write_buffer_unref(RedCharDeviceWriteBuffer *write_buf)
529 {
530     spice_assert(write_buf);
531 
532     write_buf->priv->refs--;
533     if (write_buf->priv->refs == 0)
534         red_char_device_write_buffer_free(write_buf);
535 }
536 
write_buffer_add(RedCharDeviceWriteBuffer * write_buf)537 void RedCharDevice::write_buffer_add(RedCharDeviceWriteBuffer *write_buf)
538 {
539 
540     /* caller shouldn't add buffers for client that was removed */
541     if (write_buf->priv->origin == WRITE_BUFFER_ORIGIN_CLIENT &&
542         !red_char_device_client_find(this, write_buf->priv->client)) {
543         g_warning("client not found: this %p client %p", this, write_buf->priv->client);
544         red_char_device_write_buffer_unref(write_buf);
545         return;
546     }
547 
548     g_queue_push_head(&priv->write_queue, write_buf);
549     write_to_device();
550 }
551 
write_buffer_release(RedCharDevice * dev,RedCharDeviceWriteBuffer ** p_write_buf)552 void RedCharDevice::write_buffer_release(RedCharDevice *dev,
553                                          RedCharDeviceWriteBuffer **p_write_buf)
554 {
555     RedCharDeviceWriteBuffer *write_buf = *p_write_buf;
556     if (!write_buf) {
557         return;
558     }
559     *p_write_buf = nullptr;
560 
561     WriteBufferOrigin buf_origin = write_buf->priv->origin;
562     uint32_t buf_token_price = write_buf->priv->token_price;
563     RedCharDeviceClientOpaque *client = write_buf->priv->client;
564 
565     if (!dev) {
566         g_warning("no device. write buffer is freed");
567         red_char_device_write_buffer_free(write_buf);
568         return;
569     }
570 
571     spice_assert(dev->priv->cur_write_buf != write_buf);
572 
573     red_char_device_write_buffer_unref(write_buf);
574     if (buf_origin == WRITE_BUFFER_ORIGIN_CLIENT) {
575         RedCharDeviceClient *dev_client;
576 
577         spice_assert(client);
578         dev_client = red_char_device_client_find(dev, client);
579         /* when a client is removed, we remove all the buffers that are associated with it */
580         spice_assert(dev_client);
581         red_char_device_client_tokens_add(dev, dev_client, buf_token_price);
582     } else if (buf_origin == WRITE_BUFFER_ORIGIN_SERVER) {
583         dev->priv->num_self_tokens++;
584         dev->on_free_self_token();
585     }
586 }
587 
588 /********************************
589  * char_device_state management *
590  ********************************/
591 
reset_dev_instance(SpiceCharDeviceInstance * sin)592 void RedCharDevice::reset_dev_instance(SpiceCharDeviceInstance *sin)
593 {
594     spice_debug("sin %p, char device %p", sin, this);
595     priv->sin = sin;
596     if (sin) {
597         sin->st = this;
598     }
599     if (priv->reds) {
600         init_device_instance();
601     }
602 }
603 
RedCharDeviceClient(RedCharDevice * init_dev,RedsState * reds,RedCharDeviceClientOpaque * init_client,bool init_do_flow_control,uint32_t init_max_send_queue_size,uint32_t init_num_client_tokens,uint32_t init_num_send_tokens)604 RedCharDeviceClient::RedCharDeviceClient(RedCharDevice *init_dev,
605                                          RedsState *reds,
606                                          RedCharDeviceClientOpaque *init_client,
607                                          bool init_do_flow_control,
608                                          uint32_t init_max_send_queue_size,
609                                          uint32_t init_num_client_tokens,
610                                          uint32_t init_num_send_tokens):
611     dev(init_dev),
612     client(init_client),
613     do_flow_control(init_do_flow_control),
614     max_send_queue_size(init_max_send_queue_size)
615 {
616     if (do_flow_control) {
617         wait_for_tokens_timer =
618             reds_core_timer_add(reds, device_client_wait_for_tokens_timeout, this);
619         if (!wait_for_tokens_timer) {
620             spice_error("failed to create wait for tokens timer");
621         }
622         num_client_tokens = init_num_client_tokens;
623         num_send_tokens = init_num_send_tokens;
624     } else {
625         num_client_tokens = ~0;
626         num_send_tokens = ~0;
627     }
628 }
629 
630 RedCharDeviceClient::~RedCharDeviceClient() = default;
631 
client_add(RedCharDeviceClientOpaque * client,int do_flow_control,uint32_t max_send_queue_size,uint32_t num_client_tokens,uint32_t num_send_tokens,int wait_for_migrate_data)632 bool RedCharDevice::client_add(RedCharDeviceClientOpaque *client,
633                                int do_flow_control,
634                                uint32_t max_send_queue_size,
635                                uint32_t num_client_tokens,
636                                uint32_t num_send_tokens,
637                                int wait_for_migrate_data)
638 {
639     RedCharDeviceClient *dev_client;
640 
641 
642     spice_assert(client);
643 
644     if (wait_for_migrate_data && (priv->clients != nullptr || priv->active)) {
645         spice_warning("can't restore device %p from migration data. The device "
646                       "has already been active", this);
647         return FALSE;
648     }
649 
650     priv->wait_for_migrate_data = wait_for_migrate_data;
651 
652     spice_debug("char device %p, client %p", this, client);
653     dev_client = new RedCharDeviceClient(this,
654                                          priv->reds,
655                                          client,
656                                          !!do_flow_control,
657                                          max_send_queue_size,
658                                          num_client_tokens,
659                                          num_send_tokens);
660     priv->clients = g_list_prepend(priv->clients, dev_client);
661     /* Now that we have a client, forward any pending device data */
662     wakeup();
663     return TRUE;
664 }
665 
client_remove(RedCharDeviceClientOpaque * client)666 void RedCharDevice::client_remove(RedCharDeviceClientOpaque *client)
667 {
668     RedCharDeviceClient *dev_client;
669 
670     spice_debug("char device %p, client %p", this, client);
671     dev_client = red_char_device_client_find(this, client);
672 
673     if (!dev_client) {
674         spice_error("client wasn't found");
675         return;
676     }
677     red_char_device_client_free(this, dev_client);
678     if (priv->wait_for_migrate_data) {
679         spice_assert(priv->clients == nullptr);
680         priv->wait_for_migrate_data  = FALSE;
681         red_char_device_read_from_device(this);
682     }
683 }
684 
client_exists(RedCharDeviceClientOpaque * client)685 int RedCharDevice::client_exists(RedCharDeviceClientOpaque *client)
686 {
687     return (red_char_device_client_find(this, client) != nullptr);
688 }
689 
start()690 void RedCharDevice::start()
691 {
692     spice_debug("char device %p", this);
693     priv->running = TRUE;
694     red::shared_ptr<RedCharDevice> hold_dev(this);
695     while (write_to_device() ||
696            red_char_device_read_from_device(this));
697 }
698 
stop()699 void RedCharDevice::stop()
700 {
701     spice_debug("char device %p", this);
702     priv->running = FALSE;
703     priv->active = FALSE;
704     if (priv->write_to_dev_timer) {
705         red_timer_cancel(priv->write_to_dev_timer);
706     }
707 }
708 
reset()709 void RedCharDevice::reset()
710 {
711     RedCharDeviceClient *dev_client;
712     RedCharDeviceWriteBuffer *buf;
713 
714     priv->wait_for_migrate_data = FALSE;
715     spice_debug("char device %p", this);
716     while ((buf = (RedCharDeviceWriteBuffer *) g_queue_pop_tail(&priv->write_queue))) {
717         write_buffer_release(&buf);
718     }
719     write_buffer_release(&priv->cur_write_buf);
720 
721     GLIST_FOREACH(priv->clients, RedCharDeviceClient, dev_client) {
722         spice_debug("send_queue_empty %d", dev_client->send_queue.empty());
723         dev_client->num_send_tokens += dev_client->send_queue.size();
724         dev_client->send_queue.clear();
725 
726         /* If device is reset, we must reset the tokens counters as well as we
727          * don't hold any data from client and upon agent's reconnection we send
728          * SPICE_MSG_MAIN_AGENT_CONNECTED_TOKENS with all free tokens we have */
729         dev_client->num_client_tokens += dev_client->num_client_tokens_free;
730         dev_client->num_client_tokens_free = 0;
731     }
732 }
733 
wakeup()734 void RedCharDevice::wakeup()
735 {
736     write_to_device();
737     red_char_device_read_from_device(this);
738 }
739 
740 /*************
741  * Migration *
742  * **********/
743 
migrate_data_marshall_empty(SpiceMarshaller * m)744 void RedCharDevice::migrate_data_marshall_empty(SpiceMarshaller *m)
745 {
746     SpiceMigrateDataCharDevice *mig_data;
747 
748     spice_debug("trace");
749     mig_data = (SpiceMigrateDataCharDevice *)spice_marshaller_reserve_space(m,
750                                                                             sizeof(*mig_data));
751     memset(mig_data, 0, sizeof(*mig_data));
752     mig_data->version = SPICE_MIGRATE_DATA_CHAR_DEVICE_VERSION;
753     mig_data->connected = FALSE;
754 }
755 
migrate_data_marshaller_write_buffer_free(uint8_t * data,void * opaque)756 static void migrate_data_marshaller_write_buffer_free(uint8_t *data, void *opaque)
757 {
758     auto write_buf = (RedCharDeviceWriteBuffer *)opaque;
759 
760     red_char_device_write_buffer_unref(write_buf);
761 }
762 
migrate_data_marshall(SpiceMarshaller * m)763 void RedCharDevice::migrate_data_marshall(SpiceMarshaller *m)
764 {
765     RedCharDeviceClient *dev_client;
766     GList *item;
767     uint8_t *write_to_dev_sizes_ptr;
768     uint32_t write_to_dev_size;
769     uint32_t write_to_dev_tokens;
770     SpiceMarshaller *m2;
771 
772     /* multi-clients are not supported */
773     spice_assert(g_list_length(priv->clients) == 1);
774     dev_client = (RedCharDeviceClient *) g_list_last(priv->clients)->data;
775     /* FIXME: if there were more than one client before the marshalling,
776      * it is possible that the send_queue length > 0, and the send data
777      * should be migrated as well */
778     spice_assert(dev_client->send_queue.empty());
779     spice_marshaller_add_uint32(m, SPICE_MIGRATE_DATA_CHAR_DEVICE_VERSION);
780     spice_marshaller_add_uint8(m, 1); /* connected */
781     spice_marshaller_add_uint32(m, dev_client->num_client_tokens);
782     spice_marshaller_add_uint32(m, dev_client->num_send_tokens);
783     write_to_dev_sizes_ptr = spice_marshaller_reserve_space(m, sizeof(uint32_t)*2);
784     write_to_dev_size = 0;
785     write_to_dev_tokens = 0;
786 
787     m2 = spice_marshaller_get_ptr_submarshaller(m);
788     if (priv->cur_write_buf) {
789         uint32_t buf_remaining = priv->cur_write_buf->buf + priv->cur_write_buf->buf_used -
790                                  priv->cur_write_buf_pos;
791         spice_marshaller_add_by_ref_full(m2, priv->cur_write_buf_pos, buf_remaining,
792                                          migrate_data_marshaller_write_buffer_free,
793                                          red_char_device_write_buffer_ref(priv->cur_write_buf)
794                                          );
795         write_to_dev_size += buf_remaining;
796         if (priv->cur_write_buf->priv->origin == WRITE_BUFFER_ORIGIN_CLIENT) {
797             spice_assert(priv->cur_write_buf->priv->client == dev_client->client);
798             write_to_dev_tokens += priv->cur_write_buf->priv->token_price;
799         }
800     }
801 
802     for (item = g_queue_peek_tail_link(&priv->write_queue); item != nullptr; item = item->prev) {
803         auto write_buf = (RedCharDeviceWriteBuffer *) item->data;
804 
805         spice_marshaller_add_by_ref_full(m2, write_buf->buf, write_buf->buf_used,
806                                          migrate_data_marshaller_write_buffer_free,
807                                          red_char_device_write_buffer_ref(write_buf)
808                                          );
809         write_to_dev_size += write_buf->buf_used;
810         if (write_buf->priv->origin == WRITE_BUFFER_ORIGIN_CLIENT) {
811             spice_assert(write_buf->priv->client == dev_client->client);
812             write_to_dev_tokens += write_buf->priv->token_price;
813         }
814     }
815     spice_debug("migration data dev %p: write_queue size %u tokens %u",
816                 this, write_to_dev_size, write_to_dev_tokens);
817     spice_marshaller_set_uint32(m, write_to_dev_sizes_ptr, write_to_dev_size);
818     spice_marshaller_set_uint32(m, write_to_dev_sizes_ptr + sizeof(uint32_t), write_to_dev_tokens);
819 }
820 
restore(SpiceMigrateDataCharDevice * mig_data)821 bool RedCharDevice::restore(SpiceMigrateDataCharDevice *mig_data)
822 {
823     RedCharDeviceClient *dev_client;
824     uint32_t client_tokens_window;
825 
826     spice_assert(g_list_length(priv->clients) == 1 &&
827                  priv->wait_for_migrate_data);
828 
829     dev_client = (RedCharDeviceClient *) g_list_last(priv->clients)->data;
830     if (mig_data->version > SPICE_MIGRATE_DATA_CHAR_DEVICE_VERSION) {
831         spice_error("dev %p error: migration data version %u is bigger than self %u",
832                     this, mig_data->version, SPICE_MIGRATE_DATA_CHAR_DEVICE_VERSION);
833         return FALSE;
834     }
835     spice_assert(!priv->cur_write_buf && g_queue_is_empty(&priv->write_queue));
836     spice_assert(mig_data->connected);
837 
838     client_tokens_window = dev_client->num_client_tokens; /* initial state of tokens */
839     dev_client->num_client_tokens = mig_data->num_client_tokens;
840     /* assumption: client_tokens_window stays the same across severs */
841     dev_client->num_client_tokens_free = client_tokens_window -
842                                            mig_data->num_client_tokens -
843                                            mig_data->write_num_client_tokens;
844     dev_client->num_send_tokens = mig_data->num_send_tokens;
845 
846     if (mig_data->write_size > 0) {
847         if (mig_data->write_num_client_tokens) {
848             priv->cur_write_buf =
849                 red_char_device_write_buffer_get(this, dev_client->client,
850                     mig_data->write_size, WRITE_BUFFER_ORIGIN_CLIENT,
851                     mig_data->write_num_client_tokens);
852         } else {
853             priv->cur_write_buf =
854                 red_char_device_write_buffer_get(this, nullptr,
855                     mig_data->write_size, WRITE_BUFFER_ORIGIN_SERVER, 0);
856         }
857         /* the first write buffer contains all the data that was saved for migration */
858         memcpy(priv->cur_write_buf->buf,
859                ((uint8_t *)mig_data) + mig_data->write_data_ptr - sizeof(SpiceMigrateDataHeader),
860                mig_data->write_size);
861         priv->cur_write_buf->buf_used = mig_data->write_size;
862         priv->cur_write_buf_pos = priv->cur_write_buf->buf;
863     }
864     priv->wait_for_migrate_data = FALSE;
865     write_to_device();
866     red_char_device_read_from_device(this);
867     return TRUE;
868 }
869 
get_server()870 SpiceServer* RedCharDevice::get_server()
871 {
872     return priv->reds;
873 }
874 
/**
 * Get the char device interface of a char device instance.
 *
 * @instance the device instance; must not be null (it is dereferenced)
 * @return the SpiceCharDeviceInterface obtained by applying SPICE_UPCAST to
 *         the instance's base interface pointer (instance->base.sif)
 */
SpiceCharDeviceInterface *spice_char_device_get_interface(SpiceCharDeviceInstance *instance)
{
   return SPICE_UPCAST(SpiceCharDeviceInterface, instance->base.sif);
}
879 
880 
init_device_instance()881 void RedCharDevice::init_device_instance()
882 {
883     SpiceCharDeviceInterface *sif;
884 
885     g_return_if_fail(priv->reds);
886 
887     red_timer_remove(priv->write_to_dev_timer);
888     priv->write_to_dev_timer = nullptr;
889 
890     if (priv->sin == nullptr) {
891        return;
892     }
893 
894     sif = spice_char_device_get_interface(priv->sin);
895     if (sif->base.minor_version <= 2 ||
896         !(sif->flags & SPICE_CHAR_DEVICE_NOTIFY_WRITABLE)) {
897         priv->write_to_dev_timer = reds_core_timer_add(priv->reds,
898                                                        RedCharDevice::write_retry,
899                                                        this);
900         if (!priv->write_to_dev_timer) {
901             spice_error("failed creating char dev write timer");
902         }
903     }
904 
905     priv->sin->st = this;
906 }
907 
~RedCharDevice()908 RedCharDevice::~RedCharDevice()
909 {
910     red_timer_remove(priv->write_to_dev_timer);
911     priv->write_to_dev_timer = nullptr;
912 
913     write_buffers_queue_free(&priv->write_queue);
914     red_char_device_write_buffer_free(priv->cur_write_buf);
915     priv->cur_write_buf = nullptr;
916 
917     while (priv->clients != nullptr) {
918         auto dev_client = (RedCharDeviceClient *) priv->clients->data;
919         red_char_device_client_free(this, dev_client);
920     }
921     priv->running = FALSE;
922 }
923 
924 void
port_event(uint8_t event)925 RedCharDevice::port_event(uint8_t event)
926 {
927 }
928 
spice_server_port_event(SpiceCharDeviceInstance * sin,uint8_t event)929 SPICE_GNUC_VISIBLE void spice_server_port_event(SpiceCharDeviceInstance *sin, uint8_t event)
930 {
931     if (sin->st == nullptr) {
932         spice_warning("no RedCharDevice attached to instance %p", sin);
933         return;
934     }
935 
936     sin->st->port_event(event);
937 }
938 
get_device_instance()939 SpiceCharDeviceInstance *RedCharDevice::get_device_instance()
940 {
941     return priv->sin;
942 }
943 
/**
 * Construct a char device.
 *
 * @reds                   server object, stored in the private data
 * @sin                    device instance, handed to reset_dev_instance()
 * @client_tokens_interval stored as-is in the private data
 * @num_self_tokens        stored as-is in the private data
 */
RedCharDevice::RedCharDevice(RedsState *reds, SpiceCharDeviceInstance *sin,
                             uint64_t client_tokens_interval, uint64_t num_self_tokens)
{
    /* reds is set before reset_dev_instance() is called.
     * NOTE(review): presumably reset_dev_instance() ends up in
     * init_device_instance(), which bails out when priv->reds is unset -
     * confirm before reordering these statements */
    priv->reds = reds;
    priv->client_tokens_interval = client_tokens_interval;
    priv->num_self_tokens = num_self_tokens;
    reset_dev_instance(sin);

    /* start with an empty queue of buffers waiting to be written */
    g_queue_init(&priv->write_queue);
}
954 
read(uint8_t * buf,int len)955 int RedCharDevice::read(uint8_t *buf, int len)
956 {
957     auto sif = spice_char_device_get_interface(priv->sin);
958 
959     int ret = sif->read(priv->sin, buf, len);
960     if (ret > 0) {
961         priv->active = true;
962     }
963     return ret;
964 }
965