1 /*
2  * Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
3  *                         All rights reserved.
4  * Copyright (c) 2012      Los Alamos National Security, LLC.  All rights
5  *                         reserved.
6  * Copyright (c) 2015      Intel, Inc. All rights reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
#include "ompi_config.h"

#include <stdlib.h>

#include "vprotocol_pessimist_eventlog.h"
#include "opal/mca/pmix/pmix.h"
#include "ompi/dpm/dpm.h"
18 
/**
 * Connect this process to an event logger and perform the initial handshake.
 *
 * The port of the event logger is looked up through opal_pmix.lookup() under
 * the key built from VPROTOCOL_EVENT_LOGGER_NAME_FMT and el_rank, then
 * ompi_dpm_connect_accept() is called on MPI_COMM_SELF to create the
 * communicator. The handshake sends this process' rank in MPI_COMM_WORLD and
 * receives two clock values back.
 *
 * NOTE(review): the handshake is performed on mca_vprotocol_pessimist.el_comm,
 * not on *el_comm; presumably callers pass &mca_vprotocol_pessimist.el_comm --
 * confirm against the call sites.
 *
 * @param el_rank  value used to build the lookup key of the event logger
 * @param el_comm  receives the communicator created by ompi_dpm_connect_accept()
 *
 * @return OMPI_ERR_OUT_OF_RESOURCE if an allocation fails,
 *         OMPI_ERR_NOT_FOUND if no usable port string was published,
 *         the error code of ompi_dpm_connect_accept() or of the failed
 *         handshake operation, otherwise the result of the final receive.
 */
int vprotocol_pessimist_event_logger_connect(int el_rank, ompi_communicator_t **el_comm)
{
    int rc;
    char *port;
    int rank;
    vprotocol_pessimist_clock_t connect_info[2];
    opal_list_t results;
    opal_pmix_pdata_t *pdat;

    pdat = OBJ_NEW(opal_pmix_pdata_t);
    if (NULL == pdat) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    if (0 > asprintf(&pdat->value.key, VPROTOCOL_EVENT_LOGGER_NAME_FMT, el_rank)) {
        /* asprintf leaves the target pointer undefined on failure: reset it
         * so that releasing the object does not touch a garbage pointer */
        pdat->value.key = NULL;
        OBJ_RELEASE(pdat);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    OBJ_CONSTRUCT(&results, opal_list_t);
    opal_list_append(&results, &pdat->super);

    rc = opal_pmix.lookup(&results, NULL);
    if (OPAL_SUCCESS != rc ||
        OPAL_STRING != pdat->value.type ||
        NULL == pdat->value.data.string) {
        OPAL_LIST_DESTRUCT(&results);
        return OMPI_ERR_NOT_FOUND;
    }
    /* keep a private copy: the lookup results are destroyed right below */
    port = strdup(pdat->value.data.string);
    OPAL_LIST_DESTRUCT(&results);
    if (NULL == port) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    V_OUTPUT_VERBOSE(45, "Found port < %s >", port);

    rc = ompi_dpm_connect_accept(MPI_COMM_SELF, 0, port, true, el_comm);
    /* the port string is only needed for the connection attempt */
    free(port);
    if (OMPI_SUCCESS != rc) {
        OMPI_ERROR_LOG(rc);
        /* without a connection there is nobody to handshake with */
        return rc;
    }

    /* Send Rank, receive max buffer size and max_clock back */
    rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
    rc = mca_pml_v.host_pml.pml_send(&rank, 1, MPI_INTEGER, 0,
                                     VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     mca_vprotocol_pessimist.el_comm);
    if (OPAL_UNLIKELY(MPI_SUCCESS != rc)) {
        OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc,
                               __FILE__ ": failed sending event logger handshake");
        /* do not wait for a reply to a handshake that was never sent */
        return rc;
    }

    /* the two received values are not used any further by this function */
    rc = mca_pml_v.host_pml.pml_recv(&connect_info, 2, MPI_UNSIGNED_LONG_LONG,
                                     0, VPROTOCOL_PESSIMIST_EVENTLOG_NEW_CLIENT_CMD,
                                     mca_vprotocol_pessimist.el_comm, MPI_STATUS_IGNORE);
    if (OPAL_UNLIKELY(MPI_SUCCESS != rc)) {
        OMPI_ERRHANDLER_INVOKE(mca_vprotocol_pessimist.el_comm, rc,
                               __FILE__ ": failed receiving event logger handshake");
    }

    return rc;
}
67 
/**
 * Disconnect from the event logger reachable through el_comm.
 *
 * @param el_comm  communicator handed to ompi_dpm_disconnect()
 *
 * @return always OMPI_SUCCESS; the outcome of ompi_dpm_disconnect() is not
 *         inspected.
 */
int vprotocol_pessimist_event_logger_disconnect(ompi_communicator_t *el_comm)
{
    (void) ompi_dpm_disconnect(el_comm);

    return OMPI_SUCCESS;
}
73 
vprotocol_pessimist_matching_replay(int * src)74 void vprotocol_pessimist_matching_replay(int *src) {
75 #if OPAL_ENABLE_DEBUG
76     vprotocol_pessimist_clock_t max = 0;
77 #endif
78     mca_vprotocol_pessimist_event_t *event;
79 
80     /* searching this request in the event list */
81     for(event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(&mca_vprotocol_pessimist.replay_events);
82         event != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(&mca_vprotocol_pessimist.replay_events);
83         event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event))
84     {
85         vprotocol_pessimist_matching_event_t *mevent;
86 
87         if(VPROTOCOL_PESSIMIST_EVENT_TYPE_MATCHING != event->type) continue;
88         mevent = &(event->u_event.e_matching);
89         if(mevent->reqid == mca_vprotocol_pessimist.clock)
90         {
91             /* this is the event to replay */
92             V_OUTPUT_VERBOSE(70, "pessimist: replay\tmatch\t%"PRIpclock"\trecv is forced from %d", mevent->reqid, mevent->src);
93             (*src) = mevent->src;
94             opal_list_remove_item(&mca_vprotocol_pessimist.replay_events,
95                                   (opal_list_item_t *) event);
96             VPESSIMIST_EVENT_RETURN(event);
97         }
98 #if OPAL_ENABLE_DEBUG
99         else if(mevent->reqid > max)
100             max = mevent->reqid;
101     }
102     /* not forcing a ANY SOURCE event whose recieve clock is lower than max
103      * is a bug indicating we have missed an event during logging ! */
104     assert(((*src) != MPI_ANY_SOURCE) || (mca_vprotocol_pessimist.clock > max));
105 #else
106     }
107 #endif
108 }
109 
/**
 * Force the outcome of a test/wait call according to the logged delivery
 * events.
 *
 * The first delivery event of mca_vprotocol_pessimist.replay_events whose
 * probeid is not greater than mca_vprotocol_pessimist.clock decides:
 *  - probeid lower than the clock: nothing is reported as completed and the
 *    event stays in the list;
 *  - probeid equal to the clock: the request of reqs[] whose reqid matches the
 *    event is waited for and reported; if no request matches, nothing is
 *    reported as completed. The event is removed from the list in both cases.
 * Each of these outcomes increments the clock. When no delivery event
 * qualifies, the outputs and the clock are left untouched.
 *
 * @param n         number of entries in reqs
 * @param reqs      requests the caller is testing or waiting on
 * @param outcount  set to 1 when a request is delivered, 0 when none is
 * @param index     set to the position of the delivered request in reqs, or
 *                  MPI_UNDEFINED when none is
 * @param status    passed to ompi_request_wait() for the delivered request
 */
void vprotocol_pessimist_delivery_replay(size_t n, ompi_request_t **reqs,
                                         int *outcount, int *index,
                                         ompi_status_public_t *status)
{
    opal_list_t *queue = &mca_vprotocol_pessimist.replay_events;
    const int nreqs = (int) n;
    mca_vprotocol_pessimist_event_t *event;

    event = (mca_vprotocol_pessimist_event_t *) opal_list_get_first(queue);
    while (event != (mca_vprotocol_pessimist_event_t *) opal_list_get_end(queue)) {
        vprotocol_pessimist_delivery_event_t *devent;
        int pos;

        if (VPROTOCOL_PESSIMIST_EVENT_TYPE_DELIVERY != event->type) {
            event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event);
            continue;
        }

        devent = &(event->u_event.e_delivery);
        if (devent->probeid > mca_vprotocol_pessimist.clock) {
            /* logged for a later probe: keep looking */
            event = (mca_vprotocol_pessimist_event_t *) opal_list_get_next(event);
            continue;
        }

        if (devent->probeid < mca_vprotocol_pessimist.clock) {
            /* this particular test has to report that no request completed yet */
            V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
            *index = MPI_UNDEFINED;
            *outcount = 0;
            mca_vprotocol_pessimist.clock++;
            /* the event is left in the queue */
            return;
        }

        /* probeid equals the current clock: look for the logged request */
        for (pos = 0; pos < nreqs; pos++) {
            if (VPESSIMIST_FTREQ(reqs[pos])->reqid != devent->reqid) {
                continue;
            }
            V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\t%"PRIpclock, devent->probeid, devent->reqid);
            opal_list_remove_item(queue, (opal_list_item_t *) event);
            VPESSIMIST_EVENT_RETURN(event);
            *index = pos;
            *outcount = 1;
            mca_vprotocol_pessimist.clock++;
            ompi_request_wait(&reqs[pos], status);
            return;
        }

        /* none of the given requests is the logged one */
        V_OUTPUT_VERBOSE(70, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnone", mca_vprotocol_pessimist.clock);
        assert(devent->reqid == 0); /* make sure we did not miss a request */
        *index = MPI_UNDEFINED;
        *outcount = 0;
        mca_vprotocol_pessimist.clock++;
        opal_list_remove_item(queue, (opal_list_item_t *) event);
        VPESSIMIST_EVENT_RETURN(event);
        return;
    }

    V_OUTPUT_VERBOSE(50, "pessimist:\treplay\tdeliver\t%"PRIpclock"\tnot forced", mca_vprotocol_pessimist.clock);
}
164