1 /*****************************************************************************\
2  **  kvs.c - KVS manipulation functions
3  *****************************************************************************
4  *  Copyright (C) 2011-2012 National University of Defense Technology.
5  *  Written by Hongjia Cao <hjcao@nudt.edu.cn>.
6  *  All rights reserved.
7  *
8  *  This file is part of Slurm, a resource management program.
9  *  For details, see <https://slurm.schedmd.com/>.
10  *  Please also read the included file: DISCLAIMER.
11  *
12  *  Slurm is free software; you can redistribute it and/or modify it under
13  *  the terms of the GNU General Public License as published by the Free
14  *  Software Foundation; either version 2 of the License, or (at your option)
15  *  any later version.
16  *
17  *  In addition, as a special exception, the copyright holders give permission
18  *  to link the code of portions of this program with the OpenSSL library under
19  *  certain conditions as described in each individual source file, and
20  *  distribute linked combinations including the two. You must obey the GNU
21  *  General Public License in all respects for all of the code used other than
22  *  OpenSSL. If you modify file(s) with this exception, you may extend this
23  *  exception to your version of the file(s), but you are not obligated to do
24  *  so. If you do not wish to do so, delete this exception statement from your
25  *  version.  If you delete this exception statement from all source files in
26  *  the program, then also delete it here.
27  *
28  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
31  *  details.
32  *
33  *  You should have received a copy of the GNU General Public License along
34  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
35  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
36 \*****************************************************************************/
37 
38 #include <stdlib.h>
39 #include <unistd.h>
40 
41 #include "kvs.h"
42 #include "setup.h"
43 #include "tree.h"
44 #include "pmi.h"
45 
/* maximum number of send attempts made by temp_kvs_send() */
#define MAX_RETRIES 5

/*
 * for fence
 *
 * These are non-static, so presumably shared with other pmi2 sources
 * (NOTE(review): confirm against kvs.h).
 */
int tasks_to_wait = 0;		/* reset to 0 by temp_kvs_init() */
int children_to_wait = 0;	/* reset to 0 by temp_kvs_init() */
int kvs_seq = 1; /* starting from 1; packed into every fence header by
		  * temp_kvs_init() and incremented by temp_kvs_send() */
int waiting_kvs_resp = 0;	/* not referenced in this file section;
				 * presumably flags an outstanding fence
				 * response -- TODO confirm */
53 
54 
/*
 * Bucket of key-value pairs.
 *
 * Keys and values are stored interleaved in pairs[]: the key of pair i is
 * at pairs[KEY_INDEX(i)] and its value at pairs[VAL_INDEX(i)].
 *   count - number of pairs currently stored
 *   size  - number of char * slots allocated in pairs[] (two per pair)
 */
typedef struct kvs_bucket {
	char **pairs;
	uint32_t count;
	uint32_t size;
} kvs_bucket_t;

/* hash table of hash_size buckets, allocated by kvs_init() */
static kvs_bucket_t *kvs_hash = NULL;
static uint32_t hash_size = 0;

/* staging buffer for the outgoing fence message (see temp_kvs_*()) */
static char *temp_kvs_buf = NULL;
static int temp_kvs_cnt = 0;	/* bytes in use */
static int temp_kvs_size = 0;	/* bytes allocated */

/* when set, kvs_put() appends without looking for an existing key */
static int no_dup_keys = 0;

#define TASKS_PER_BUCKET 8
#define TEMP_KVS_SIZE_INC 2048

/* argument parenthesized so expressions like KEY_INDEX(a + b) expand safely */
#define KEY_INDEX(i) ((i) * 2)
#define VAL_INDEX(i) ((i) * 2 + 1)
/* hash_size must be non-zero whenever HASH() is evaluated */
#define HASH(key) (_hash(key) % hash_size)
77 
/*
 * _hash - hash a NUL-terminated KVS key into 32 bits.
 *
 * Each byte is shifted in from the right; the byte that falls off the top
 * is XOR-ed with the incoming character, so long keys still mix their
 * leading bytes into the result.  The table index is HASH(key), i.e. this
 * value modulo hash_size.
 *
 * IN key: key string (must not be NULL)
 * RET: 32-bit hash value (0 for the empty string)
 */
static inline uint32_t
_hash(const char *key)
{
	const unsigned char *p;
	uint32_t hash = 0;

	/* single pass up to the terminator; no int-sized length needed */
	for (p = (const unsigned char *) key; *p != '\0'; p++) {
		uint8_t shift = (uint8_t) (hash >> 24);

		hash = (hash << 8) | (uint32_t) (shift ^ *p);
	}
	return hash;
}
92 
93 extern int
temp_kvs_init(void)94 temp_kvs_init(void)
95 {
96 	uint16_t cmd;
97 	uint32_t nodeid, num_children, size;
98 	Buf buf = NULL;
99 
100 	xfree(temp_kvs_buf);
101 	temp_kvs_cnt = 0;
102 	temp_kvs_size = TEMP_KVS_SIZE_INC;
103 	temp_kvs_buf = xmalloc(temp_kvs_size);
104 
105 	/* put the tree cmd here to simplify message sending */
106 	if (in_stepd()) {
107 		cmd = TREE_CMD_KVS_FENCE;
108 	} else {
109 		cmd = TREE_CMD_KVS_FENCE_RESP;
110 	}
111 
112 	buf = init_buf(1024);
113 	pack16(cmd, buf);
114 	if (in_stepd()) {
115 		nodeid = job_info.nodeid;
116 		/* XXX: TBC */
117 		num_children = tree_info.num_children + 1;
118 
119 		pack32(nodeid, buf); /* from_nodeid */
120 		packstr(tree_info.this_node, buf); /* from_node */
121 		pack32(num_children, buf); /* num_children */
122 		pack32(kvs_seq, buf);
123 	} else {
124 		pack32(kvs_seq, buf);
125 	}
126 	size = get_buf_offset(buf);
127 	if (temp_kvs_cnt + size > temp_kvs_size) {
128 		temp_kvs_size += TEMP_KVS_SIZE_INC;
129 		xrealloc(temp_kvs_buf, temp_kvs_size);
130 	}
131 	memcpy(&temp_kvs_buf[temp_kvs_cnt], get_buf_data(buf), size);
132 	temp_kvs_cnt += size;
133 	free_buf(buf);
134 
135 	tasks_to_wait = 0;
136 	children_to_wait = 0;
137 
138 	return SLURM_SUCCESS;
139 }
140 
141 extern int
temp_kvs_add(char * key,char * val)142 temp_kvs_add(char *key, char *val)
143 {
144 	Buf buf;
145 	uint32_t size;
146 
147 	if ( key == NULL || val == NULL )
148 		return SLURM_SUCCESS;
149 
150 	buf = init_buf(PMI2_MAX_KEYLEN + PMI2_MAX_VALLEN + 2 * sizeof(uint32_t));
151 	packstr(key, buf);
152 	packstr(val, buf);
153 	size = get_buf_offset(buf);
154 	if (temp_kvs_cnt + size > temp_kvs_size) {
155 		temp_kvs_size += TEMP_KVS_SIZE_INC;
156 		xrealloc(temp_kvs_buf, temp_kvs_size);
157 	}
158 	memcpy(&temp_kvs_buf[temp_kvs_cnt], get_buf_data(buf), size);
159 	temp_kvs_cnt += size;
160 	free_buf(buf);
161 
162 	return SLURM_SUCCESS;
163 }
164 
165 extern int
temp_kvs_merge(Buf buf)166 temp_kvs_merge(Buf buf)
167 {
168 	char *data;
169 	uint32_t offset, size;
170 
171 	size = remaining_buf(buf);
172 	if (size == 0) {
173 		return SLURM_SUCCESS;
174 	}
175 	data = get_buf_data(buf);
176 	offset = get_buf_offset(buf);
177 
178 	if (temp_kvs_cnt + size > temp_kvs_size) {
179 		temp_kvs_size += size;
180 		xrealloc(temp_kvs_buf, temp_kvs_size);
181 	}
182 	memcpy(&temp_kvs_buf[temp_kvs_cnt], &data[offset], size);
183 	temp_kvs_cnt += size;
184 
185 	return SLURM_SUCCESS;
186 }
187 
188 extern int
temp_kvs_send(void)189 temp_kvs_send(void)
190 {
191 	int rc = SLURM_ERROR, retry = 0;
192 	unsigned int delay = 1;
193 	char *nodelist = NULL;
194 
195 	if (!in_stepd())	/* srun */
196 		nodelist = xstrdup(job_info.step_nodelist);
197 	else if (tree_info.parent_node)
198 		nodelist = xstrdup(tree_info.parent_node);
199 
200 	/* cmd included in temp_kvs_buf */
201 	kvs_seq++; /* expecting new kvs after now */
202 
203 	while (1) {
204 		if (retry == 1)
205 			verbose("failed to send temp kvs, rc=%d, retrying", rc);
206 
207 		if (nodelist)
208 			/* srun or non-first-level stepds */
209 			rc = slurm_forward_data(&nodelist,
210 						tree_sock_addr,
211 						temp_kvs_cnt,
212 						temp_kvs_buf);
213 		else		/* first level stepds */
214 			rc = tree_msg_to_srun(temp_kvs_cnt, temp_kvs_buf);
215 
216 		if (rc == SLURM_SUCCESS)
217 			break;
218 
219 		if (++retry >= MAX_RETRIES)
220 			break;
221 		/* wait, in case parent stepd / srun not ready */
222 		sleep(delay);
223 		delay *= 2;
224 	}
225 	temp_kvs_init();	/* clear old temp kvs */
226 
227 	xfree(nodelist);
228 
229 	return rc;
230 }
231 
232 /**************************************************************/
233 
234 extern int
kvs_init(void)235 kvs_init(void)
236 {
237 	debug3("mpi/pmi2: in kvs_init");
238 
239 	hash_size = ((job_info.ntasks + TASKS_PER_BUCKET - 1) / TASKS_PER_BUCKET);
240 
241 	kvs_hash = xmalloc(hash_size * sizeof(kvs_bucket_t));
242 
243 	if (getenv(PMI2_KVS_NO_DUP_KEYS_ENV))
244 		no_dup_keys = 1;
245 
246 	return SLURM_SUCCESS;
247 }
248 
249 /*
250  * returned value is not dup-ed
251  */
252 extern char *
kvs_get(char * key)253 kvs_get(char *key)
254 {
255 	kvs_bucket_t *bucket;
256 	char *val = NULL;
257 	int i;
258 
259 	debug3("mpi/pmi2: in kvs_get, key=%s", key);
260 
261 	bucket = &kvs_hash[HASH(key)];
262 	if (bucket->count > 0) {
263 		for(i = 0; i < bucket->count; i ++) {
264 			if (! xstrcmp(key, bucket->pairs[KEY_INDEX(i)])) {
265 				val = bucket->pairs[VAL_INDEX(i)];
266 				break;
267 			}
268 		}
269 	}
270 
271 	debug3("mpi/pmi2: out kvs_get, val=%s", val);
272 
273 	return val;
274 }
275 
276 extern int
kvs_put(char * key,char * val)277 kvs_put(char *key, char *val)
278 {
279 	kvs_bucket_t *bucket;
280 	int i;
281 
282 	debug3("mpi/pmi2: in kvs_put");
283 
284 	bucket = &kvs_hash[HASH(key)];
285 
286 	if (! no_dup_keys) {
287 		for (i = 0; i < bucket->count; i ++) {
288 			if (! xstrcmp(key, bucket->pairs[KEY_INDEX(i)])) {
289 				/* replace the k-v pair */
290 				xfree(bucket->pairs[VAL_INDEX(i)]);
291 				bucket->pairs[VAL_INDEX(i)] = xstrdup(val);
292 				debug("mpi/pmi2: put kvs %s=%s", key, val);
293 				return SLURM_SUCCESS;
294 			}
295 		}
296 	}
297 	if (bucket->count * 2 >= bucket->size) {
298 		bucket->size += (TASKS_PER_BUCKET * 2);
299 		xrealloc(bucket->pairs, bucket->size * sizeof(char *));
300 	}
301 	/* add the k-v pair */
302 	i = bucket->count;
303 	bucket->pairs[KEY_INDEX(i)] = xstrdup(key);
304 	bucket->pairs[VAL_INDEX(i)] = xstrdup(val);
305 	bucket->count ++;
306 
307 	debug3("mpi/pmi2: put kvs %s=%s", key, val);
308 	return SLURM_SUCCESS;
309 }
310 
311 extern int
kvs_clear(void)312 kvs_clear(void)
313 {
314 	kvs_bucket_t *bucket;
315 	int i, j;
316 
317 	for (i = 0; i < hash_size; i ++){
318 		bucket = &kvs_hash[i];
319 		for (j = 0; j < bucket->count; j ++) {
320 			xfree (bucket->pairs[KEY_INDEX(j)]);
321 			xfree (bucket->pairs[VAL_INDEX(j)]);
322 		}
323 	}
324 	xfree(kvs_hash);
325 
326 	return SLURM_SUCCESS;
327 }
328