1 /*
2
3 bridging.c: Implementation of the bridging centrality metric
4 See also: http://www.ncbi.nlm.nih.gov/pmc/articles/PMC2889704/
5
6 AUTHOR: Simon Jacobs <sdjacobs@uchicago.edu>
7 LICENSE: GPLv2
8
9 Bridging can be parallelized with the following method, given a graph G:
10 for each edge e in parallel:
11 compute cohesiveness of G\e
12 for each vertex v in parallel:
13 compute average cohesiveness of E(v)
14
15 OpenMP parallelization is available. Compile with OpenMP flags (usually "-fopenmp").
16
17 Parallelization with MPI is experimental and will not compile by default. In
18 order to compile with MPI:
19 * an MPI implementation, such as OpenMPI, must be installed on your system
20 * compile with the -DUSE_MPI flag, and use the mpicc compiler.
21
Then, use the MPI function by starting R with "mpirun -n 1 R", loading the Rmpi and snow
packages, and using the following code (assuming an igraph object `g'):
24
25 el <- get.edgelist(g, names=F)
26 el_i <- as.integer(t(el))
27 n <- as.integer(max(el_i))
28 m <- as.integer(length(el_i)/2)
29 np <- mpi.universe.size() - 1
30 cl <- makeMPIcluster(np)
31 x <- clusterApply(cl, 0:(np-1), function(rank, el_i, n, m) {
32 library(influenceR)
33 .Call("snap_bridging_R", el_i, n, m, as.integer(TRUE), as.integer(rank), PACKAGE="influenceR")
34 }, el_i, n, m) # ensure these values are exported.
35 stopCluster(cl)
36 mpi.exit()
bridging_values <- x[[1]]  # rank 0 holds the complete score vector
38 names(bridging_values) <- V(g)$name
39
40
41 */
42
#include "graph_defs.h"

#include <math.h>
#include <string.h>

#include <R.h>
#include <Rinternals.h>

#ifdef USE_MPI
#include <mpi.h>
#endif
51
52 double bridging_vertex_precomp(graph_t *G, long v, double cls, double *closeness);
53 double *main_bridging(graph_t *G, int *edgelist, double *scores);
54 double closeness(graph_t *G, long ignore_edge0, long ignore_edge1);
55 long BFS_parallel_frontier_expansion_bridging(graph_t* G, long src, long diameter, double *distance, long ignore_edge0, long ignore_edge1 );
56
57
bridging(graph_t * G,int * edgelist,double * scores)58 double *bridging(graph_t *G, int *edgelist, double *scores)
59 {
60
61 int n = G->n; /* number of nodes */
62 int m = G->m; /* number of edges */
63
64
65 long u, v, j, k;
66
67 /* 1) compute closeness by edge in file */
68
69 double *closeness_by_edge = (double *) R_alloc(m, sizeof(double));
70
71 #ifdef _OPENMP
72 #pragma omp parallel for private(u, v, j, k)
73 #endif
74 for (int i = 0; i < m/2; i++) {
75
76 u = edgelist[i*2] - 1;
77 v = edgelist[i*2+1] - 1;
78
79 /* Find edge numbers */
80 for (j=G->numEdges[u]; v != G->endV[j] && j<G->numEdges[u+1]; j++);
81 for (k=G->numEdges[v]; u != G->endV[k] && k<G->numEdges[v+1]; k++);
82 assert(j != G->numEdges[u+1]);
83 assert(k != G->numEdges[v+1]);
84
85 /* Calculate closeness */
86 double c = closeness(G, j, k);
87 closeness_by_edge[j] = c;
88 closeness_by_edge[k] = c;
89 }
90
91 /* 2) Compute closeness by vertex */
92
93 double cls = closeness(G, -1, -1); // normal closeness (use all edges)
94
95 #ifdef _OPENMP
96 #pragma omp parallel for private(v)
97 #endif
98 for (v = 0; v < n; v++)
99 scores[v] = bridging_vertex_precomp(G, v, cls, closeness_by_edge);
100
101 return scores;
102 }
103
104 #ifdef USE_MPI
bridging_MPI(graph_t * G,int * edgelist,double * scores)105 double *bridging_MPI(graph_t *G, int *edgelist, double *scores)
106 {
107
108 // Get the number of processes
109 int size, rank;
110 MPI_Comm_size(MPI_COMM_WORLD, &size);
111 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
112
113 #ifdef VERBOSE
114 REprintf("hello from main_brdiging, process %d\n", rank);
115 #endif
116
117 int n = G->n; /* number of nodes */
118 int m = G->m; /* number of edges */
119
120
121 /* 1) compute closeness by edge in file */
122
123 int bufsize = ceil(((double)m) / size), delta = bufsize/2;
124 int start = rank * delta, end = start + delta;
125 end = end > m/2 ? m/2 : end;
126
127 #ifdef VERBOSE
128 REprintf("%d range: %d-%d\n", rank, start, end);
129 #endif
130
131 double *buf = (double *) R_alloc(bufsize, sizeof(double));
132 int *edgeidx = (int *) R_alloc(bufsize, sizeof(int));
133
134 assert(buf);
135 assert(edgeidx);
136
137 int i=0, u, v;
138 long j, k;
139
140
141 for (int ii = start; ii < end; ii++) {
142 u = edgelist[ii*2] - 1;
143 v = edgelist[ii*2+1] - 1;
144
145 /* Find edge numbers */
146 for (j=G->numEdges[u]; v != G->endV[j] && j<G->numEdges[u+1]; j++);
147 for (k=G->numEdges[v]; u != G->endV[k] && k<G->numEdges[v+1]; k++);
148 assert(j != G->numEdges[u+1]);
149 assert(k != G->numEdges[v+1]);
150
151 /* Calculate closeness */
152 buf[i] = closeness(G, j, k);
153 edgeidx[i] = j;
154 buf[i+1] = buf[i];
155 edgeidx[i+1] = k;
156 i+=2;
157
158 //fprintf(stderr, "%d: CBE %d %d %g\n", rank, j, k, buf[i]);
159 }
160
161 #ifdef VERBOSE
162 REprintf("Rank %d done reading edges\n", rank);
163 #endif
164
165
166 double *closeness_buf = NULL;
167 int *edge_indices = NULL;
168 if (rank == 0) {
169 closeness_buf = (double *) R_alloc(bufsize*size, sizeof(double));
170 edge_indices = (int *) R_alloc(bufsize*size, sizeof(int));
171 }
172 MPI_Barrier(MPI_COMM_WORLD);
173
174 MPI_Gather(buf, bufsize, MPI_DOUBLE, closeness_buf, bufsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
175 MPI_Gather(edgeidx, bufsize, MPI_INT, edge_indices, bufsize, MPI_INT, 0, MPI_COMM_WORLD);
176
177 double *closeness_by_edge = (double *) R_alloc(m, sizeof(double));
178 /* Fill REAL closeness_by_edge matrix */
179
180 if (rank == 0) {
181 for (int i = 0; i < m; i++) {
182 closeness_by_edge[edge_indices[i]] = closeness_buf[i];
183 #ifdef VERBOSE
184 REprintf("CBE %d %g\n", edge_indices[i], closeness_buf[i]);
185 #endif
186 }
187
188 //free(closeness_buf);
189 //free(edge_indices);
190 }
191
192 MPI_Barrier(MPI_COMM_WORLD);
193 MPI_Bcast(closeness_by_edge, m, MPI_DOUBLE, 0, MPI_COMM_WORLD);
194 //free(buf);
195 //free(edgeidx);
196
197
198 /* 2) compute bridging score by NODE. Parallization here may be more trouble than it's worth. But we already have the resources. */
199 delta = ceil(((double)n) / size);
200 start = rank * delta, end = start + delta;
201 end = end > n ? n : end;
202
203 double cls = closeness(G, -1, -1); // normal closeness (use all edges)
204
205 buf = (double *) R_alloc(delta, sizeof(double));
206
207 for (int v = start; v < end; v++)
208 buf[v-start] = bridging_vertex_precomp(G, v, cls, closeness_by_edge);
209
210 MPI_Gather(buf, delta, MPI_DOUBLE, scores, delta, MPI_DOUBLE, 0, MPI_COMM_WORLD);
211
212
213 MPI_Barrier(MPI_COMM_WORLD);
214
215
216 return scores;
217 }
218
219 #endif
220
bridging_vertex_precomp(graph_t * G,long v,double cls,double * closeness)221 double bridging_vertex_precomp(graph_t *G, long v, double cls, double *closeness) {
222
223 int degree = 0;
224 double sum = 0;
225
226 for (long j=G->numEdges[v]; j<G->numEdges[v+1]; j++) {
227 double cls_ = closeness[j];
228 sum += cls - cls_;
229 degree++;
230 }
231
232 if (degree == 0)
233 return 0;
234
235 return sum/((double) degree);
236 }
237
238
// ignore_edge0 and ignore_edge1 are the two directed arcs of the same undirected edge.
closeness(graph_t * G,long ignore_edge0,long ignore_edge1)240 double closeness(graph_t *G, long ignore_edge0, long ignore_edge1)
241 {
242 int n = G->n;
243
244 // Must be thread-safe for OpenMP. R_alloc is not thread-safe.
245 double *distance = (double *) malloc(sizeof(double) * n);
246
247 if (distance == NULL) {
248 REprintf("Ran out of memory");
249 }
250
251 double sum = 0;
252
253 for (int i = 0; i < n; i++) {
254 /* memset */
255 for (int j = 0; j < n; j++)
256 distance[j] = INFINITY;
257
258 BFS_parallel_frontier_expansion_bridging(G, i, 75, distance, ignore_edge0, ignore_edge1);
259
260 for (int j = 0; j < i; j++) { /* sum upper triangular part */
261 sum += (1/distance[j]);
262 }
263 }
264
265 free(distance);
266 return sum / (n*n - n);
267 }
268
269
270 /*
271 * OpenMP is disabled because we're parallelizing over graph edges and computing multiple BFS at once.
272 * To enable, set the following macro directive:
273 * DEFINE _OPENMP_BRIDGING _OPENMP
274 *
275 */
BFS_parallel_frontier_expansion_bridging(graph_t * G,long src,long diameter,double * distance,long ignore_edge0,long ignore_edge1)276 long BFS_parallel_frontier_expansion_bridging(graph_t* G, long src, long diameter, double *distance, long ignore_edge0, long ignore_edge1 ) {
277
278 attr_id_t* S;
279 long *start = NULL;
280 char* visited;
281 long *pSCount;
282 #ifdef DIAGNOSTIC
283 double elapsed_time;
284 #endif
285 #ifdef _OPENMP_BRIDGING
286 omp_lock_t* vLock;
287 #endif
288
289 long phase_num = 0, numPhases;
290 long count;
291
292
293 #ifdef _OPENMP_BRIDGING
294
295 OMP("omp parallel")
296 {
297 #endif
298
299 attr_id_t *pS, *pSt;
300 long pCount, pS_size;
301 long v, w;
302 int tid, nthreads;
303 long start_iter, end_iter;
304 long j, k, vert, n;
305 #ifdef _OPENMP_BRIDGING
306 int myLock;
307 #endif
308
309 #ifdef _OPENMP_BRIDGING
310 long i;
311 tid = omp_get_thread_num();
312 nthreads = omp_get_num_threads();
313 #else
314 tid = 0;
315 nthreads = 1;
316 #endif
317
318
319 #ifdef DIAGNOSTIC
320 if (tid == 0)
321 elapsed_time = get_seconds();
322 #endif
323
324 if (tid == 0)
325 numPhases = diameter + 1;
326 n = G->n;
327
328 pS_size = n/nthreads + 1;
329 pS = (attr_id_t *) malloc(pS_size*sizeof(attr_id_t));
330 assert(pS != NULL);
331
332 if (tid == 0) {
333 S = (attr_id_t *) malloc(n*sizeof(attr_id_t));
334 visited = (char *) calloc(n, sizeof(char));
335 start = (long *) calloc((numPhases+2), sizeof(long));
336 pSCount = (long *) malloc((nthreads+1)*sizeof(long));
337 #ifdef _OPENMP_BRIDGING
338 vLock = (omp_lock_t *) malloc(n*sizeof(omp_lock_t));
339 #endif
340 }
341
342 #ifdef _OPENMP_BRIDGING
343 OMP("omp barrier")
344 OMP("omp for")
345 for (i=0; i<n; i++) {
346 omp_init_lock(&vLock[i]);
347 }
348 #endif
349
350 #ifdef _OPENMP_BRIDGING
351 OMP("omp barrier")
352 #endif
353
354 if (tid == 0) {
355 S[0] = src;
356 visited[src] = (char) 1;
357 count = 1;
358 phase_num = 0;
359 start[0] = 0;
360 start[1] = 1;
361 distance[src] = 0;
362 }
363
364
365 #ifdef _OPENMP_BRIDGING
366 OMP("omp barrier")
367 #endif
368
369 while (start[phase_num+1] - start[phase_num] > 0) {
370
371 pCount = 0;
372
373 start_iter = start[phase_num];
374 end_iter = start[phase_num+1];
375 #ifdef _OPENMP_BRIDGING
376 OMP("omp for")
377 #endif
378 for (vert=start_iter; vert<end_iter; vert++) {
379
380 v = S[vert];
381
382 for (j=G->numEdges[v]; j<G->numEdges[v+1]; j++) {
383
384 if(j == ignore_edge0 || j == ignore_edge1) {
385 continue;
386 }
387
388 w = G->endV[j];
389 if (v == w)
390 continue;
391 #ifdef _OPENMP_BRIDGING
392 myLock = omp_test_lock(&vLock[w]);
393 if (myLock) {
394 #endif
395 if (visited[w] != (char) 1) {
396 distance[w] = distance[v] + 1;
397 visited[w] = (char) 1;
398 if (pCount == pS_size) {
399 /* Resize pS */
400 pSt = (attr_id_t *)
401 malloc(2*pS_size*sizeof(attr_id_t));
402 memcpy(pSt, pS, pS_size*sizeof(attr_id_t));
403 free(pS);
404 pS = pSt;
405 pS_size = 2*pS_size;
406 }
407 pS[pCount++] = w;
408 }
409 #ifdef _OPENMP_BRIDGING
410 omp_unset_lock(&vLock[w]);
411 }
412 #endif
413 }
414 }
415
416
417 #ifdef _OPENMP_BRIDGING
418 OMP("omp barrier")
419 #endif
420 pSCount[tid+1] = pCount;
421
422 #ifdef _OPENMP_BRIDGING
423 OMP("omp barrier")
424 #endif
425
426 if (tid == 0) {
427 pSCount[0] = start[phase_num+1];
428 for(k=1; k<=nthreads; k++) {
429 pSCount[k] = pSCount[k-1] + pSCount[k];
430 }
431 start[phase_num+2] = pSCount[nthreads];
432 count = pSCount[nthreads];
433 phase_num++;
434 }
435
436 #ifdef _OPENMP_BRIDGING
437 OMP("omp barrier")
438 #endif
439 for (k = pSCount[tid]; k < pSCount[tid+1]; k++) {
440 S[k] = pS[k-pSCount[tid]];
441 }
442
443
444 #ifdef _OPENMP_BRIDGING
445 OMP("omp barrier")
446 #endif
447 } /* End of search */
448
449 #ifdef DIAGNOSTIC
450 if (tid == 0) {
451 REPrintf("Search from vertex %ld,"
452 " No. of vertices visited: %ld\n", src, count);
453 }
454 #endif
455
456 free(pS);
457 #ifdef _OPENMP_BRIDGING
458 OMP("omp barrier")
459 OMP("omp for")
460 for (i=0; i<n; i++) {
461 omp_destroy_lock(&vLock[i]);
462 }
463 OMP("omp barrier")
464 #endif
465
466 if (tid == 0) {
467 free(S);
468 free(start);
469 free(visited);
470 free(pSCount);
471 #ifdef _OPENMP_BRIDGING
472 free(vLock);
473 #endif
474
475 }
476
477 #ifdef _OPENMP_BRIDGING
478 }
479 #endif
480
481 #ifdef DIAGNOSTIC
482 elapsed_time = get_seconds() - elapsed_time;
483 REprintf("Time taken for BFS: %lf seconds\n", elpased_time);
484 #endif
485 return count;
486 }
487
488
489