1 /*
2 * Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
3 * All rights reserved.
4 * Copyright (c) 2012 Los Alamos National Security, LLC. All rights
5 * reserved.
6 * Copyright (c) 2015 Intel, Inc. All rights reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
14 #include "ompi_config.h"
15 #include "vprotocol_pessimist_eventlog.h"
16 #include "opal/mca/pmix/pmix.h"
17 #include "ompi/dpm/dpm.h"
18
/*
 * Connect this process to event logger number el_rank and run the initial
 * handshake with it.
 *
 * The event logger's port name is looked up through PMIx under the key
 * built from VPROTOCOL_EVENT_LOGGER_NAME_FMT and el_rank. A connection is
 * then established with ompi_dpm_connect_accept(). Finally our rank in
 * MPI_COMM_WORLD is sent on the new communicator, and two clock values are
 * received back.
 *
 * @param el_rank  index of the event logger to contact
 * @param el_comm  [out] communicator connected to the event logger; it is
 *                 also used for the handshake
 * @return OMPI_SUCCESS on success;
 *         OMPI_ERR_OUT_OF_RESOURCE if the lookup key cannot be built;
 *         OMPI_ERR_NOT_FOUND if no port string could be looked up;
 *         otherwise the error from the connect, send, or receive step.
 */
int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t **el_comm)
{
    int rc;
    int rank;
    vprotocol_pessimist_clock_t connect_info[2];
    opal_list_t results;
    opal_pmix_pdata_t *pdat;

    OBJ_CONSTRUCT(&results, opal_list_t);
    pdat = OBJ_NEW(opal_pmix_pdata_t);
    /* Append first so that OPAL_LIST_DESTRUCT releases pdat on every exit path */
    opal_list_append(&results, &pdat->super);
    if (0 > asprintf(&pdat->value.key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank)) {
        /* asprintf leaves the pointer undefined on failure: clear it so the
         * list destructor does not try to release it */
        pdat->value.key = NULL;
        OPAL_LIST_DESTRUCT(&results);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    rc = opal_pmix.lookup(&results, NULL);
    if (OPAL_SUCCESS != rc ||
        OPAL_STRING != pdat->value.type ||
        NULL == pdat->value.data.string) {
        OPAL_LIST_DESTRUCT(&results);
        return OMPI_ERR_NOT_FOUND;
    }
    V_OUTPUT_VERBOSE(45, "Found port < %s >", pdat->value.data.string);

    /* Use the looked-up port string in place instead of duplicating it (the
     * previous strdup copy was never freed). The lookup results, and the
     * string they own, are released once the connection attempt is over. */
    rc = ompi_dpm_connect_accept(MPI_COMM_SELF, 0, pdat->value.data.string,
                                 true, el_comm);
    OPAL_LIST_DESTRUCT(&results);
    if (OMPI_SUCCESS != rc) {
        OMPI_ERROR_LOG(rc);
        /* No usable communicator: do not attempt the handshake on it */
        return rc;
    }

    /* Send Rank, receive max buffer size and max_clock back.
     * NOTE(review): MPI_INTEGER is the Fortran INTEGER type, not C int; it is
     * kept here because the event logger side must use the same datatype.
     * Confirm both sides before switching to MPI_INT. */
    rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
    rc = mca_pml_v.host_pml.pml_send(&rank, 1, MPI_INTEGER, 0,
                                     VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     *el_comm);
    if (OPAL_UNLIKELY(MPI_SUCCESS != rc)) {
        OMPI_ERRHANDLER_INVOKE(*el_comm, rc,
                               __FILE__ ": failed sending event logger handshake");
        /* Do not wait for a reply to a request that was never sent */
        return rc;
    }

    /* Assumes vprotocol_pessimist_clock_t matches MPI_UNSIGNED_LONG_LONG in
     * size — TODO confirm. connect_info is received but not used here. */
    rc = mca_pml_v.host_pml.pml_recv(connect_info, 2, MPI_UNSIGNED_LONG_LONG,
                                     0, VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
                                     *el_comm, MPI_STATUS_IGNORE);
    if (OPAL_UNLIKELY(MPI_SUCCESS != rc)) {
        OMPI_ERRHANDLER_INVOKE(*el_comm, rc,
                               __FILE__ ": failed receiving event logger handshake");
    }

    return rc;
}
67
/*
 * Disconnect from the event logger.
 *
 * @param el_comm  communicator previously obtained from
 *                 vprotocol_pessimist_event_logger_connect()
 * @return the status reported by ompi_dpm_disconnect(); errors are also
 *         logged here rather than silently discarded.
 */
int vprotocol_pessimist_event_logger_disconnect(ompi_communicator_t *el_comm)
{
    int rc = ompi_dpm_disconnect(el_comm);

    if (OMPI_SUCCESS != rc) {
        OMPI_ERROR_LOG(rc);
    }
    return rc;
}
73
vprotocol_pessimist_matching_replay(int * src)74 void vprotocol_pessimist_matching_replay(int *src) {
75 #if OPAL_ENABLE_DEBUG
76 vprotocol_pessimist_clock_t max = 0;
77 #endif
78 mca_vprotocol_pessimist_event_t *event;
79
80 /* searching this request in the event list */
81 for(event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(&mca_vprotocol_pessimist.replay_events);
82 event != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(&mca_vprotocol_pessimist.replay_events);
83 event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event))
84 {
85 vprotocol_pessimist_matching_event_t *mevent;
86
87 if(VPROTOCOL_PESSIMIST_EVENT_TYPE_MATCHING != event->type) continue;
88 mevent = &(event->u_event.e_matching);
89 if(mevent->reqid == mca_vprotocol_pessimist.clock)
90 {
91 /* this is the event to replay */
92 V_OUTPUT_VERBOSE(70, "pessimist: replay\tmatch\t%"PRIpclock"\trecv is forced from %d", mevent->reqid, mevent->src);
93 (*src) = mevent->src;
94 opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
95 (opal_list_item_t *) event);
96 VPESSIMIST_EVENT_RETURN(event);
97 }
98 #if OPAL_ENABLE_DEBUG
99 else if(mevent->reqid > max)
100 max = mevent->reqid;
101 }
102 /* not forcing a ANY SOURCE event whose recieve clock is lower than max
103 * is a bug indicating we have missed an event during logging ! */
104 assert(((*src) != MPI_ANY_SOURCE) || (mca_vprotocol_pessimist.clock > max));
105 #else
106 }
107 #endif
108 }
109
/*
 * Replay a logged delivery decision for a test/probe over n requests.
 *
 * Walks mca_vprotocol_pessimist.replay_events, looking at delivery events
 * only:
 *  - probeid below the current clock: the probe reports that nothing
 *    completed (*index = MPI_UNDEFINED, *outcount = 0). The event stays in
 *    the queue.
 *  - probeid equal to the current clock: the event is consumed.
 *      - If some reqs[i] carries the logged reqid, that request is waited on
 *        (status filled by ompi_request_wait) and reported as completed
 *        (*index = i, *outcount = 1).
 *      - Otherwise nothing is reported as completed.
 *  - probeid above the current clock: the event is skipped.
 * In the first two cases the clock advances by one and the function returns.
 * When no event applies, the outputs are left untouched ("not forced").
 *
 * NOTE(review): the original comment says a below-clock event stays queued
 * "until probeid matches", but the clock only ever increases, so such an
 * event can never match later. Confirm whether that comparison should be
 * '>' instead.
 */
void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
                                         int *outcount, int *index,
                                         ompi_status_public_t *status)
{
    opal_list_t *queue = &mca_vprotocol_pessimist.replay_events;
    mca_vprotocol_pessimist_event_t *ev;

    for (ev = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(queue);
         ev != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(queue);
         ev = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(ev)) {
        vprotocol_pessimist_delivery_event_t *dev;
        int slot;

        if (VPROTOCOL_PESSIMIST_EVENT_TYPE_DELIVERY != ev->type) {
            continue;
        }
        dev = &ev->u_event.e_delivery;

        if (dev->probeid > mca_vprotocol_pessimist.clock) {
            /* Logged for a later probe: keep scanning */
            continue;
        }

        if (dev->probeid < mca_vprotocol_pessimist.clock) {
            /* Report "nothing completed yet"; the event is kept in the queue */
            V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
            *index = MPI_UNDEFINED;
            *outcount = 0;
            mca_vprotocol_pessimist.clock++;
            return;
        }

        /* Here dev->probeid equals the clock: look for the logged request */
        for (slot = 0; slot < (int) n; slot++) {
            if (VPESSIMIST_FTREQ(reqs[slot])->reqid != dev->reqid) {
                continue;
            }
            V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\t%"PRIpclock, dev->probeid, dev->reqid);
            opal_list_remove_item(queue, (opal_list_item_t *) ev);
            VPESSIMIST_EVENT_RETURN(ev);
            *index = slot;
            *outcount = 1;
            mca_vprotocol_pessimist.clock++;
            ompi_request_wait(&reqs[slot], status);
            return;
        }

        /* The logged probe completed nothing; consume the event */
        V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
        assert(dev->reqid == 0); /* make sure no logged request was missed */
        *index = MPI_UNDEFINED;
        *outcount = 0;
        mca_vprotocol_pessimist.clock++;
        opal_list_remove_item(queue, (opal_list_item_t *) ev);
        VPESSIMIST_EVENT_RETURN(ev);
        return;
    }

    V_OUTPUT_VERBOSE(50, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnot forced", mca_vprotocol_pessimist.clock);
}
164