1 /* This is (almost) standalone */
2 #define _POSIX_C_SOURCE 200112L
3
4 #include "cado.h" // IWYU pragma: keep
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <sys/types.h>
8 #include <sys/stat.h>
9 #ifdef HAVE_UTSNAME_H
10 #include <sys/utsname.h>
11 #endif
12 #include <errno.h>
13 #include <unistd.h>
14 #include <time.h>
15 #include <string.h>
16 #ifndef STANDALONE
17 #include "select_mpi.h"
18 #else
19 #include <mpi.h>
20 #include "macros.h"
21 #define ASSERT_ALWAYS(x) do { if (!(x)) abort(); } while (0)
22 #endif
23
24 int rank; /* this process's rank: set in main() from MPI_COMM_WORLD, then re-set relative to the split communicator; read by share_file() */
25 int size; /* number of ranks in the communicator that `rank` currently refers to; set alongside `rank` in main() */
26
27 #define BUFFERSIZE (1 << 28) /* bytes (256 MiB): size of the transfer buffer share_file() allocates and the most it reads per chunk */
28
/*
 * Broadcast the contents of file `fname` from rank `root` to every other
 * rank of `comm`.
 *
 * The root opens the file for reading; every other rank opens it with "wb"
 * (creating or truncating it) and writes what it receives.  Data travels in
 * chunks of at most BUFFERSIZE bytes: for each chunk the byte count is
 * broadcast first, then the payload.  A zero count ends the transfer.
 *
 * `total` is the expected file size; it is only used for the percentage
 * shown in the progress line that the root prints at most once per second.
 *
 * Relies on the file-scope `rank`, which must be this process's rank in
 * `comm`.  Any local failure (open, allocation, read, write, close) is
 * reported on stderr and aborts the job through MPI_Abort.
 */
void share_file(const char * fname, int root, size_t total, MPI_Comm comm)
{
    FILE * f = fopen(fname, rank == root ? "rb" : "wb");
    if (f == NULL) {
        fprintf(stderr, "%s: %s\n", fname, strerror(errno));
        MPI_Abort(comm, 1);
    }

    char * buf = malloc(BUFFERSIZE);
    if (buf == NULL) {
        fprintf(stderr, "cannot allocate %d bytes\n", BUFFERSIZE);
        MPI_Abort(comm, 1);
    }

    size_t fsz = 0;             /* bytes transferred so far */
    time_t t0 = time(NULL);     /* start of the transfer */
    time_t t1 = t0 + 1;         /* earliest time of the next progress line */

    for ( ; ; ) {
        int n = 0;
        if (rank == root) {
            /* BUFFERSIZE fits in an int, hence so does the byte count. */
            n = (int) fread(buf, 1, BUFFERSIZE, f);
            /* A read error must not be mistaken for end-of-file: the other
             * ranks would silently end up with a truncated copy. */
            if (ferror(f)) {
                fprintf(stderr, "%s: read error\n", fname);
                MPI_Abort(comm, 1);
            }
        }

        /* Announce the chunk length first; zero means end of file. */
        MPI_Bcast(&n, 1, MPI_INT, root, comm);
        fsz += n;
        if (n == 0)
            break;
        MPI_Bcast(buf, n, MPI_BYTE, root, comm);

        if (rank != root) {
            size_t m = fwrite(buf, 1, n, f);
            if (m != (size_t) n) {
                fprintf(stderr, "%s: short write\n", fname);
                MPI_Abort(comm, 1);
            }
        } else {
            time_t t = time(NULL);
            /* t >= t1 > t0 here, so t - t0 is at least one second. */
            if (t >= t1) {
                printf("%.1f MB in %d s (%.2f MB/s) [%.1f%%]\n",
                        fsz * 1.0e-6,
                        (int) (t-t0), (double) fsz * 1.0e-6 / (t-t0),
                        100.0 * (double) fsz/ total
                      );
                t1 = t + 1;
            }
        }
    }

    /* fclose flushes buffered output, so a failure here means the copy on
     * this rank may be incomplete. */
    if (fclose(f) != 0) {
        fprintf(stderr, "%s: %s\n", fname, strerror(errno));
        MPI_Abort(comm, 1);
    }

    int t = (int) (time(NULL) - t0);
    if (rank == 0) {
        /* Transfers that finish within the same second would otherwise
         * divide by zero; count them as one second for the rate. */
        int secs = t > 0 ? t : 1;
        printf(" broadcasted in %d s (%.2f MB/s)\n",
                t, (double) fsz * 1.0e-6 / secs);
    }
    free(buf);
}
73
main(int argc,char * argv[])74 int main(int argc, char * argv[])
75 {
76 struct stat sbuf[1];
77 #ifdef HAVE_UTSNAME_H
78 struct utsname u[1];
79 char *nodename = u[0].nodename;
80 #else
81 char *nodename = "unknown";
82 #endif
83
84 #ifdef HAVE_UTSNAME_H
85 uname(u);
86 #endif
87 MPI_Init(&argc, &argv);
88
89 MPI_Comm_size(MPI_COMM_WORLD, &size);
90 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
91 printf("node %d/%d on %s\n", rank,size,nodename);
92
93 int duplicate=0;
94 size_t minname=strlen(nodename) + 1;
95 size_t maxname=strlen(nodename) + 1;
96 MPI_Allreduce(MPI_IN_PLACE, &maxname, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
97 MPI_Allreduce(MPI_IN_PLACE, &minname, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
98 ASSERT_ALWAYS(minname == maxname);
99 char * allnames = malloc(size * maxname);
100 MPI_Allgather(nodename, maxname, MPI_BYTE, allnames, maxname, MPI_BYTE, MPI_COMM_WORLD);
101 for(int i = 0 ; i < rank ; i++) {
102 if (memcmp(allnames + i * maxname, nodename, maxname) == 0) {
103 fprintf(stderr, "%s on node %d/%d duplicates node %d/%d\n",
104 nodename, rank, size, i, size);
105 duplicate=1;
106 }
107 }
108
109 MPI_Comm comm;
110 MPI_Comm_split(MPI_COMM_WORLD, duplicate, rank, &comm);
111
112 MPI_Comm_size(comm, &size);
113 MPI_Comm_rank(comm, &rank);
114 if (!duplicate)
115 printf("reduced node %d/%d on %s\n", rank,size,nodename);
116
117 for(int i = 1 ; i < argc && !duplicate; i++) {
118 int rc;
119 memset(sbuf, 0, sizeof(struct stat));
120 rc = stat(argv[i], sbuf);
121 if (rc < 0 && errno != ENOENT)
122 abort();
123 int ok = rc == 0;
124 size_t * allsizes = malloc(size * sizeof(size_t));
125 size_t me = sbuf->st_size;
126 MPI_Allgather(&me, sizeof(size_t), MPI_BYTE,
127 allsizes, sizeof(size_t), MPI_BYTE, comm);
128 size_t szmax = 0;
129 for(int k = 0 ; k < size ; k++) {
130 ok = ok && allsizes[k] == me;
131 if (allsizes[k] > szmax)
132 szmax = allsizes[k];
133 }
134 MPI_Allreduce(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_SUM, comm);
135 if (ok) {
136 if (rank == 0)
137 printf("%s ok everywhere (%zu MB)\n", argv[i], szmax >> 20);
138 continue;
139 } else {
140 if (rc == 0 && me < szmax) {
141 printf("node %d/%d, %s is only %zu < %zu. Removed\n",
142 rank,size,argv[i],me,szmax);
143 unlink(argv[i]);
144 rc = -1;
145 }
146 ok = rc == 0;
147 MPI_Allreduce(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_SUM, comm);
148 if (ok != 1) {
149 if (rank == 0)
150 fprintf(stderr, "warning: %d<%d full files found for %s\n", ok, size, argv[i]);
151 // MPI_Abort(comm,1);
152 }
153 }
154 int node=0;
155 if (rc == 0) node = rank;
156 MPI_Allreduce(MPI_IN_PLACE, &node, 1, MPI_INT, MPI_MAX, comm);
157 if (rank == 0) {
158 printf("%s (%zu MB, node %d)\n", argv[i], szmax >> 20, node);
159 fflush(stdout);
160 }
161 share_file(argv[i], node, szmax, comm);
162 }
163
164 MPI_Barrier(MPI_COMM_WORLD);
165 MPI_Comm_free(&comm);
166
167 MPI_Finalize();
168 }
169
170