xref: /qemu/net/colo-compare.c (revision e3a6e0da)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34 
35 #include "block/aio-wait.h"
36 #include "qemu/coroutine.h"
37 
/* QOM type name of the colo-compare object. */
#define TYPE_COLO_COMPARE "colo-compare"
typedef struct CompareState CompareState;
DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
                         TYPE_COLO_COMPARE)

/* All CompareState instances, linked through CompareState.next. */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Notifiers fired by colo_compare_inconsistency_notify() when the
 * instance has no notify_dev configured.
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

/* Value reported by compare_chr_can_read(). */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* NOTE(review): presumably the default for max_queue_size - confirm. */
#define MAX_QUEUE_SIZE 1024

/* Bit flags written to the "mark" output of colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/*
 * NOTE(review): presumably the defaults (in ms) for expired_scan_cycle
 * and compare_timeout - confirm against the property setup code.
 */
#define REGULAR_PACKET_CHECK_MS 3000
#define DEFAULT_TIME_OUT_MS 3000

/* Uncomment to hexdump mismatching packets to stderr. */
/* #define DEBUG_COLO_PACKETS */

/* Held for the whole of colo_notify_compares_event(). */
static QemuMutex colo_compare_mutex;
/* When false, colo_notify_compares_event() returns without doing work. */
static bool colo_compare_active;
/* Protects event_unhandled_count; paired with event_complete_cond. */
static QemuMutex event_mtx;
/* Broadcast each time a compare instance finishes handling an event. */
static QemuCond event_complete_cond;
/* Number of scheduled event bottom halves that have not completed yet. */
static int event_unhandled_count;
/* Queue length limit checked by colo_insert_packet(). */
static uint32_t max_queue_size;
66 
67 /*
68  *  + CompareState ++
69  *  |               |
70  *  +---------------+   +---------------+         +---------------+
71  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
72  *  +---------------+   +---------------+         +---------------+
73  *  |               |     |           |             |          |
74  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
75  *                    |primary |  |secondary    |primary | |secondary
76  *                    |packet  |  |packet  +    |packet  | |packet  +
77  *                    +--------+  +--------+    +--------+ +--------+
78  *                        |           |             |          |
79  *                    +---v----+  +---v----+    +---v----+ +---v----+
80  *                    |primary |  |secondary    |primary | |secondary
81  *                    |packet  |  |packet  +    |packet  | |packet  +
82  *                    +--------+  +--------+    +--------+ +--------+
83  *                        |           |             |          |
84  *                    +---v----+  +---v----+    +---v----+ +---v----+
85  *                    |primary |  |secondary    |primary | |secondary
86  *                    |packet  |  |packet  +    |packet  | |packet  +
87  *                    +--------+  +--------+    +--------+ +--------+
88  */
89 
/*
 * Send state for one output character backend: a queue of pending
 * SendEntry items and the coroutine that drains it.
 */
typedef struct SendCo {
    /* Coroutine running _compare_chr_send(); reset to NULL when it ends */
    Coroutine *co;
    /* Compare object whose vnet_hdr setting the coroutine consults */
    struct CompareState *s;
    /* Backend that queued entries are written to */
    CharBackend *chr;
    /* SendEntry items: pushed at the head, popped from the tail */
    GQueue send_list;
    /* When true the vnet header length is never written to the backend */
    bool notify_remote_frame;
    /* True when no send coroutine is in flight */
    bool done;
    /* Result of the last coroutine run: 0, or a negative value on error */
    int ret;
} SendCo;
99 
/* One queued message waiting to be written by _compare_chr_send(). */
typedef struct SendEntry {
    /* Number of bytes in buf; sent first as a network-order 32-bit value */
    uint32_t size;
    /* vnet header length, sent after the size when vnet_hdr is enabled */
    uint32_t vnet_hdr_len;
    /* Payload; released with g_free() once the entry has been processed */
    uint8_t *buf;
} SendEntry;
105 
/* Instance state of one colo-compare object. */
struct CompareState {
    Object parent;

    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /* When non-NULL, inconsistencies are reported via notify_remote_frame() */
    char *notify_dev;
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    /* Reassembly state fed by compare_pri_chr_in() */
    SocketReadState pri_rs;
    /* Reassembly state fed by compare_sec_chr_in() */
    SocketReadState sec_rs;
    /* Reassembly state fed by compare_notify_chr() */
    SocketReadState notify_rs;
    /* Send queue used when compare_chr_send() is called for normal packets */
    SendCo out_sendco;
    /* Send queue used when compare_chr_send() is called for notifications */
    SendCo notify_sendco;
    /* When true, the vnet header length is sent along with each packet */
    bool vnet_hdr;
    /* Age threshold (ms) handed to colo_old_packet_check_one() */
    uint32_t compare_timeout;
    /* Period (ms) at which packet_check_timer is re-armed */
    uint32_t expired_scan_cycle;

    /*
     * Connections seen passing through the NIC
     * Element type: Connection
     */
    GQueue conn_list;
    /* Record the connection without repetition */
    GHashTable *connection_track_table;

    /* IOThread whose AioContext hosts packet_check_timer */
    IOThread *iothread;
    GMainContext *worker_context;
    /* Periodic timer that runs check_old_packet_regular() */
    QEMUTimer *packet_check_timer;

    /* Bottom half scheduled by colo_notify_compares_event() */
    QEMUBH *event_bh;
    /* Event most recently stored by colo_notify_compares_event() */
    enum colo_event event;

    /* Linkage in the net_compares list */
    QTAILQ_ENTRY(CompareState) next;
};
143 
/* Class structure for colo-compare; it adds no members to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;
147 
/* Identifies which input stream a packet came from; indexes colo_mode[]. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
152 
/* Names of the input streams, passed to trace output by packet_enqueue(). */
static const char *colo_mode[] = {
    [PRIMARY_IN] = "primary",
    [SECONDARY_IN] = "secondary",
};
157 
/*
 * Forward declaration: the definition appears later in this file but the
 * helpers below already call it.
 */
static int compare_chr_send(CompareState *s,
                            uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame,
                            bool zero_copy);
164 
/*
 * Return true when the @packet_len bytes at @buf are exactly the
 * characters of @str (terminating NUL excluded), false otherwise.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t expected = strlen(str);

    /* A length mismatch can never be an exact match. */
    if (packet_len != expected) {
        return false;
    }

    return memcmp(str, buf, expected) == 0;
}
175 
176 static void notify_remote_frame(CompareState *s)
177 {
178     char msg[] = "DO_CHECKPOINT";
179     int ret = 0;
180 
181     ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
182     if (ret < 0) {
183         error_report("Notify Xen COLO-frame failed");
184     }
185 }
186 
187 static void colo_compare_inconsistency_notify(CompareState *s)
188 {
189     if (s->notify_dev) {
190         notify_remote_frame(s);
191     } else {
192         notifier_list_notify(&colo_compare_notifiers,
193                              migrate_get_current());
194     }
195 }
196 
197 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
198 {
199     struct tcp_hdr *atcp, *btcp;
200 
201     atcp = (struct tcp_hdr *)(a->transport_header);
202     btcp = (struct tcp_hdr *)(b->transport_header);
203     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
204 }
205 
206 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
207 {
208     Packet *pkt = data;
209     struct tcp_hdr *tcphd;
210 
211     tcphd = (struct tcp_hdr *)pkt->transport_header;
212 
213     pkt->tcp_seq = ntohl(tcphd->th_seq);
214     pkt->tcp_ack = ntohl(tcphd->th_ack);
215     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
216     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
217                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
218     pkt->payload_size = pkt->size - pkt->header_size;
219     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
220     pkt->flags = tcphd->th_flags;
221 }
222 
223 /*
224  * Return 1 on success, if return 0 means the
225  * packet will be dropped
226  */
227 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
228 {
229     if (g_queue_get_length(queue) <= max_queue_size) {
230         if (pkt->ip->ip_p == IPPROTO_TCP) {
231             fill_pkt_tcp_info(pkt, max_ack);
232             g_queue_insert_sorted(queue,
233                                   pkt,
234                                   (GCompareDataFunc)seq_sorter,
235                                   NULL);
236         } else {
237             g_queue_push_tail(queue, pkt);
238         }
239         return 1;
240     }
241     return 0;
242 }
243 
244 /*
245  * Return 0 on success, if return -1 means the pkt
246  * is unsupported(arp and ipv6) and will be sent later
247  */
248 static int packet_enqueue(CompareState *s, int mode, Connection **con)
249 {
250     ConnectionKey key;
251     Packet *pkt = NULL;
252     Connection *conn;
253     int ret;
254 
255     if (mode == PRIMARY_IN) {
256         pkt = packet_new(s->pri_rs.buf,
257                          s->pri_rs.packet_len,
258                          s->pri_rs.vnet_hdr_len);
259     } else {
260         pkt = packet_new(s->sec_rs.buf,
261                          s->sec_rs.packet_len,
262                          s->sec_rs.vnet_hdr_len);
263     }
264 
265     if (parse_packet_early(pkt)) {
266         packet_destroy(pkt, NULL);
267         pkt = NULL;
268         return -1;
269     }
270     fill_connection_key(pkt, &key);
271 
272     conn = connection_get(s->connection_track_table,
273                           &key,
274                           &s->conn_list);
275 
276     if (!conn->processing) {
277         g_queue_push_tail(&s->conn_list, conn);
278         conn->processing = true;
279     }
280 
281     if (mode == PRIMARY_IN) {
282         ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
283     } else {
284         ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
285     }
286 
287     if (!ret) {
288         trace_colo_compare_drop_packet(colo_mode[mode],
289             "queue size too big, drop packet");
290         packet_destroy(pkt, NULL);
291         pkt = NULL;
292     }
293 
294     *con = conn;
295 
296     return 0;
297 }
298 
/*
 * Return true when @seq1 is later than @seq2; the unsigned difference is
 * interpreted as a signed 32-bit value, so wraparound is handled.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
303 
304 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
305 {
306     int ret;
307     ret = compare_chr_send(s,
308                            pkt->data,
309                            pkt->size,
310                            pkt->vnet_hdr_len,
311                            false,
312                            true);
313     if (ret < 0) {
314         error_report("colo send primary packet failed");
315     }
316     trace_colo_compare_main("packet same and release packet");
317     packet_destroy_partial(pkt, NULL);
318 }
319 
320 /*
321  * The IP packets sent by primary and secondary
322  * will be compared in here
323  * TODO support ip fragment, Out-Of-Order
324  * return:    0  means packet same
325  *            > 0 || < 0 means packet different
326  */
327 static int colo_compare_packet_payload(Packet *ppkt,
328                                        Packet *spkt,
329                                        uint16_t poffset,
330                                        uint16_t soffset,
331                                        uint16_t len)
332 
333 {
334     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
335         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
336 
337         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
338         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
339         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
340         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
341 
342         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
343                                    pri_ip_dst, spkt->size,
344                                    sec_ip_src, sec_ip_dst);
345     }
346 
347     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
348 }
349 
350 /*
351  * return true means that the payload is consist and
352  * need to make the next comparison, false means do
353  * the checkpoint
354 */
355 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
356                               int8_t *mark, uint32_t max_ack)
357 {
358     *mark = 0;
359 
360     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
361         if (!colo_compare_packet_payload(ppkt, spkt,
362                                         ppkt->header_size, spkt->header_size,
363                                         ppkt->payload_size)) {
364             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
365             return true;
366         }
367     }
368 
369     /* one part of secondary packet payload still need to be compared */
370     if (!after(ppkt->seq_end, spkt->seq_end)) {
371         if (!colo_compare_packet_payload(ppkt, spkt,
372                                         ppkt->header_size + ppkt->offset,
373                                         spkt->header_size + spkt->offset,
374                                         ppkt->payload_size - ppkt->offset)) {
375             if (!after(ppkt->tcp_ack, max_ack)) {
376                 *mark = COLO_COMPARE_FREE_PRIMARY;
377                 spkt->offset += ppkt->payload_size - ppkt->offset;
378                 return true;
379             } else {
380                 /* secondary guest hasn't ack the data, don't send
381                  * out this packet
382                  */
383                 return false;
384             }
385         }
386     } else {
387         /* primary packet is longer than secondary packet, compare
388          * the same part and mark the primary packet offset
389          */
390         if (!colo_compare_packet_payload(ppkt, spkt,
391                                         ppkt->header_size + ppkt->offset,
392                                         spkt->header_size + spkt->offset,
393                                         spkt->payload_size - spkt->offset)) {
394             *mark = COLO_COMPARE_FREE_SECONDARY;
395             ppkt->offset += spkt->payload_size - spkt->offset;
396             return true;
397         }
398     }
399 
400     return false;
401 }
402 
/*
 * Compare the queued primary and secondary TCP packets of @conn.
 *
 * Packets are taken from the head of both lists.  Primary packets whose
 * payload is empty or already compared are sent out; matching secondary
 * packets are destroyed.  When a pair cannot be matched, both packets
 * are pushed back and an inconsistency is reported.
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     */
    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

pri:
    /* Fetch the next primary packet; nothing to do once the list is empty. */
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_head(&conn->primary_list);
sec:
    /* Fetch the next secondary packet; without one, put ppkt back and stop. */
    if (g_queue_is_empty(&conn->secondary_list)) {
        g_queue_push_head(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_head(&conn->secondary_list);

    /* A primary packet without payload is sent out without comparison. */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    /* So is a primary packet that ends at or before the compared position. */
    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* Secondary packet without payload: discard it and fetch another. */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            /* Secondary packet lies entirely in the compared range. */
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        if (!ppkt) {
            /* The primary packet was released above: keep spkt for later. */
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        /* Only the primary packet is done: send it, keep the secondary. */
        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
        /* Only the secondary packet is done: drop it, keep the primary. */
        if (mark == COLO_COMPARE_FREE_SECONDARY) {
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        }
        /* Both packets are done. */
        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* Mismatch: keep both packets queued and request a checkpoint. */
        g_queue_push_head(&conn->primary_list, ppkt);
        g_queue_push_head(&conn->secondary_list, spkt);

#ifdef DEBUG_COLO_PACKETS
        qemu_hexdump(stderr, "colo-compare ppkt", ppkt->data, ppkt->size);
        qemu_hexdump(stderr, "colo-compare spkt", spkt->data, spkt->size);
#endif

        colo_compare_inconsistency_notify(s);
    }
}
507 
508 
509 /*
510  * Called from the compare thread on the primary
511  * for compare udp packet
512  */
513 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
514 {
515     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
516     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
517 
518     trace_colo_compare_main("compare udp");
519 
520     /*
521      * Because of ppkt and spkt are both in the same connection,
522      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
523      * same with spkt. In addition, IP header's Identification is a random
524      * field, we can handle it in IP fragmentation function later.
525      * COLO just concern the response net packet payload from primary guest
526      * and secondary guest are same or not, So we ignored all IP header include
527      * other field like TOS,TTL,IP Checksum. we only need to compare
528      * the ip payload here.
529      */
530     if (ppkt->size != spkt->size) {
531         trace_colo_compare_main("UDP: payload size of packets are different");
532         return -1;
533     }
534     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
535                                     ppkt->size - offset)) {
536         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
537         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
538 #ifdef DEBUG_COLO_PACKETS
539         qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
540         qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
541 #endif
542         return -1;
543     } else {
544         return 0;
545     }
546 }
547 
548 /*
549  * Called from the compare thread on the primary
550  * for compare icmp packet
551  */
552 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
553 {
554     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
555     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
556 
557     trace_colo_compare_main("compare icmp");
558 
559     /*
560      * Because of ppkt and spkt are both in the same connection,
561      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
562      * same with spkt. In addition, IP header's Identification is a random
563      * field, we can handle it in IP fragmentation function later.
564      * COLO just concern the response net packet payload from primary guest
565      * and secondary guest are same or not, So we ignored all IP header include
566      * other field like TOS,TTL,IP Checksum. we only need to compare
567      * the ip payload here.
568      */
569     if (ppkt->size != spkt->size) {
570         trace_colo_compare_main("ICMP: payload size of packets are different");
571         return -1;
572     }
573     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
574                                     ppkt->size - offset)) {
575         trace_colo_compare_icmp_miscompare("primary pkt size",
576                                            ppkt->size);
577         trace_colo_compare_icmp_miscompare("Secondary pkt size",
578                                            spkt->size);
579 #ifdef DEBUG_COLO_PACKETS
580         qemu_hexdump(stderr, "colo-compare pri pkt", ppkt->data, ppkt->size);
581         qemu_hexdump(stderr, "colo-compare sec pkt", spkt->data, spkt->size);
582 #endif
583         return -1;
584     } else {
585         return 0;
586     }
587 }
588 
589 /*
590  * Called from the compare thread on the primary
591  * for compare other packet
592  */
593 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
594 {
595     uint16_t offset = ppkt->vnet_hdr_len;
596 
597     trace_colo_compare_main("compare other");
598     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
599         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
600 
601         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
602         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
603         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
604         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
605 
606         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
607                                    pri_ip_dst, spkt->size,
608                                    sec_ip_src, sec_ip_dst);
609     }
610 
611     if (ppkt->size != spkt->size) {
612         trace_colo_compare_main("Other: payload size of packets are different");
613         return -1;
614     }
615     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
616                                        ppkt->size - offset);
617 }
618 
619 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
620 {
621     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
622 
623     if ((now - pkt->creation_ms) > (*check_time)) {
624         trace_colo_old_packet_check_found(pkt->creation_ms);
625         return 0;
626     } else {
627         return 1;
628     }
629 }
630 
/*
 * Add @notify to the list that colo_compare_inconsistency_notify()
 * fires when a compare instance has no notify_dev.
 */
void colo_compare_register_notifier(Notifier *notify)
{
    notifier_list_add(&colo_compare_notifiers, notify);
}
635 
/* Remove @notify from the notifier list it is currently on. */
void colo_compare_unregister_notifier(Notifier *notify)
{
    notifier_remove(notify);
}
640 
641 static int colo_old_packet_check_one_conn(Connection *conn,
642                                           CompareState *s)
643 {
644     GList *result = NULL;
645 
646     result = g_queue_find_custom(&conn->primary_list,
647                                  &s->compare_timeout,
648                                  (GCompareFunc)colo_old_packet_check_one);
649 
650     if (result) {
651         /* Do checkpoint will flush old packet */
652         colo_compare_inconsistency_notify(s);
653         return 0;
654     }
655 
656     return 1;
657 }
658 
659 /*
660  * Look for old packets that the secondary hasn't matched,
661  * if we have some then we have to checkpoint to wake
662  * the secondary up.
663  */
664 static void colo_old_packet_check(void *opaque)
665 {
666     CompareState *s = opaque;
667 
668     /*
669      * If we find one old packet, stop finding job and notify
670      * COLO frame do checkpoint.
671      */
672     g_queue_find_custom(&s->conn_list, s,
673                         (GCompareFunc)colo_old_packet_check_one_conn);
674 }
675 
676 static void colo_compare_packet(CompareState *s, Connection *conn,
677                                 int (*HandlePacket)(Packet *spkt,
678                                 Packet *ppkt))
679 {
680     Packet *pkt = NULL;
681     GList *result = NULL;
682 
683     while (!g_queue_is_empty(&conn->primary_list) &&
684            !g_queue_is_empty(&conn->secondary_list)) {
685         pkt = g_queue_pop_head(&conn->primary_list);
686         result = g_queue_find_custom(&conn->secondary_list,
687                  pkt, (GCompareFunc)HandlePacket);
688 
689         if (result) {
690             colo_release_primary_pkt(s, pkt);
691             g_queue_remove(&conn->secondary_list, result->data);
692         } else {
693             /*
694              * If one packet arrive late, the secondary_list or
695              * primary_list will be empty, so we can't compare it
696              * until next comparison. If the packets in the list are
697              * timeout, it will trigger a checkpoint request.
698              */
699             trace_colo_compare_main("packet different");
700             g_queue_push_head(&conn->primary_list, pkt);
701 
702             colo_compare_inconsistency_notify(s);
703             break;
704         }
705     }
706 }
707 
708 /*
709  * Called from the compare thread on the primary
710  * for compare packet with secondary list of the
711  * specified connection when a new packet was
712  * queued to it.
713  */
714 static void colo_compare_connection(void *opaque, void *user_data)
715 {
716     CompareState *s = user_data;
717     Connection *conn = opaque;
718 
719     switch (conn->ip_proto) {
720     case IPPROTO_TCP:
721         colo_compare_tcp(s, conn);
722         break;
723     case IPPROTO_UDP:
724         colo_compare_packet(s, conn, colo_packet_compare_udp);
725         break;
726     case IPPROTO_ICMP:
727         colo_compare_packet(s, conn, colo_packet_compare_icmp);
728         break;
729     default:
730         colo_compare_packet(s, conn, colo_packet_compare_other);
731         break;
732     }
733 }
734 
735 static void coroutine_fn _compare_chr_send(void *opaque)
736 {
737     SendCo *sendco = opaque;
738     CompareState *s = sendco->s;
739     int ret = 0;
740 
741     while (!g_queue_is_empty(&sendco->send_list)) {
742         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
743         uint32_t len = htonl(entry->size);
744 
745         ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
746 
747         if (ret != sizeof(len)) {
748             g_free(entry->buf);
749             g_slice_free(SendEntry, entry);
750             goto err;
751         }
752 
753         if (!sendco->notify_remote_frame && s->vnet_hdr) {
754             /*
755              * We send vnet header len make other module(like filter-redirector)
756              * know how to parse net packet correctly.
757              */
758             len = htonl(entry->vnet_hdr_len);
759 
760             ret = qemu_chr_fe_write_all(sendco->chr,
761                                         (uint8_t *)&len,
762                                         sizeof(len));
763 
764             if (ret != sizeof(len)) {
765                 g_free(entry->buf);
766                 g_slice_free(SendEntry, entry);
767                 goto err;
768             }
769         }
770 
771         ret = qemu_chr_fe_write_all(sendco->chr,
772                                     (uint8_t *)entry->buf,
773                                     entry->size);
774 
775         if (ret != entry->size) {
776             g_free(entry->buf);
777             g_slice_free(SendEntry, entry);
778             goto err;
779         }
780 
781         g_free(entry->buf);
782         g_slice_free(SendEntry, entry);
783     }
784 
785     sendco->ret = 0;
786     goto out;
787 
788 err:
789     while (!g_queue_is_empty(&sendco->send_list)) {
790         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
791         g_free(entry->buf);
792         g_slice_free(SendEntry, entry);
793     }
794     sendco->ret = ret < 0 ? ret : -EIO;
795 out:
796     sendco->co = NULL;
797     sendco->done = true;
798     aio_wait_kick();
799 }
800 
801 static int compare_chr_send(CompareState *s,
802                             uint8_t *buf,
803                             uint32_t size,
804                             uint32_t vnet_hdr_len,
805                             bool notify_remote_frame,
806                             bool zero_copy)
807 {
808     SendCo *sendco;
809     SendEntry *entry;
810 
811     if (notify_remote_frame) {
812         sendco = &s->notify_sendco;
813     } else {
814         sendco = &s->out_sendco;
815     }
816 
817     if (!size) {
818         return 0;
819     }
820 
821     entry = g_slice_new(SendEntry);
822     entry->size = size;
823     entry->vnet_hdr_len = vnet_hdr_len;
824     if (zero_copy) {
825         entry->buf = buf;
826     } else {
827         entry->buf = g_malloc(size);
828         memcpy(entry->buf, buf, size);
829     }
830     g_queue_push_head(&sendco->send_list, entry);
831 
832     if (sendco->done) {
833         sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
834         sendco->done = false;
835         qemu_coroutine_enter(sendco->co);
836         if (sendco->done) {
837             /* report early errors */
838             return sendco->ret;
839         }
840     }
841 
842     /* assume success */
843     return 0;
844 }
845 
/*
 * Returns COMPARE_READ_LEN_MAX regardless of @opaque.
 * NOTE(review): presumably installed as the can-read callback of the
 * input backends - confirm at the registration site.
 */
static int compare_chr_can_read(void *opaque)
{
    return COMPARE_READ_LEN_MAX;
}
850 
851 /*
852  * Called from the main thread on the primary for packets
853  * arriving over the socket from the primary.
854  */
855 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
856 {
857     CompareState *s = COLO_COMPARE(opaque);
858     int ret;
859 
860     ret = net_fill_rstate(&s->pri_rs, buf, size);
861     if (ret == -1) {
862         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
863                                  NULL, NULL, true);
864         error_report("colo-compare primary_in error");
865     }
866 }
867 
868 /*
869  * Called from the main thread on the primary for packets
870  * arriving over the socket from the secondary.
871  */
872 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
873 {
874     CompareState *s = COLO_COMPARE(opaque);
875     int ret;
876 
877     ret = net_fill_rstate(&s->sec_rs, buf, size);
878     if (ret == -1) {
879         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
880                                  NULL, NULL, true);
881         error_report("colo-compare secondary_in error");
882     }
883 }
884 
885 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
886 {
887     CompareState *s = COLO_COMPARE(opaque);
888     int ret;
889 
890     ret = net_fill_rstate(&s->notify_rs, buf, size);
891     if (ret == -1) {
892         qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
893                                  NULL, NULL, true);
894         error_report("colo-compare notify_dev error");
895     }
896 }
897 
898 /*
899  * Check old packet regularly so it can watch for any packets
900  * that the secondary hasn't produced equivalents of.
901  */
902 static void check_old_packet_regular(void *opaque)
903 {
904     CompareState *s = opaque;
905 
906     /* if have old packet we will notify checkpoint */
907     colo_old_packet_check(s);
908     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
909               s->expired_scan_cycle);
910 }
911 
912 /* Public API, Used for COLO frame to notify compare event */
913 void colo_notify_compares_event(void *opaque, int event, Error **errp)
914 {
915     CompareState *s;
916     qemu_mutex_lock(&colo_compare_mutex);
917 
918     if (!colo_compare_active) {
919         qemu_mutex_unlock(&colo_compare_mutex);
920         return;
921     }
922 
923     qemu_mutex_lock(&event_mtx);
924     QTAILQ_FOREACH(s, &net_compares, next) {
925         s->event = event;
926         qemu_bh_schedule(s->event_bh);
927         event_unhandled_count++;
928     }
929     /* Wait all compare threads to finish handling this event */
930     while (event_unhandled_count > 0) {
931         qemu_cond_wait(&event_complete_cond, &event_mtx);
932     }
933 
934     qemu_mutex_unlock(&event_mtx);
935     qemu_mutex_unlock(&colo_compare_mutex);
936 }
937 
938 static void colo_compare_timer_init(CompareState *s)
939 {
940     AioContext *ctx = iothread_get_aio_context(s->iothread);
941 
942     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
943                                 SCALE_MS, check_old_packet_regular,
944                                 s);
945     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
946               s->expired_scan_cycle);
947 }
948 
949 static void colo_compare_timer_del(CompareState *s)
950 {
951     if (s->packet_check_timer) {
952         timer_del(s->packet_check_timer);
953         timer_free(s->packet_check_timer);
954         s->packet_check_timer = NULL;
955     }
956  }
957 
958 static void colo_flush_packets(void *opaque, void *user_data);
959 
960 static void colo_compare_handle_event(void *opaque)
961 {
962     CompareState *s = opaque;
963 
964     switch (s->event) {
965     case COLO_EVENT_CHECKPOINT:
966         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
967         break;
968     case COLO_EVENT_FAILOVER:
969         break;
970     default:
971         break;
972     }
973 
974     qemu_mutex_lock(&event_mtx);
975     assert(event_unhandled_count > 0);
976     event_unhandled_count--;
977     qemu_cond_broadcast(&event_complete_cond);
978     qemu_mutex_unlock(&event_mtx);
979 }
980 
981 static void colo_compare_iothread(CompareState *s)
982 {
983     AioContext *ctx = iothread_get_aio_context(s->iothread);
984     object_ref(OBJECT(s->iothread));
985     s->worker_context = iothread_get_g_main_context(s->iothread);
986 
987     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
988                              compare_pri_chr_in, NULL, NULL,
989                              s, s->worker_context, true);
990     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
991                              compare_sec_chr_in, NULL, NULL,
992                              s, s->worker_context, true);
993     if (s->notify_dev) {
994         qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
995                                  compare_notify_chr, NULL, NULL,
996                                  s, s->worker_context, true);
997     }
998 
999     colo_compare_timer_init(s);
1000     s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
1001 }
1002 
1003 static char *compare_get_pri_indev(Object *obj, Error **errp)
1004 {
1005     CompareState *s = COLO_COMPARE(obj);
1006 
1007     return g_strdup(s->pri_indev);
1008 }
1009 
1010 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1011 {
1012     CompareState *s = COLO_COMPARE(obj);
1013 
1014     g_free(s->pri_indev);
1015     s->pri_indev = g_strdup(value);
1016 }
1017 
1018 static char *compare_get_sec_indev(Object *obj, Error **errp)
1019 {
1020     CompareState *s = COLO_COMPARE(obj);
1021 
1022     return g_strdup(s->sec_indev);
1023 }
1024 
1025 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1026 {
1027     CompareState *s = COLO_COMPARE(obj);
1028 
1029     g_free(s->sec_indev);
1030     s->sec_indev = g_strdup(value);
1031 }
1032 
1033 static char *compare_get_outdev(Object *obj, Error **errp)
1034 {
1035     CompareState *s = COLO_COMPARE(obj);
1036 
1037     return g_strdup(s->outdev);
1038 }
1039 
1040 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1041 {
1042     CompareState *s = COLO_COMPARE(obj);
1043 
1044     g_free(s->outdev);
1045     s->outdev = g_strdup(value);
1046 }
1047 
1048 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1049 {
1050     CompareState *s = COLO_COMPARE(obj);
1051 
1052     return s->vnet_hdr;
1053 }
1054 
1055 static void compare_set_vnet_hdr(Object *obj,
1056                                  bool value,
1057                                  Error **errp)
1058 {
1059     CompareState *s = COLO_COMPARE(obj);
1060 
1061     s->vnet_hdr = value;
1062 }
1063 
1064 static char *compare_get_notify_dev(Object *obj, Error **errp)
1065 {
1066     CompareState *s = COLO_COMPARE(obj);
1067 
1068     return g_strdup(s->notify_dev);
1069 }
1070 
1071 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1072 {
1073     CompareState *s = COLO_COMPARE(obj);
1074 
1075     g_free(s->notify_dev);
1076     s->notify_dev = g_strdup(value);
1077 }
1078 
1079 static void compare_get_timeout(Object *obj, Visitor *v,
1080                                 const char *name, void *opaque,
1081                                 Error **errp)
1082 {
1083     CompareState *s = COLO_COMPARE(obj);
1084     uint32_t value = s->compare_timeout;
1085 
1086     visit_type_uint32(v, name, &value, errp);
1087 }
1088 
1089 static void compare_set_timeout(Object *obj, Visitor *v,
1090                                 const char *name, void *opaque,
1091                                 Error **errp)
1092 {
1093     CompareState *s = COLO_COMPARE(obj);
1094     uint32_t value;
1095 
1096     if (!visit_type_uint32(v, name, &value, errp)) {
1097         return;
1098     }
1099     if (!value) {
1100         error_setg(errp, "Property '%s.%s' requires a positive value",
1101                    object_get_typename(obj), name);
1102         return;
1103     }
1104     s->compare_timeout = value;
1105 }
1106 
1107 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1108                                            const char *name, void *opaque,
1109                                            Error **errp)
1110 {
1111     CompareState *s = COLO_COMPARE(obj);
1112     uint32_t value = s->expired_scan_cycle;
1113 
1114     visit_type_uint32(v, name, &value, errp);
1115 }
1116 
1117 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1118                                            const char *name, void *opaque,
1119                                            Error **errp)
1120 {
1121     CompareState *s = COLO_COMPARE(obj);
1122     uint32_t value;
1123 
1124     if (!visit_type_uint32(v, name, &value, errp)) {
1125         return;
1126     }
1127     if (!value) {
1128         error_setg(errp, "Property '%s.%s' requires a positive value",
1129                    object_get_typename(obj), name);
1130         return;
1131     }
1132     s->expired_scan_cycle = value;
1133 }
1134 
1135 static void get_max_queue_size(Object *obj, Visitor *v,
1136                                const char *name, void *opaque,
1137                                Error **errp)
1138 {
1139     uint32_t value = max_queue_size;
1140 
1141     visit_type_uint32(v, name, &value, errp);
1142 }
1143 
1144 static void set_max_queue_size(Object *obj, Visitor *v,
1145                                const char *name, void *opaque,
1146                                Error **errp)
1147 {
1148     Error *local_err = NULL;
1149     uint32_t value;
1150 
1151     visit_type_uint32(v, name, &value, &local_err);
1152     if (local_err) {
1153         goto out;
1154     }
1155     if (!value) {
1156         error_setg(&local_err, "Property '%s.%s' requires a positive value",
1157                    object_get_typename(obj), name);
1158         goto out;
1159     }
1160     max_queue_size = value;
1161 
1162 out:
1163     error_propagate(errp, local_err);
1164 }
1165 
1166 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1167 {
1168     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1169     Connection *conn = NULL;
1170 
1171     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1172         trace_colo_compare_main("primary: unsupported packet in");
1173         compare_chr_send(s,
1174                          pri_rs->buf,
1175                          pri_rs->packet_len,
1176                          pri_rs->vnet_hdr_len,
1177                          false,
1178                          false);
1179     } else {
1180         /* compare packet in the specified connection */
1181         colo_compare_connection(conn, s);
1182     }
1183 }
1184 
1185 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1186 {
1187     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1188     Connection *conn = NULL;
1189 
1190     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1191         trace_colo_compare_main("secondary: unsupported packet in");
1192     } else {
1193         /* compare packet in the specified connection */
1194         colo_compare_connection(conn, s);
1195     }
1196 }
1197 
1198 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1199 {
1200     CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1201 
1202     const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1203     int ret;
1204 
1205     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1206                            notify_rs->buf,
1207                            notify_rs->packet_len)) {
1208         ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1209         if (ret < 0) {
1210             error_report("Notify Xen COLO-frame INIT failed");
1211         }
1212     } else if (packet_matches_str("COLO_CHECKPOINT",
1213                                   notify_rs->buf,
1214                                   notify_rs->packet_len)) {
1215         /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1216         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1217     } else {
1218         error_report("COLO compare got unsupported instruction");
1219     }
1220 }
1221 
1222 /*
1223  * Return 0 is success.
1224  * Return 1 is failed.
1225  */
1226 static int find_and_check_chardev(Chardev **chr,
1227                                   char *chr_name,
1228                                   Error **errp)
1229 {
1230     *chr = qemu_chr_find(chr_name);
1231     if (*chr == NULL) {
1232         error_setg(errp, "Device '%s' not found",
1233                    chr_name);
1234         return 1;
1235     }
1236 
1237     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1238         error_setg(errp, "chardev \"%s\" is not reconnectable",
1239                    chr_name);
1240         return 1;
1241     }
1242 
1243     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1244         error_setg(errp, "chardev \"%s\" cannot switch context",
1245                    chr_name);
1246         return 1;
1247     }
1248 
1249     return 0;
1250 }
1251 
1252 /*
1253  * Called from the main thread on the primary
1254  * to setup colo-compare.
1255  */
1256 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1257 {
1258     CompareState *s = COLO_COMPARE(uc);
1259     Chardev *chr;
1260 
1261     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1262         error_setg(errp, "colo compare needs 'primary_in' ,"
1263                    "'secondary_in','outdev','iothread' property set");
1264         return;
1265     } else if (!strcmp(s->pri_indev, s->outdev) ||
1266                !strcmp(s->sec_indev, s->outdev) ||
1267                !strcmp(s->pri_indev, s->sec_indev)) {
1268         error_setg(errp, "'indev' and 'outdev' could not be same "
1269                    "for compare module");
1270         return;
1271     }
1272 
1273     if (!s->compare_timeout) {
1274         /* Set default value to 3000 MS */
1275         s->compare_timeout = DEFAULT_TIME_OUT_MS;
1276     }
1277 
1278     if (!s->expired_scan_cycle) {
1279         /* Set default value to 3000 MS */
1280         s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1281     }
1282 
1283     if (!max_queue_size) {
1284         /* Set default queue size to 1024 */
1285         max_queue_size = MAX_QUEUE_SIZE;
1286     }
1287 
1288     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1289         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1290         return;
1291     }
1292 
1293     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1294         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1295         return;
1296     }
1297 
1298     if (find_and_check_chardev(&chr, s->outdev, errp) ||
1299         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1300         return;
1301     }
1302 
1303     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1304     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1305 
1306     /* Try to enable remote notify chardev, currently just for Xen COLO */
1307     if (s->notify_dev) {
1308         if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1309             !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1310             return;
1311         }
1312 
1313         net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1314                            s->vnet_hdr);
1315     }
1316 
1317     s->out_sendco.s = s;
1318     s->out_sendco.chr = &s->chr_out;
1319     s->out_sendco.notify_remote_frame = false;
1320     s->out_sendco.done = true;
1321     g_queue_init(&s->out_sendco.send_list);
1322 
1323     if (s->notify_dev) {
1324         s->notify_sendco.s = s;
1325         s->notify_sendco.chr = &s->chr_notify_dev;
1326         s->notify_sendco.notify_remote_frame = true;
1327         s->notify_sendco.done = true;
1328         g_queue_init(&s->notify_sendco.send_list);
1329     }
1330 
1331     g_queue_init(&s->conn_list);
1332 
1333     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1334                                                       connection_key_equal,
1335                                                       g_free,
1336                                                       connection_destroy);
1337 
1338     colo_compare_iothread(s);
1339 
1340     qemu_mutex_lock(&colo_compare_mutex);
1341     if (!colo_compare_active) {
1342         qemu_mutex_init(&event_mtx);
1343         qemu_cond_init(&event_complete_cond);
1344         colo_compare_active = true;
1345     }
1346     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1347     qemu_mutex_unlock(&colo_compare_mutex);
1348 
1349     return;
1350 }
1351 
1352 static void colo_flush_packets(void *opaque, void *user_data)
1353 {
1354     CompareState *s = user_data;
1355     Connection *conn = opaque;
1356     Packet *pkt = NULL;
1357 
1358     while (!g_queue_is_empty(&conn->primary_list)) {
1359         pkt = g_queue_pop_head(&conn->primary_list);
1360         compare_chr_send(s,
1361                          pkt->data,
1362                          pkt->size,
1363                          pkt->vnet_hdr_len,
1364                          false,
1365                          true);
1366         packet_destroy_partial(pkt, NULL);
1367     }
1368     while (!g_queue_is_empty(&conn->secondary_list)) {
1369         pkt = g_queue_pop_head(&conn->secondary_list);
1370         packet_destroy(pkt, NULL);
1371     }
1372 }
1373 
1374 static void colo_compare_class_init(ObjectClass *oc, void *data)
1375 {
1376     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1377 
1378     ucc->complete = colo_compare_complete;
1379 }
1380 
1381 static void colo_compare_init(Object *obj)
1382 {
1383     CompareState *s = COLO_COMPARE(obj);
1384 
1385     object_property_add_str(obj, "primary_in",
1386                             compare_get_pri_indev, compare_set_pri_indev);
1387     object_property_add_str(obj, "secondary_in",
1388                             compare_get_sec_indev, compare_set_sec_indev);
1389     object_property_add_str(obj, "outdev",
1390                             compare_get_outdev, compare_set_outdev);
1391     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1392                             (Object **)&s->iothread,
1393                             object_property_allow_set_link,
1394                             OBJ_PROP_LINK_STRONG);
1395     /* This parameter just for Xen COLO */
1396     object_property_add_str(obj, "notify_dev",
1397                             compare_get_notify_dev, compare_set_notify_dev);
1398 
1399     object_property_add(obj, "compare_timeout", "uint32",
1400                         compare_get_timeout,
1401                         compare_set_timeout, NULL, NULL);
1402 
1403     object_property_add(obj, "expired_scan_cycle", "uint32",
1404                         compare_get_expired_scan_cycle,
1405                         compare_set_expired_scan_cycle, NULL, NULL);
1406 
1407     object_property_add(obj, "max_queue_size", "uint32",
1408                         get_max_queue_size,
1409                         set_max_queue_size, NULL, NULL);
1410 
1411     s->vnet_hdr = false;
1412     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1413                              compare_set_vnet_hdr);
1414 }
1415 
1416 static void colo_compare_finalize(Object *obj)
1417 {
1418     CompareState *s = COLO_COMPARE(obj);
1419     CompareState *tmp = NULL;
1420 
1421     qemu_mutex_lock(&colo_compare_mutex);
1422     QTAILQ_FOREACH(tmp, &net_compares, next) {
1423         if (tmp == s) {
1424             QTAILQ_REMOVE(&net_compares, s, next);
1425             break;
1426         }
1427     }
1428     if (QTAILQ_EMPTY(&net_compares)) {
1429         colo_compare_active = false;
1430         qemu_mutex_destroy(&event_mtx);
1431         qemu_cond_destroy(&event_complete_cond);
1432     }
1433     qemu_mutex_unlock(&colo_compare_mutex);
1434 
1435     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1436     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1437     qemu_chr_fe_deinit(&s->chr_out, false);
1438     if (s->notify_dev) {
1439         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1440     }
1441 
1442     colo_compare_timer_del(s);
1443 
1444     qemu_bh_delete(s->event_bh);
1445 
1446     AioContext *ctx = iothread_get_aio_context(s->iothread);
1447     aio_context_acquire(ctx);
1448     AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1449     if (s->notify_dev) {
1450         AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1451     }
1452     aio_context_release(ctx);
1453 
1454     /* Release all unhandled packets after compare thead exited */
1455     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1456     AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
1457 
1458     g_queue_clear(&s->conn_list);
1459     g_queue_clear(&s->out_sendco.send_list);
1460     if (s->notify_dev) {
1461         g_queue_clear(&s->notify_sendco.send_list);
1462     }
1463 
1464     if (s->connection_track_table) {
1465         g_hash_table_destroy(s->connection_track_table);
1466     }
1467 
1468     object_unref(OBJECT(s->iothread));
1469 
1470     g_free(s->pri_indev);
1471     g_free(s->sec_indev);
1472     g_free(s->outdev);
1473     g_free(s->notify_dev);
1474 }
1475 
1476 static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1477 {
1478     colo_compare_active = false;
1479     qemu_mutex_init(&colo_compare_mutex);
1480 }
1481 
1482 static const TypeInfo colo_compare_info = {
1483     .name = TYPE_COLO_COMPARE,
1484     .parent = TYPE_OBJECT,
1485     .instance_size = sizeof(CompareState),
1486     .instance_init = colo_compare_init,
1487     .instance_finalize = colo_compare_finalize,
1488     .class_size = sizeof(CompareClass),
1489     .class_init = colo_compare_class_init,
1490     .interfaces = (InterfaceInfo[]) {
1491         { TYPE_USER_CREATABLE },
1492         { }
1493     }
1494 };
1495 
1496 static void register_types(void)
1497 {
1498     type_register_static(&colo_compare_info);
1499 }
1500 
1501 type_init(register_types);
1502