1 /*****************************************************************************\
2 ** kvs.c - KVS manipulation functions
3 *****************************************************************************
4 * Copyright (C) 2011-2012 National University of Defense Technology.
5 * Written by Hongjia Cao <hjcao@nudt.edu.cn>.
6 * All rights reserved.
7 *
8 * This file is part of Slurm, a resource management program.
9 * For details, see <https://slurm.schedmd.com/>.
10 * Please also read the included file: DISCLAIMER.
11 *
12 * Slurm is free software; you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 2 of the License, or (at your option)
15 * any later version.
16 *
17 * In addition, as a special exception, the copyright holders give permission
18 * to link the code of portions of this program with the OpenSSL library under
19 * certain conditions as described in each individual source file, and
20 * distribute linked combinations including the two. You must obey the GNU
21 * General Public License in all respects for all of the code used other than
22 * OpenSSL. If you modify file(s) with this exception, you may extend this
23 * exception to your version of the file(s), but you are not obligated to do
24 * so. If you do not wish to do so, delete this exception statement from your
25 * version. If you delete this exception statement from all source files in
26 * the program, then also delete it here.
27 *
28 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
31 * details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with Slurm; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
36 \*****************************************************************************/
37
38 #include <stdlib.h>
39 #include <unistd.h>
40
41 #include "kvs.h"
42 #include "setup.h"
43 #include "tree.h"
44 #include "pmi.h"
45
/* number of send attempts made by temp_kvs_send() before giving up */
#define MAX_RETRIES 5

/* for fence */
int tasks_to_wait = 0;		/* reset to 0 by temp_kvs_init() */
int children_to_wait = 0;	/* reset to 0 by temp_kvs_init() */
int kvs_seq = 1;		/* starting from 1; packed into fence messages
				 * and incremented by temp_kvs_send() */
int waiting_kvs_resp = 0;	/* not referenced in this file's code shown
				 * here; presumably used by the fence logic */


/* bucket of key-value pairs */
typedef struct kvs_bucket {
	char **pairs;		/* flat array: key at KEY_INDEX(i),
				 * value at VAL_INDEX(i) */
	uint32_t count;		/* number of k-v pairs stored */
	uint32_t size;		/* allocated slots in pairs (two per pair) */
} kvs_bucket_t;

static kvs_bucket_t *kvs_hash = NULL;	/* array of hash_size buckets */
static uint32_t hash_size = 0;

/* staging buffer of packed tree-message data sent by temp_kvs_send() */
static char *temp_kvs_buf = NULL;
static int temp_kvs_cnt = 0;	/* bytes used in temp_kvs_buf */
static int temp_kvs_size = 0;	/* bytes allocated for temp_kvs_buf */

/* set when PMI2_KVS_NO_DUP_KEYS_ENV is present: kvs_put() skips the
 * duplicate-key lookup */
static int no_dup_keys = 0;

#define TASKS_PER_BUCKET 8
#define TEMP_KVS_SIZE_INC 2048

/*
 * Arguments are parenthesized so that expressions such as KEY_INDEX(i + 1)
 * index the intended slot.
 */
#define KEY_INDEX(i) ((i) * 2)
#define VAL_INDEX(i) ((i) * 2 + 1)
#define HASH(key) (_hash(key) % hash_size)
77
/*
 * Hash a NUL-terminated key. For every byte, the hash is shifted left by
 * 8 bits and the byte shifted out of the top is XORed with the incoming
 * key byte to form the new low byte.
 */
static inline uint32_t
_hash(char *key)
{
	uint32_t h = 0;
	const char *p;

	for (p = key; *p != '\0'; p++) {
		uint8_t top = (uint8_t)(h >> 24);

		h = (h << 8) | (uint32_t)(top ^ (uint8_t)*p);
	}
	return h;
}
92
93 extern int
temp_kvs_init(void)94 temp_kvs_init(void)
95 {
96 uint16_t cmd;
97 uint32_t nodeid, num_children, size;
98 Buf buf = NULL;
99
100 xfree(temp_kvs_buf);
101 temp_kvs_cnt = 0;
102 temp_kvs_size = TEMP_KVS_SIZE_INC;
103 temp_kvs_buf = xmalloc(temp_kvs_size);
104
105 /* put the tree cmd here to simplify message sending */
106 if (in_stepd()) {
107 cmd = TREE_CMD_KVS_FENCE;
108 } else {
109 cmd = TREE_CMD_KVS_FENCE_RESP;
110 }
111
112 buf = init_buf(1024);
113 pack16(cmd, buf);
114 if (in_stepd()) {
115 nodeid = job_info.nodeid;
116 /* XXX: TBC */
117 num_children = tree_info.num_children + 1;
118
119 pack32(nodeid, buf); /* from_nodeid */
120 packstr(tree_info.this_node, buf); /* from_node */
121 pack32(num_children, buf); /* num_children */
122 pack32(kvs_seq, buf);
123 } else {
124 pack32(kvs_seq, buf);
125 }
126 size = get_buf_offset(buf);
127 if (temp_kvs_cnt + size > temp_kvs_size) {
128 temp_kvs_size += TEMP_KVS_SIZE_INC;
129 xrealloc(temp_kvs_buf, temp_kvs_size);
130 }
131 memcpy(&temp_kvs_buf[temp_kvs_cnt], get_buf_data(buf), size);
132 temp_kvs_cnt += size;
133 free_buf(buf);
134
135 tasks_to_wait = 0;
136 children_to_wait = 0;
137
138 return SLURM_SUCCESS;
139 }
140
141 extern int
temp_kvs_add(char * key,char * val)142 temp_kvs_add(char *key, char *val)
143 {
144 Buf buf;
145 uint32_t size;
146
147 if ( key == NULL || val == NULL )
148 return SLURM_SUCCESS;
149
150 buf = init_buf(PMI2_MAX_KEYLEN + PMI2_MAX_VALLEN + 2 * sizeof(uint32_t));
151 packstr(key, buf);
152 packstr(val, buf);
153 size = get_buf_offset(buf);
154 if (temp_kvs_cnt + size > temp_kvs_size) {
155 temp_kvs_size += TEMP_KVS_SIZE_INC;
156 xrealloc(temp_kvs_buf, temp_kvs_size);
157 }
158 memcpy(&temp_kvs_buf[temp_kvs_cnt], get_buf_data(buf), size);
159 temp_kvs_cnt += size;
160 free_buf(buf);
161
162 return SLURM_SUCCESS;
163 }
164
165 extern int
temp_kvs_merge(Buf buf)166 temp_kvs_merge(Buf buf)
167 {
168 char *data;
169 uint32_t offset, size;
170
171 size = remaining_buf(buf);
172 if (size == 0) {
173 return SLURM_SUCCESS;
174 }
175 data = get_buf_data(buf);
176 offset = get_buf_offset(buf);
177
178 if (temp_kvs_cnt + size > temp_kvs_size) {
179 temp_kvs_size += size;
180 xrealloc(temp_kvs_buf, temp_kvs_size);
181 }
182 memcpy(&temp_kvs_buf[temp_kvs_cnt], &data[offset], size);
183 temp_kvs_cnt += size;
184
185 return SLURM_SUCCESS;
186 }
187
188 extern int
temp_kvs_send(void)189 temp_kvs_send(void)
190 {
191 int rc = SLURM_ERROR, retry = 0;
192 unsigned int delay = 1;
193 char *nodelist = NULL;
194
195 if (!in_stepd()) /* srun */
196 nodelist = xstrdup(job_info.step_nodelist);
197 else if (tree_info.parent_node)
198 nodelist = xstrdup(tree_info.parent_node);
199
200 /* cmd included in temp_kvs_buf */
201 kvs_seq++; /* expecting new kvs after now */
202
203 while (1) {
204 if (retry == 1)
205 verbose("failed to send temp kvs, rc=%d, retrying", rc);
206
207 if (nodelist)
208 /* srun or non-first-level stepds */
209 rc = slurm_forward_data(&nodelist,
210 tree_sock_addr,
211 temp_kvs_cnt,
212 temp_kvs_buf);
213 else /* first level stepds */
214 rc = tree_msg_to_srun(temp_kvs_cnt, temp_kvs_buf);
215
216 if (rc == SLURM_SUCCESS)
217 break;
218
219 if (++retry >= MAX_RETRIES)
220 break;
221 /* wait, in case parent stepd / srun not ready */
222 sleep(delay);
223 delay *= 2;
224 }
225 temp_kvs_init(); /* clear old temp kvs */
226
227 xfree(nodelist);
228
229 return rc;
230 }
231
232 /**************************************************************/
233
234 extern int
kvs_init(void)235 kvs_init(void)
236 {
237 debug3("mpi/pmi2: in kvs_init");
238
239 hash_size = ((job_info.ntasks + TASKS_PER_BUCKET - 1) / TASKS_PER_BUCKET);
240
241 kvs_hash = xmalloc(hash_size * sizeof(kvs_bucket_t));
242
243 if (getenv(PMI2_KVS_NO_DUP_KEYS_ENV))
244 no_dup_keys = 1;
245
246 return SLURM_SUCCESS;
247 }
248
249 /*
250 * returned value is not dup-ed
251 */
252 extern char *
kvs_get(char * key)253 kvs_get(char *key)
254 {
255 kvs_bucket_t *bucket;
256 char *val = NULL;
257 int i;
258
259 debug3("mpi/pmi2: in kvs_get, key=%s", key);
260
261 bucket = &kvs_hash[HASH(key)];
262 if (bucket->count > 0) {
263 for(i = 0; i < bucket->count; i ++) {
264 if (! xstrcmp(key, bucket->pairs[KEY_INDEX(i)])) {
265 val = bucket->pairs[VAL_INDEX(i)];
266 break;
267 }
268 }
269 }
270
271 debug3("mpi/pmi2: out kvs_get, val=%s", val);
272
273 return val;
274 }
275
276 extern int
kvs_put(char * key,char * val)277 kvs_put(char *key, char *val)
278 {
279 kvs_bucket_t *bucket;
280 int i;
281
282 debug3("mpi/pmi2: in kvs_put");
283
284 bucket = &kvs_hash[HASH(key)];
285
286 if (! no_dup_keys) {
287 for (i = 0; i < bucket->count; i ++) {
288 if (! xstrcmp(key, bucket->pairs[KEY_INDEX(i)])) {
289 /* replace the k-v pair */
290 xfree(bucket->pairs[VAL_INDEX(i)]);
291 bucket->pairs[VAL_INDEX(i)] = xstrdup(val);
292 debug("mpi/pmi2: put kvs %s=%s", key, val);
293 return SLURM_SUCCESS;
294 }
295 }
296 }
297 if (bucket->count * 2 >= bucket->size) {
298 bucket->size += (TASKS_PER_BUCKET * 2);
299 xrealloc(bucket->pairs, bucket->size * sizeof(char *));
300 }
301 /* add the k-v pair */
302 i = bucket->count;
303 bucket->pairs[KEY_INDEX(i)] = xstrdup(key);
304 bucket->pairs[VAL_INDEX(i)] = xstrdup(val);
305 bucket->count ++;
306
307 debug3("mpi/pmi2: put kvs %s=%s", key, val);
308 return SLURM_SUCCESS;
309 }
310
311 extern int
kvs_clear(void)312 kvs_clear(void)
313 {
314 kvs_bucket_t *bucket;
315 int i, j;
316
317 for (i = 0; i < hash_size; i ++){
318 bucket = &kvs_hash[i];
319 for (j = 0; j < bucket->count; j ++) {
320 xfree (bucket->pairs[KEY_INDEX(j)]);
321 xfree (bucket->pairs[VAL_INDEX(j)]);
322 }
323 }
324 xfree(kvs_hash);
325
326 return SLURM_SUCCESS;
327 }
328