1 /*
2    Unix SMB/CIFS implementation.
3 
4    WINS Replication server
5 
6    Copyright (C) Stefan Metzmacher	2005
7 
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12 
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17 
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21 
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
34 
35 enum wreplsrv_out_connect_stage {
36 	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
37 	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
38 	WREPLSRV_OUT_CONNECT_STAGE_DONE
39 };
40 
41 struct wreplsrv_out_connect_state {
42 	enum wreplsrv_out_connect_stage stage;
43 	struct composite_context *c;
44 	struct wrepl_associate assoc_io;
45 	enum winsrepl_partner_type type;
46 	struct wreplsrv_out_connection *wreplconn;
47 	struct tevent_req *subreq;
48 };
49 
50 static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq);
51 
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state * state)52 static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
53 {
54 	NTSTATUS status;
55 
56 	status = wrepl_connect_recv(state->subreq);
57 	TALLOC_FREE(state->subreq);
58 	NT_STATUS_NOT_OK_RETURN(status);
59 
60 	state->subreq = wrepl_associate_send(state,
61 					     state->wreplconn->service->task->event_ctx,
62 					     state->wreplconn->sock, &state->assoc_io);
63 	NT_STATUS_HAVE_NO_MEMORY(state->subreq);
64 
65 	tevent_req_set_callback(state->subreq,
66 				wreplsrv_out_connect_handler_treq,
67 				state);
68 
69 	state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
70 
71 	return NT_STATUS_OK;
72 }
73 
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state * state)74 static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
75 {
76 	NTSTATUS status;
77 
78 	status = wrepl_associate_recv(state->subreq, &state->assoc_io);
79 	TALLOC_FREE(state->subreq);
80 	NT_STATUS_NOT_OK_RETURN(status);
81 
82 	state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
83 	state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
84 
85 	if (state->type == WINSREPL_PARTNER_PUSH) {
86 		if (state->wreplconn->assoc_ctx.peer_major >= 5) {
87 			state->wreplconn->partner->push.wreplconn = state->wreplconn;
88 			talloc_steal(state->wreplconn->partner, state->wreplconn);
89 		} else {
90 			state->type = WINSREPL_PARTNER_NONE;
91 		}
92 	} else if (state->type == WINSREPL_PARTNER_PULL) {
93 		state->wreplconn->partner->pull.wreplconn = state->wreplconn;
94 		talloc_steal(state->wreplconn->partner, state->wreplconn);
95 	}
96 
97 	state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
98 
99 	return NT_STATUS_OK;
100 }
101 
wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state * state)102 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
103 {
104 	struct composite_context *c = state->c;
105 
106 	switch (state->stage) {
107 	case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
108 		c->status = wreplsrv_out_connect_wait_socket(state);
109 		break;
110 	case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
111 		c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
112 		c->state  = COMPOSITE_STATE_DONE;
113 		break;
114 	case WREPLSRV_OUT_CONNECT_STAGE_DONE:
115 		c->status = NT_STATUS_INTERNAL_ERROR;
116 	}
117 
118 	if (!NT_STATUS_IS_OK(c->status)) {
119 		c->state = COMPOSITE_STATE_ERROR;
120 	}
121 
122 	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
123 		c->async.fn(c);
124 	}
125 }
126 
wreplsrv_out_connect_handler_treq(struct tevent_req * subreq)127 static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq)
128 {
129 	struct wreplsrv_out_connect_state *state = tevent_req_callback_data(subreq,
130 						   struct wreplsrv_out_connect_state);
131 	wreplsrv_out_connect_handler(state);
132 	return;
133 }
134 
wreplsrv_out_connect_send(struct wreplsrv_partner * partner,enum winsrepl_partner_type type,struct wreplsrv_out_connection * wreplconn)135 static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
136 							   enum winsrepl_partner_type type,
137 							   struct wreplsrv_out_connection *wreplconn)
138 {
139 	struct composite_context *c = NULL;
140 	struct wreplsrv_service *service = partner->service;
141 	struct wreplsrv_out_connect_state *state = NULL;
142 	struct wreplsrv_out_connection **wreplconnp = &wreplconn;
143 	bool cached_connection = false;
144 
145 	c = talloc_zero(partner, struct composite_context);
146 	if (!c) goto failed;
147 
148 	state = talloc_zero(c, struct wreplsrv_out_connect_state);
149 	if (!state) goto failed;
150 	state->c	= c;
151 	state->type	= type;
152 
153 	c->state	= COMPOSITE_STATE_IN_PROGRESS;
154 	c->event_ctx	= service->task->event_ctx;
155 	c->private_data	= state;
156 
157 	if (type == WINSREPL_PARTNER_PUSH) {
158 		cached_connection	= true;
159 		wreplconn		= partner->push.wreplconn;
160 		wreplconnp		= &partner->push.wreplconn;
161 	} else if (type == WINSREPL_PARTNER_PULL) {
162 		cached_connection	= true;
163 		wreplconn		= partner->pull.wreplconn;
164 		wreplconnp		= &partner->pull.wreplconn;
165 	}
166 
167 	/* we have a connection already, so use it */
168 	if (wreplconn) {
169 		if (wrepl_socket_is_connected(wreplconn->sock)) {
170 			state->stage	= WREPLSRV_OUT_CONNECT_STAGE_DONE;
171 			state->wreplconn= wreplconn;
172 			composite_done(c);
173 			return c;
174 		} else if (!cached_connection) {
175 			state->stage	= WREPLSRV_OUT_CONNECT_STAGE_DONE;
176 			state->wreplconn= NULL;
177 			composite_done(c);
178 			return c;
179 		} else {
180 			talloc_free(wreplconn);
181 			*wreplconnp = NULL;
182 		}
183 	}
184 
185 	wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
186 	if (!wreplconn) goto failed;
187 
188 	wreplconn->service	= service;
189 	wreplconn->partner	= partner;
190 	wreplconn->sock		= wrepl_socket_init(wreplconn, service->task->event_ctx);
191 	if (!wreplconn->sock) goto failed;
192 
193 	state->stage	= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
194 	state->wreplconn= wreplconn;
195 	state->subreq	= wrepl_connect_send(state,
196 					     service->task->event_ctx,
197 					     wreplconn->sock,
198 					     partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
199 					     partner->address);
200 	if (!state->subreq) goto failed;
201 
202 	tevent_req_set_callback(state->subreq,
203 				wreplsrv_out_connect_handler_treq,
204 				state);
205 
206 	return c;
207 failed:
208 	talloc_free(c);
209 	return NULL;
210 }
211 
wreplsrv_out_connect_recv(struct composite_context * c,TALLOC_CTX * mem_ctx,struct wreplsrv_out_connection ** wreplconn)212 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
213 					  struct wreplsrv_out_connection **wreplconn)
214 {
215 	NTSTATUS status;
216 
217 	status = composite_wait(c);
218 
219 	if (NT_STATUS_IS_OK(status)) {
220 		struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
221 							   struct wreplsrv_out_connect_state);
222 		if (state->wreplconn) {
223 			*wreplconn = talloc_reference(mem_ctx, state->wreplconn);
224 			if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
225 		} else {
226 			status = NT_STATUS_CONNECTION_DISCONNECTED;
227 		}
228 	}
229 
230 	talloc_free(c);
231 	return status;
232 
233 }
234 
235 struct wreplsrv_pull_table_io {
236 	struct {
237 		struct wreplsrv_partner *partner;
238 		uint32_t num_owners;
239 		struct wrepl_wins_owner *owners;
240 	} in;
241 	struct {
242 		uint32_t num_owners;
243 		struct wrepl_wins_owner *owners;
244 	} out;
245 };
246 
247 enum wreplsrv_pull_table_stage {
248 	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
249 	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
250 	WREPLSRV_PULL_TABLE_STAGE_DONE
251 };
252 
253 struct wreplsrv_pull_table_state {
254 	enum wreplsrv_pull_table_stage stage;
255 	struct composite_context *c;
256 	struct wrepl_pull_table table_io;
257 	struct wreplsrv_pull_table_io *io;
258 	struct composite_context *creq;
259 	struct wreplsrv_out_connection *wreplconn;
260 	struct tevent_req *subreq;
261 };
262 
263 static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq);
264 
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state * state)265 static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
266 {
267 	NTSTATUS status;
268 
269 	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
270 	NT_STATUS_NOT_OK_RETURN(status);
271 
272 	state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
273 	state->subreq = wrepl_pull_table_send(state,
274 					      state->wreplconn->service->task->event_ctx,
275 					      state->wreplconn->sock, &state->table_io);
276 	NT_STATUS_HAVE_NO_MEMORY(state->subreq);
277 
278 	tevent_req_set_callback(state->subreq,
279 				wreplsrv_pull_table_handler_treq,
280 				state);
281 
282 	state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
283 
284 	return NT_STATUS_OK;
285 }
286 
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state * state)287 static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
288 {
289 	NTSTATUS status;
290 
291 	status = wrepl_pull_table_recv(state->subreq, state, &state->table_io);
292 	TALLOC_FREE(state->subreq);
293 	NT_STATUS_NOT_OK_RETURN(status);
294 
295 	state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
296 
297 	return NT_STATUS_OK;
298 }
299 
wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state * state)300 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
301 {
302 	struct composite_context *c = state->c;
303 
304 	switch (state->stage) {
305 	case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
306 		c->status = wreplsrv_pull_table_wait_connection(state);
307 		break;
308 	case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
309 		c->status = wreplsrv_pull_table_wait_table_reply(state);
310 		c->state  = COMPOSITE_STATE_DONE;
311 		break;
312 	case WREPLSRV_PULL_TABLE_STAGE_DONE:
313 		c->status = NT_STATUS_INTERNAL_ERROR;
314 	}
315 
316 	if (!NT_STATUS_IS_OK(c->status)) {
317 		c->state = COMPOSITE_STATE_ERROR;
318 	}
319 
320 	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
321 		c->async.fn(c);
322 	}
323 }
324 
wreplsrv_pull_table_handler_creq(struct composite_context * creq)325 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
326 {
327 	struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
328 						  struct wreplsrv_pull_table_state);
329 	wreplsrv_pull_table_handler(state);
330 	return;
331 }
332 
wreplsrv_pull_table_handler_treq(struct tevent_req * subreq)333 static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq)
334 {
335 	struct wreplsrv_pull_table_state *state = tevent_req_callback_data(subreq,
336 						  struct wreplsrv_pull_table_state);
337 	wreplsrv_pull_table_handler(state);
338 	return;
339 }
340 
wreplsrv_pull_table_send(TALLOC_CTX * mem_ctx,struct wreplsrv_pull_table_io * io)341 static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
342 {
343 	struct composite_context *c = NULL;
344 	struct wreplsrv_service *service = io->in.partner->service;
345 	struct wreplsrv_pull_table_state *state = NULL;
346 
347 	c = talloc_zero(mem_ctx, struct composite_context);
348 	if (!c) goto failed;
349 
350 	state = talloc_zero(c, struct wreplsrv_pull_table_state);
351 	if (!state) goto failed;
352 	state->c	= c;
353 	state->io	= io;
354 
355 	c->state	= COMPOSITE_STATE_IN_PROGRESS;
356 	c->event_ctx	= service->task->event_ctx;
357 	c->private_data	= state;
358 
359 	if (io->in.num_owners) {
360 		struct wrepl_wins_owner *partners;
361 		uint32_t i;
362 
363 		partners = talloc_array(state,
364 					struct wrepl_wins_owner,
365 					io->in.num_owners);
366 		if (composite_nomem(partners, c)) goto failed;
367 
368 		for (i=0; i < io->in.num_owners; i++) {
369 			partners[i] = io->in.owners[i];
370 			partners[i].address = talloc_strdup(partners,
371 						io->in.owners[i].address);
372 			if (composite_nomem(partners[i].address, c)) goto failed;
373 		}
374 
375 		state->table_io.out.num_partners	= io->in.num_owners;
376 		state->table_io.out.partners		= partners;
377 		state->stage				= WREPLSRV_PULL_TABLE_STAGE_DONE;
378 		composite_done(c);
379 		return c;
380 	}
381 
382 	state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
383 	state->creq	= wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
384 	if (!state->creq) goto failed;
385 
386 	state->creq->async.fn		= wreplsrv_pull_table_handler_creq;
387 	state->creq->async.private_data	= state;
388 
389 	return c;
390 failed:
391 	talloc_free(c);
392 	return NULL;
393 }
394 
wreplsrv_pull_table_recv(struct composite_context * c,TALLOC_CTX * mem_ctx,struct wreplsrv_pull_table_io * io)395 static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
396 					 struct wreplsrv_pull_table_io *io)
397 {
398 	NTSTATUS status;
399 
400 	status = composite_wait(c);
401 
402 	if (NT_STATUS_IS_OK(status)) {
403 		struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
404 							  struct wreplsrv_pull_table_state);
405 		io->out.num_owners	= state->table_io.out.num_partners;
406 		io->out.owners		= talloc_move(mem_ctx, &state->table_io.out.partners);
407 	}
408 
409 	talloc_free(c);
410 	return status;
411 }
412 
413 struct wreplsrv_pull_names_io {
414 	struct {
415 		struct wreplsrv_partner *partner;
416 		struct wreplsrv_out_connection *wreplconn;
417 		struct wrepl_wins_owner owner;
418 	} in;
419 	struct {
420 		uint32_t num_names;
421 		struct wrepl_name *names;
422 	} out;
423 };
424 
425 enum wreplsrv_pull_names_stage {
426 	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
427 	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
428 	WREPLSRV_PULL_NAMES_STAGE_DONE
429 };
430 
431 struct wreplsrv_pull_names_state {
432 	enum wreplsrv_pull_names_stage stage;
433 	struct composite_context *c;
434 	struct wrepl_pull_names pull_io;
435 	struct wreplsrv_pull_names_io *io;
436 	struct composite_context *creq;
437 	struct wreplsrv_out_connection *wreplconn;
438 	struct tevent_req *subreq;
439 };
440 
441 static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq);
442 
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state * state)443 static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
444 {
445 	NTSTATUS status;
446 
447 	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
448 	NT_STATUS_NOT_OK_RETURN(status);
449 
450 	state->pull_io.in.assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
451 	state->pull_io.in.partner	= state->io->in.owner;
452 	state->subreq = wrepl_pull_names_send(state,
453 					      state->wreplconn->service->task->event_ctx,
454 					      state->wreplconn->sock,
455 					      &state->pull_io);
456 	NT_STATUS_HAVE_NO_MEMORY(state->subreq);
457 
458 	tevent_req_set_callback(state->subreq,
459 				wreplsrv_pull_names_handler_treq,
460 				state);
461 
462 	state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
463 
464 	return NT_STATUS_OK;
465 }
466 
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state * state)467 static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
468 {
469 	NTSTATUS status;
470 
471 	status = wrepl_pull_names_recv(state->subreq, state, &state->pull_io);
472 	TALLOC_FREE(state->subreq);
473 	NT_STATUS_NOT_OK_RETURN(status);
474 
475 	state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
476 
477 	return NT_STATUS_OK;
478 }
479 
wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state * state)480 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
481 {
482 	struct composite_context *c = state->c;
483 
484 	switch (state->stage) {
485 	case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
486 		c->status = wreplsrv_pull_names_wait_connection(state);
487 		break;
488 	case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
489 		c->status = wreplsrv_pull_names_wait_send_reply(state);
490 		c->state  = COMPOSITE_STATE_DONE;
491 		break;
492 	case WREPLSRV_PULL_NAMES_STAGE_DONE:
493 		c->status = NT_STATUS_INTERNAL_ERROR;
494 	}
495 
496 	if (!NT_STATUS_IS_OK(c->status)) {
497 		c->state = COMPOSITE_STATE_ERROR;
498 	}
499 
500 	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
501 		c->async.fn(c);
502 	}
503 }
504 
wreplsrv_pull_names_handler_creq(struct composite_context * creq)505 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
506 {
507 	struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
508 						  struct wreplsrv_pull_names_state);
509 	wreplsrv_pull_names_handler(state);
510 	return;
511 }
512 
wreplsrv_pull_names_handler_treq(struct tevent_req * subreq)513 static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq)
514 {
515 	struct wreplsrv_pull_names_state *state = tevent_req_callback_data(subreq,
516 						  struct wreplsrv_pull_names_state);
517 	wreplsrv_pull_names_handler(state);
518 	return;
519 }
520 
wreplsrv_pull_names_send(TALLOC_CTX * mem_ctx,struct wreplsrv_pull_names_io * io)521 static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
522 {
523 	struct composite_context *c = NULL;
524 	struct wreplsrv_service *service = io->in.partner->service;
525 	struct wreplsrv_pull_names_state *state = NULL;
526 	enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
527 
528 	if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
529 
530 	c = talloc_zero(mem_ctx, struct composite_context);
531 	if (!c) goto failed;
532 
533 	state = talloc_zero(c, struct wreplsrv_pull_names_state);
534 	if (!state) goto failed;
535 	state->c	= c;
536 	state->io	= io;
537 
538 	c->state	= COMPOSITE_STATE_IN_PROGRESS;
539 	c->event_ctx	= service->task->event_ctx;
540 	c->private_data	= state;
541 
542 	state->stage	= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
543 	state->creq	= wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
544 	if (!state->creq) goto failed;
545 
546 	state->creq->async.fn		= wreplsrv_pull_names_handler_creq;
547 	state->creq->async.private_data	= state;
548 
549 	return c;
550 failed:
551 	talloc_free(c);
552 	return NULL;
553 }
554 
wreplsrv_pull_names_recv(struct composite_context * c,TALLOC_CTX * mem_ctx,struct wreplsrv_pull_names_io * io)555 static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
556 					 struct wreplsrv_pull_names_io *io)
557 {
558 	NTSTATUS status;
559 
560 	status = composite_wait(c);
561 
562 	if (NT_STATUS_IS_OK(status)) {
563 		struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
564 							  struct wreplsrv_pull_names_state);
565 		io->out.num_names	= state->pull_io.out.num_names;
566 		io->out.names		= talloc_move(mem_ctx, &state->pull_io.out.names);
567 	}
568 
569 	talloc_free(c);
570 	return status;
571 
572 }
573 
574 enum wreplsrv_pull_cycle_stage {
575 	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
576 	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
577 	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
578 	WREPLSRV_PULL_CYCLE_STAGE_DONE
579 };
580 
581 struct wreplsrv_pull_cycle_state {
582 	enum wreplsrv_pull_cycle_stage stage;
583 	struct composite_context *c;
584 	struct wreplsrv_pull_cycle_io *io;
585 	struct wreplsrv_pull_table_io table_io;
586 	uint32_t current;
587 	struct wreplsrv_pull_names_io names_io;
588 	struct composite_context *creq;
589 	struct wrepl_associate_stop assoc_stop_io;
590 	struct tevent_req *subreq;
591 };
592 
593 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
594 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq);
595 
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state * state)596 static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
597 {
598 	struct wreplsrv_owner *current_owner=NULL;
599 	struct wreplsrv_owner *local_owner;
600 	uint32_t i;
601 	uint64_t old_max_version = 0;
602 	bool do_pull = false;
603 
604 	for (i=state->current; i < state->table_io.out.num_owners; i++) {
605 		current_owner = wreplsrv_find_owner(state->io->in.partner->service,
606 						    state->io->in.partner->pull.table,
607 						    state->table_io.out.owners[i].address);
608 
609 		local_owner = wreplsrv_find_owner(state->io->in.partner->service,
610 						  state->io->in.partner->service->table,
611 						  state->table_io.out.owners[i].address);
612 		/*
613 		 * this means we are ourself the current owner,
614 		 * and we don't want replicate ourself
615 		 */
616 		if (!current_owner) continue;
617 
618 		/*
619 		 * this means we don't have any records of this owner
620 		 * so fetch them
621 		 */
622 		if (!local_owner) {
623 			do_pull		= true;
624 
625 			break;
626 		}
627 
628 		/*
629 		 * this means the remote partner has some new records of this owner
630 		 * fetch them
631 		 */
632 		if (current_owner->owner.max_version > local_owner->owner.max_version) {
633 			do_pull		= true;
634 			old_max_version	= local_owner->owner.max_version;
635 			break;
636 		}
637 	}
638 	state->current = i;
639 
640 	if (do_pull) {
641 		state->names_io.in.partner		= state->io->in.partner;
642 		state->names_io.in.wreplconn		= state->io->in.wreplconn;
643 		state->names_io.in.owner		= current_owner->owner;
644 		state->names_io.in.owner.min_version	= old_max_version + 1;
645 		state->creq = wreplsrv_pull_names_send(state, &state->names_io);
646 		NT_STATUS_HAVE_NO_MEMORY(state->creq);
647 
648 		state->creq->async.fn		= wreplsrv_pull_cycle_handler_creq;
649 		state->creq->async.private_data	= state;
650 
651 		return STATUS_MORE_ENTRIES;
652 	}
653 
654 	return NT_STATUS_OK;
655 }
656 
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state * state)657 static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
658 {
659 	NTSTATUS status;
660 
661 	status = wreplsrv_pull_cycle_next_owner_do_work(state);
662 	if (NT_STATUS_IS_OK(status)) {
663 		state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
664 	} else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
665 		state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
666 		status = NT_STATUS_OK;
667 	}
668 
669 	if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
670 		state->assoc_stop_io.in.assoc_ctx	= state->io->in.wreplconn->assoc_ctx.peer_ctx;
671 		state->assoc_stop_io.in.reason		= 0;
672 		state->subreq = wrepl_associate_stop_send(state,
673 							  state->io->in.wreplconn->service->task->event_ctx,
674 							  state->io->in.wreplconn->sock,
675 							  &state->assoc_stop_io);
676 		NT_STATUS_HAVE_NO_MEMORY(state->subreq);
677 
678 		tevent_req_set_callback(state->subreq,
679 					wreplsrv_pull_cycle_handler_treq,
680 					state);
681 
682 		state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
683 	}
684 
685 	return status;
686 }
687 
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state * state)688 static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
689 {
690 	NTSTATUS status;
691 	uint32_t i;
692 
693 	status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
694 	NT_STATUS_NOT_OK_RETURN(status);
695 
696 	/* update partner table */
697 	for (i=0; i < state->table_io.out.num_owners; i++) {
698 		status = wreplsrv_add_table(state->io->in.partner->service,
699 					    state->io->in.partner,
700 					    &state->io->in.partner->pull.table,
701 					    state->table_io.out.owners[i].address,
702 					    state->table_io.out.owners[i].max_version);
703 		NT_STATUS_NOT_OK_RETURN(status);
704 	}
705 
706 	status = wreplsrv_pull_cycle_next_owner_wrapper(state);
707 	NT_STATUS_NOT_OK_RETURN(status);
708 
709 	return status;
710 }
711 
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state * state)712 static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
713 {
714 	NTSTATUS status;
715 
716 	status = wreplsrv_apply_records(state->io->in.partner,
717 					&state->names_io.in.owner,
718 					state->names_io.out.num_names,
719 					state->names_io.out.names);
720 	NT_STATUS_NOT_OK_RETURN(status);
721 
722 	talloc_free(state->names_io.out.names);
723 	ZERO_STRUCT(state->names_io);
724 
725 	return NT_STATUS_OK;
726 }
727 
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state * state)728 static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
729 {
730 	NTSTATUS status;
731 
732 	status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
733 	NT_STATUS_NOT_OK_RETURN(status);
734 
735 	/*
736 	 * TODO: this should maybe an async call,
737 	 *       because we may need some network access
738 	 *       for conflict resolving
739 	 */
740 	status = wreplsrv_pull_cycle_apply_records(state);
741 	NT_STATUS_NOT_OK_RETURN(status);
742 
743 	status = wreplsrv_pull_cycle_next_owner_wrapper(state);
744 	NT_STATUS_NOT_OK_RETURN(status);
745 
746 	return status;
747 }
748 
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state * state)749 static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
750 {
751 	NTSTATUS status;
752 
753 	status = wrepl_associate_stop_recv(state->subreq, &state->assoc_stop_io);
754 	TALLOC_FREE(state->subreq);
755 	NT_STATUS_NOT_OK_RETURN(status);
756 
757 	state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
758 
759 	return status;
760 }
761 
wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state * state)762 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
763 {
764 	struct composite_context *c = state->c;
765 
766 	switch (state->stage) {
767 	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
768 		c->status = wreplsrv_pull_cycle_wait_table_reply(state);
769 		break;
770 	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
771 		c->status = wreplsrv_pull_cycle_wait_send_replies(state);
772 		break;
773 	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
774 		c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
775 		break;
776 	case WREPLSRV_PULL_CYCLE_STAGE_DONE:
777 		c->status = NT_STATUS_INTERNAL_ERROR;
778 	}
779 
780 	if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
781 		c->state  = COMPOSITE_STATE_DONE;
782 	}
783 
784 	if (!NT_STATUS_IS_OK(c->status)) {
785 		c->state = COMPOSITE_STATE_ERROR;
786 	}
787 
788 	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
789 		c->async.fn(c);
790 	}
791 }
792 
wreplsrv_pull_cycle_handler_creq(struct composite_context * creq)793 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
794 {
795 	struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
796 						  struct wreplsrv_pull_cycle_state);
797 	wreplsrv_pull_cycle_handler(state);
798 	return;
799 }
800 
wreplsrv_pull_cycle_handler_treq(struct tevent_req * subreq)801 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq)
802 {
803 	struct wreplsrv_pull_cycle_state *state = tevent_req_callback_data(subreq,
804 						  struct wreplsrv_pull_cycle_state);
805 	wreplsrv_pull_cycle_handler(state);
806 	return;
807 }
808 
wreplsrv_pull_cycle_send(TALLOC_CTX * mem_ctx,struct wreplsrv_pull_cycle_io * io)809 struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
810 {
811 	struct composite_context *c = NULL;
812 	struct wreplsrv_service *service = io->in.partner->service;
813 	struct wreplsrv_pull_cycle_state *state = NULL;
814 
815 	c = talloc_zero(mem_ctx, struct composite_context);
816 	if (!c) goto failed;
817 
818 	state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
819 	if (!state) goto failed;
820 	state->c	= c;
821 	state->io	= io;
822 
823 	c->state	= COMPOSITE_STATE_IN_PROGRESS;
824 	c->event_ctx	= service->task->event_ctx;
825 	c->private_data	= state;
826 
827 	state->stage	= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
828 	state->table_io.in.partner	= io->in.partner;
829 	state->table_io.in.num_owners	= io->in.num_owners;
830 	state->table_io.in.owners	= io->in.owners;
831 	state->creq = wreplsrv_pull_table_send(state, &state->table_io);
832 	if (!state->creq) goto failed;
833 
834 	state->creq->async.fn		= wreplsrv_pull_cycle_handler_creq;
835 	state->creq->async.private_data	= state;
836 
837 	return c;
838 failed:
839 	talloc_free(c);
840 	return NULL;
841 }
842 
wreplsrv_pull_cycle_recv(struct composite_context * c)843 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
844 {
845 	NTSTATUS status;
846 
847 	status = composite_wait(c);
848 
849 	talloc_free(c);
850 	return status;
851 }
852 
853 enum wreplsrv_push_notify_stage {
854 	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
855 	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
856 	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
857 	WREPLSRV_PUSH_NOTIFY_STAGE_DONE
858 };
859 
860 struct wreplsrv_push_notify_state {
861 	enum wreplsrv_push_notify_stage stage;
862 	struct composite_context *c;
863 	struct wreplsrv_push_notify_io *io;
864 	enum wrepl_replication_cmd command;
865 	bool full_table;
866 	struct wrepl_send_ctrl ctrl;
867 	struct wrepl_packet req_packet;
868 	struct wrepl_packet *rep_packet;
869 	struct composite_context *creq;
870 	struct wreplsrv_out_connection *wreplconn;
871 	struct tevent_req *subreq;
872 };
873 
874 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
875 static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq);
876 
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state * state)877 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
878 {
879 	struct wreplsrv_service *service = state->io->in.partner->service;
880 	struct wrepl_packet *req = &state->req_packet;
881 	struct wrepl_replication *repl_out = &state->req_packet.message.replication;
882 	struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
883 	NTSTATUS status;
884 
885 	/* prepare the outgoing request */
886 	req->opcode	= WREPL_OPCODE_BITS;
887 	req->assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
888 	req->mess_type	= WREPL_REPLICATION;
889 
890 	repl_out->command = state->command;
891 
892 	status = wreplsrv_fill_wrepl_table(service, state, table_out,
893 					   service->wins_db->local_owner, state->full_table);
894 	NT_STATUS_NOT_OK_RETURN(status);
895 
896 	/* queue the request */
897 	state->subreq = wrepl_request_send(state,
898 					   state->wreplconn->service->task->event_ctx,
899 					   state->wreplconn->sock, req, NULL);
900 	NT_STATUS_HAVE_NO_MEMORY(state->subreq);
901 
902 	tevent_req_set_callback(state->subreq,
903 				wreplsrv_push_notify_handler_treq,
904 				state);
905 
906 	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
907 
908 	return NT_STATUS_OK;
909 }
910 
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state * state)911 static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
912 {
913 	struct wreplsrv_service *service = state->io->in.partner->service;
914 	struct wrepl_packet *req = &state->req_packet;
915 	struct wrepl_replication *repl_out = &state->req_packet.message.replication;
916 	struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
917 	NTSTATUS status;
918 
919 	req->opcode	= WREPL_OPCODE_BITS;
920 	req->assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
921 	req->mess_type	= WREPL_REPLICATION;
922 
923 	repl_out->command = state->command;
924 
925 	status = wreplsrv_fill_wrepl_table(service, state, table_out,
926 					   service->wins_db->local_owner, state->full_table);
927 	NT_STATUS_NOT_OK_RETURN(status);
928 
929 	/* we won't get a reply to a inform message */
930 	state->ctrl.send_only		= true;
931 
932 	state->subreq = wrepl_request_send(state,
933 					   state->wreplconn->service->task->event_ctx,
934 					   state->wreplconn->sock, req, &state->ctrl);
935 	NT_STATUS_HAVE_NO_MEMORY(state->subreq);
936 
937 	tevent_req_set_callback(state->subreq,
938 				wreplsrv_push_notify_handler_treq,
939 				state);
940 
941 	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
942 
943 	return NT_STATUS_OK;
944 }
945 
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state * state)946 static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
947 {
948 	NTSTATUS status;
949 
950 	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
951 	NT_STATUS_NOT_OK_RETURN(status);
952 
953 	/* is the peer doesn't support inform fallback to update */
954 	switch (state->command) {
955 	case WREPL_REPL_INFORM:
956 		if (state->wreplconn->assoc_ctx.peer_major < 5) {
957 			state->command = WREPL_REPL_UPDATE;
958 		}
959 		break;
960 	case WREPL_REPL_INFORM2:
961 		if (state->wreplconn->assoc_ctx.peer_major < 5) {
962 			state->command = WREPL_REPL_UPDATE2;
963 		}
964 		break;
965 	default:
966 		break;
967 	}
968 
969 	switch (state->command) {
970 	case WREPL_REPL_UPDATE:
971 		state->full_table = true;
972 		return wreplsrv_push_notify_update(state);
973 	case WREPL_REPL_UPDATE2:
974 		state->full_table = false;
975 		return wreplsrv_push_notify_update(state);
976 	case WREPL_REPL_INFORM:
977 		state->full_table = true;
978 		return wreplsrv_push_notify_inform(state);
979 	case WREPL_REPL_INFORM2:
980 		state->full_table = false;
981 		return wreplsrv_push_notify_inform(state);
982 	default:
983 		return NT_STATUS_INTERNAL_ERROR;
984 	}
985 }
986 
wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state * state)987 static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
988 {
989 	struct wreplsrv_in_connection *wrepl_in;
990 	struct tstream_context *stream;
991 	void *process_context = NULL;
992 	NTSTATUS status;
993 
994 	status = wrepl_request_recv(state->subreq, state, NULL);
995 	TALLOC_FREE(state->subreq);
996 	NT_STATUS_NOT_OK_RETURN(status);
997 
998 	/*
999 	 * now we need to convert the wrepl_socket (client connection)
1000 	 * into a wreplsrv_in_connection (server connection), because
1001 	 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
1002 	 * message is received by the peer.
1003 	 */
1004 
1005 	status = wrepl_socket_split_stream(state->wreplconn->sock, state, &stream);
1006 	NT_STATUS_NOT_OK_RETURN(status);
1007 
1008 	/*
1009 	 * now create a wreplsrv_in_connection,
1010 	 * on which we act as server
1011 	 *
1012 	 * NOTE: stream will be stolen by
1013 	 *       wreplsrv_in_connection_merge()
1014 	 */
1015 	process_context = state->io->in.partner->service->task->process_context;
1016 	status = wreplsrv_in_connection_merge(state->io->in.partner,
1017 					      state->wreplconn->assoc_ctx.peer_ctx,
1018 					      &stream,
1019 					      &wrepl_in, process_context);
1020 	NT_STATUS_NOT_OK_RETURN(status);
1021 
1022 	/* now we can free the wreplsrv_out_connection */
1023 	TALLOC_FREE(state->wreplconn);
1024 
1025 	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1026 	return NT_STATUS_OK;
1027 }
1028 
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state * state)1029 static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
1030 {
1031 	NTSTATUS status;
1032 
1033 	status = wrepl_request_recv(state->subreq, state, NULL);
1034 	TALLOC_FREE(state->subreq);
1035 	NT_STATUS_NOT_OK_RETURN(status);
1036 
1037 	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1038 	return status;
1039 }
1040 
wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state * state)1041 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1042 {
1043 	struct composite_context *c = state->c;
1044 
1045 	switch (state->stage) {
1046 	case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1047 		c->status = wreplsrv_push_notify_wait_connect(state);
1048 		break;
1049 	case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
1050 		c->status = wreplsrv_push_notify_wait_update(state);
1051 		break;
1052 	case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1053 		c->status = wreplsrv_push_notify_wait_inform(state);
1054 		break;
1055 	case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1056 		c->status = NT_STATUS_INTERNAL_ERROR;
1057 	}
1058 
1059 	if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1060 		c->state  = COMPOSITE_STATE_DONE;
1061 	}
1062 
1063 	if (!NT_STATUS_IS_OK(c->status)) {
1064 		c->state = COMPOSITE_STATE_ERROR;
1065 	}
1066 
1067 	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1068 		c->async.fn(c);
1069 	}
1070 }
1071 
wreplsrv_push_notify_handler_creq(struct composite_context * creq)1072 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1073 {
1074 	struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1075 						   struct wreplsrv_push_notify_state);
1076 	wreplsrv_push_notify_handler(state);
1077 	return;
1078 }
1079 
wreplsrv_push_notify_handler_treq(struct tevent_req * subreq)1080 static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq)
1081 {
1082 	struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq,
1083 						   struct wreplsrv_push_notify_state);
1084 	wreplsrv_push_notify_handler(state);
1085 	return;
1086 }
1087 
wreplsrv_push_notify_send(TALLOC_CTX * mem_ctx,struct wreplsrv_push_notify_io * io)1088 struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1089 {
1090 	struct composite_context *c = NULL;
1091 	struct wreplsrv_service *service = io->in.partner->service;
1092 	struct wreplsrv_push_notify_state *state = NULL;
1093 	enum winsrepl_partner_type partner_type;
1094 
1095 	c = talloc_zero(mem_ctx, struct composite_context);
1096 	if (!c) goto failed;
1097 
1098 	state = talloc_zero(c, struct wreplsrv_push_notify_state);
1099 	if (!state) goto failed;
1100 	state->c	= c;
1101 	state->io	= io;
1102 
1103 	if (io->in.inform) {
1104 		/* we can cache the connection in partner->push->wreplconn */
1105 		partner_type = WINSREPL_PARTNER_PUSH;
1106 		if (io->in.propagate) {
1107 			state->command	= WREPL_REPL_INFORM2;
1108 		} else {
1109 			state->command	= WREPL_REPL_INFORM;
1110 		}
1111 	} else {
1112 		/* we can NOT cache the connection */
1113 		partner_type = WINSREPL_PARTNER_NONE;
1114 		if (io->in.propagate) {
1115 			state->command	= WREPL_REPL_UPDATE2;
1116 		} else {
1117 			state->command	= WREPL_REPL_UPDATE;
1118 		}
1119 	}
1120 
1121 	c->state	= COMPOSITE_STATE_IN_PROGRESS;
1122 	c->event_ctx	= service->task->event_ctx;
1123 	c->private_data	= state;
1124 
1125 	state->stage	= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1126 	state->creq	= wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1127 	if (!state->creq) goto failed;
1128 
1129 	state->creq->async.fn		= wreplsrv_push_notify_handler_creq;
1130 	state->creq->async.private_data	= state;
1131 
1132 	return c;
1133 failed:
1134 	talloc_free(c);
1135 	return NULL;
1136 }
1137 
wreplsrv_push_notify_recv(struct composite_context * c)1138 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1139 {
1140 	NTSTATUS status;
1141 
1142 	status = composite_wait(c);
1143 
1144 	talloc_free(c);
1145 	return status;
1146 }
1147