1 /* This is (almost) standalone */
2 #define _POSIX_C_SOURCE 200112L
3 
4 #include "cado.h" // IWYU pragma: keep
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <sys/types.h>
8 #include <sys/stat.h>
9 #ifdef HAVE_UTSNAME_H
10 #include <sys/utsname.h>
11 #endif
12 #include <errno.h>
13 #include <unistd.h>
14 #include <time.h>
15 #include <string.h>
16 #ifndef STANDALONE
17 #include "select_mpi.h"
18 #else
19 #include <mpi.h>
20 #include "macros.h"
21 #define ASSERT_ALWAYS(x) do { if (!(x)) abort(); } while (0)
22 #endif
23 
/* Rank of this process and number of processes. main() first fills both
 * from MPI_COMM_WORLD and then overwrites them with the values for the
 * communicator obtained from MPI_Comm_split(); share_file() reads rank. */
int rank;
int size;

/* Size in bytes (1 << 28, i.e. 256 MiB) of the buffer share_file() uses
 * for each read / broadcast / write round. */
#define BUFFERSIZE      (1 << 28)
28 
/* Print `what` together with the current errno text, then take down every
 * rank of `comm`. */
static void share_file_die(const char * what, MPI_Comm comm)
{
    perror(what);
    MPI_Abort(comm, 1);
    abort();    /* fallback in case MPI_Abort returns */
}

/*
 * Broadcast the contents of the file `fname` from rank `root` to all other
 * ranks of `comm`.
 *
 * The root opens the file for reading; every other rank opens (and thereby
 * truncates or creates) it for writing. The data travels in chunks of at
 * most BUFFERSIZE bytes: the chunk length is broadcast first, then the
 * payload, and a length of 0 terminates the transfer.
 *
 *   fname  path of the file, identical on all ranks
 *   root   rank (within comm) that owns the reference copy
 *   total  expected file size in bytes; only used for the percentage shown
 *          in the progress reports
 *   comm   communicator over which the file is shared
 *
 * Uses the file-scope variable `rank` as the caller's rank in `comm`.
 * Any open, allocation, read or write failure aborts the whole job.
 */
void share_file(const char * fname, int root, size_t total, MPI_Comm comm)
{
    FILE * f = fopen(fname, rank == root ? "rb" : "wb");
    if (f == NULL)
        share_file_die(fname, comm);

    char * buf = malloc(BUFFERSIZE);
    if (buf == NULL)
        share_file_die("malloc", comm);

    size_t fsz = 0;
    time_t t0 = time(NULL);
    time_t t1 = t0 + 1;     /* earliest time of the next progress report */

    for( ; ; ) {
        int n = 0;
        if (rank == root) {
            /* BUFFERSIZE fits in an int, so the cast cannot truncate. */
            n = (int) fread(buf, 1, BUFFERSIZE, f);
            /* Do not mistake a read error for end of file: that would
             * silently leave a truncated copy on every node. */
            if (ferror(f))
                share_file_die(fname, comm);
        }

        MPI_Bcast(&n, 1, MPI_INT, root, comm);
        fsz += n;
        if (n == 0)
            break;
        MPI_Bcast(buf, n, MPI_BYTE, root, comm);

        if (rank != root) {
            if (fwrite(buf, 1, n, f) != (size_t) n)
                share_file_die(fname, comm);
        } else {
            /* Progress report, at most once per second. Here t >= t0 + 1,
             * so the division below is safe. */
            time_t t = time(NULL);
            if (t >= t1) {
                printf("%.1f MB in %d s (%.2f MB/s) [%.1f%%]\n",
                        fsz * 1.0e-6,
                        (int) (t-t0), (double) fsz * 1.0e-6 / (t-t0),
                        100.0 * (double) fsz/ total
                        );
                t1 = t + 1;
            }
        }
    }

    /* On the receiving side fclose() flushes the last buffered data, so a
     * failure there means the local copy is incomplete. */
    if (fclose(f) != 0 && rank != root)
        share_file_die(fname, comm);

    int t = time(NULL) - t0;
    if (rank == 0) {
        /* A transfer shorter than one second gives t == 0; use one second
         * as the denominator rather than printing inf or nan. */
        int denom = t > 0 ? t : 1;
        printf(" broadcasted in %d s (%.2f MB/s)\n",
                t, (double) fsz * 1.0e-6 / denom);
    }
    free(buf);
}
73 
main(int argc,char * argv[])74 int main(int argc, char * argv[])
75 {
76     struct stat sbuf[1];
77 #ifdef HAVE_UTSNAME_H
78     struct utsname u[1];
79     char *nodename = u[0].nodename;
80 #else
81     char *nodename = "unknown";
82 #endif
83 
84 #ifdef HAVE_UTSNAME_H
85     uname(u);
86 #endif
87     MPI_Init(&argc, &argv);
88 
89     MPI_Comm_size(MPI_COMM_WORLD, &size);
90     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
91     printf("node %d/%d on %s\n", rank,size,nodename);
92 
93     int duplicate=0;
94     size_t minname=strlen(nodename) + 1;
95     size_t maxname=strlen(nodename) + 1;
96     MPI_Allreduce(MPI_IN_PLACE, &maxname, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
97     MPI_Allreduce(MPI_IN_PLACE, &minname, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
98     ASSERT_ALWAYS(minname == maxname);
99     char * allnames = malloc(size * maxname);
100     MPI_Allgather(nodename, maxname, MPI_BYTE, allnames, maxname, MPI_BYTE, MPI_COMM_WORLD);
101     for(int i = 0 ; i < rank ; i++) {
102         if (memcmp(allnames + i * maxname, nodename, maxname) == 0) {
103             fprintf(stderr, "%s on node %d/%d duplicates node %d/%d\n",
104                     nodename, rank, size, i, size);
105             duplicate=1;
106         }
107     }
108 
109     MPI_Comm comm;
110     MPI_Comm_split(MPI_COMM_WORLD, duplicate, rank, &comm);
111 
112     MPI_Comm_size(comm, &size);
113     MPI_Comm_rank(comm, &rank);
114     if (!duplicate)
115     printf("reduced node %d/%d on %s\n", rank,size,nodename);
116 
117     for(int i = 1 ; i < argc && !duplicate; i++) {
118         int rc;
119         memset(sbuf, 0, sizeof(struct stat));
120         rc = stat(argv[i], sbuf);
121         if (rc < 0 && errno != ENOENT)
122             abort();
123         int ok = rc == 0;
124         size_t * allsizes = malloc(size * sizeof(size_t));
125         size_t me = sbuf->st_size;
126         MPI_Allgather(&me, sizeof(size_t), MPI_BYTE,
127                 allsizes, sizeof(size_t), MPI_BYTE, comm);
128         size_t szmax = 0;
129         for(int k = 0 ; k < size ; k++) {
130             ok = ok && allsizes[k] == me;
131             if (allsizes[k] > szmax)
132                 szmax = allsizes[k];
133         }
134         MPI_Allreduce(MPI_IN_PLACE, &ok,    1, MPI_INT, MPI_SUM, comm);
135         if (ok) {
136             if (rank == 0)
137                 printf("%s ok everywhere (%zu MB)\n", argv[i], szmax >> 20);
138             continue;
139         } else {
140             if (rc == 0 && me < szmax) {
141                 printf("node %d/%d, %s is only %zu < %zu. Removed\n",
142                         rank,size,argv[i],me,szmax);
143                 unlink(argv[i]);
144                 rc = -1;
145             }
146             ok = rc == 0;
147             MPI_Allreduce(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_SUM, comm);
148             if (ok != 1) {
149                 if (rank == 0)
150                     fprintf(stderr, "warning: %d<%d full files found for %s\n", ok, size, argv[i]);
151                 // MPI_Abort(comm,1);
152             }
153         }
154         int node=0;
155         if (rc == 0) node = rank;
156         MPI_Allreduce(MPI_IN_PLACE, &node, 1, MPI_INT, MPI_MAX, comm);
157         if (rank == 0) {
158             printf("%s (%zu MB, node %d)\n", argv[i], szmax >> 20, node);
159             fflush(stdout);
160         }
161         share_file(argv[i], node, szmax, comm);
162     }
163 
164     MPI_Barrier(MPI_COMM_WORLD);
165     MPI_Comm_free(&comm);
166 
167     MPI_Finalize();
168 }
169 
170