1 #include <ccan/fdpass/fdpass.h>
2 #include <ccan/io/fdpass/fdpass.h>
3 #include <common/daemon_conn.h>
4 #include <wire/wire_io.h>
5 #include <wire/wire_sync.h>
6 
7 struct daemon_conn {
8 	/* Last message we received */
9 	u8 *msg_in;
10 
11 	/* Queue of outgoing messages */
12 	struct msg_queue *out;
13 
14 	/* Underlying connection: we're freed if it closes, and vice versa */
15 	struct io_conn *conn;
16 
17 	/* Callback for incoming messages */
18 	struct io_plan *(*recv)(struct io_conn *conn, const u8 *, void *);
19 
20 	/* Called whenever we've cleared the msg_out queue. */
21 	void (*outq_empty)(void *);
22 
23 	/* Arg for both callbacks. */
24 	void *arg;
25 };
26 
handle_read(struct io_conn * conn,struct daemon_conn * dc)27 static struct io_plan *handle_read(struct io_conn *conn,
28 				      struct daemon_conn *dc)
29 {
30 	return dc->recv(conn, dc->msg_in, dc->arg);
31 }
32 
daemon_conn_read_next(struct io_conn * conn,struct daemon_conn * dc)33 struct io_plan *daemon_conn_read_next(struct io_conn *conn,
34 				      struct daemon_conn *dc)
35 {
36 	/* FIXME: We could use disposable parent instead, and recv() could
37 	 * tal_steal() it?  If they did that now, we'd free it here. */
38 	tal_free(dc->msg_in);
39 	return io_read_wire(conn, dc, &dc->msg_in, handle_read, dc);
40 }
41 
daemon_conn_write_next(struct io_conn * conn,struct daemon_conn * dc)42 static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
43 					      struct daemon_conn *dc)
44 {
45 	const u8 *msg;
46 
47 	msg = msg_dequeue(dc->out);
48 
49 	/* If nothing in queue, give empty callback a chance to queue somthing */
50 	if (!msg && dc->outq_empty) {
51 		dc->outq_empty(dc->arg);
52 		msg = msg_dequeue(dc->out);
53 	}
54 
55 	if (msg) {
56 		int fd = msg_extract_fd(msg);
57 		if (fd >= 0) {
58 			tal_free(msg);
59 			return io_send_fd(conn, fd, true,
60 					  daemon_conn_write_next, dc);
61 		}
62 		return io_write_wire(conn, take(msg), daemon_conn_write_next,
63 				     dc);
64 	}
65 	return msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc);
66 }
67 
daemon_conn_sync_flush(struct daemon_conn * dc)68 bool daemon_conn_sync_flush(struct daemon_conn *dc)
69 {
70 	const u8 *msg;
71 	int daemon_fd;
72 
73 	/* Flush any current packet. */
74 	if (!io_flush_sync(dc->conn))
75 		return false;
76 
77 	/* Make fd blocking for the duration */
78 	daemon_fd = io_conn_fd(dc->conn);
79 	if (!io_fd_block(daemon_fd, true))
80 		return false;
81 
82 	/* Flush existing messages. */
83 	while ((msg = msg_dequeue(dc->out)) != NULL) {
84 		int fd = msg_extract_fd(msg);
85 		if (fd >= 0) {
86 			tal_free(msg);
87 			if (!fdpass_send(daemon_fd, fd))
88 				break;
89 		} else if (!wire_sync_write(daemon_fd, take(msg)))
90 			break;
91 	}
92 	io_fd_block(daemon_fd, false);
93 
94 	/* Success if and only if we flushed them all. */
95 	return msg == NULL;
96 }
97 
daemon_conn_start(struct io_conn * conn,struct daemon_conn * dc)98 static struct io_plan *daemon_conn_start(struct io_conn *conn,
99 					 struct daemon_conn *dc)
100 {
101 	return io_duplex(conn, daemon_conn_read_next(conn, dc),
102 			 /* Could call daemon_conn_write_next, but we don't
103 			  * want it to call empty_cb just yet! */
104 			 msg_queue_wait(conn, dc->out,
105 					daemon_conn_write_next, dc));
106 }
107 
/* Destructor attached to dc->conn by daemon_conn_new_(): when the connection
 * goes away, take the daemon_conn down with it. */
static void destroy_dc_from_conn(struct io_conn *conn, struct daemon_conn *dc)
{
	/* If the conn is only being destroyed because dc itself is being
	 * freed, this loops back to dc; that is harmless. */
	tal_free(dc);
}
113 
daemon_conn_new_(const tal_t * ctx,int fd,struct io_plan * (* recv)(struct io_conn *,const u8 *,void *),void (* outq_empty)(void *),void * arg)114 struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
115 				     struct io_plan *(*recv)(struct io_conn *,
116 							     const u8 *,
117 							     void *),
118 				     void (*outq_empty)(void *),
119 				     void *arg)
120 {
121 	struct daemon_conn *dc = tal(NULL, struct daemon_conn);
122 
123 	dc->recv = recv;
124 	dc->outq_empty = outq_empty;
125 	dc->arg = arg;
126 	dc->msg_in = NULL;
127 	dc->out = msg_queue_new(dc);
128 
129 	dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc);
130 	tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc);
131 	return dc;
132 }
133 
daemon_conn_send(struct daemon_conn * dc,const u8 * msg)134 void daemon_conn_send(struct daemon_conn *dc, const u8 *msg)
135 {
136 	msg_enqueue(dc->out, msg);
137 }
138 
daemon_conn_send_fd(struct daemon_conn * dc,int fd)139 void daemon_conn_send_fd(struct daemon_conn *dc, int fd)
140 {
141 	msg_enqueue_fd(dc->out, fd);
142 }
143 
daemon_conn_wake(struct daemon_conn * dc)144 void daemon_conn_wake(struct daemon_conn *dc)
145 {
146 	msg_wake(dc->out);
147 }
148 
daemon_conn_queue_length(const struct daemon_conn * dc)149 size_t daemon_conn_queue_length(const struct daemon_conn *dc)
150 {
151 	return msg_queue_length(dc->out);
152 }
153