/*

 bridging.c: Implementation of the bridging centrality metric
 See also: http://www.ncbi.nlm.nih.gov/pmc/articles/PMC2889704/

 AUTHOR: Simon Jacobs <sdjacobs@uchicago.edu>
 LICENSE: GPLv2

 Bridging can be parallelized with the following method, given a graph G:
 for each edge e in parallel:
  compute cohesiveness of G\e
 for each vertex v in parallel:
  compute average cohesiveness of E(v)
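
 Equivalently, for a vertex v with incident edge set E(v),

   bridging(v) = (1/|E(v)|) * sum over e in E(v) of [ C(G) - C(G\e) ]

 where C(G) is the cohesiveness value computed by closeness() below: the sum of
 1/d(i,j) over distinct vertex pairs, normalized by n*(n-1).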

 OpenMP parallelization is available. Compile with OpenMP flags (usually "-fopenmp").
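
 In the single-process case (serial or OpenMP), the same entry point can be called
 directly from R. A minimal sketch, assuming the MPI flag and rank arguments shown
 in the MPI example below are simply set to FALSE and 0:

el <- get.edgelist(g, names=F)
el_i <- as.integer(t(el))
n <- as.integer(max(el_i))
m <- as.integer(length(el_i)/2)
# assumption: FALSE/0 select the non-MPI code path
bridging_values <- .Call("snap_bridging_R", el_i, n, m, as.integer(FALSE), as.integer(0), PACKAGE="influenceR")
names(bridging_values) <- V(g)$name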

 Parallelization with MPI is experimental and will not compile by default. In
 order to compile with MPI:
  * an MPI implementation, such as OpenMPI, must be installed on your system
  * compile with the -DUSE_MPI flag, and use the mpicc compiler.

 Then, to use the MPI function, start R with "mpirun -n 1 R", load the Rmpi and snow
 packages, and run the following code (assuming an igraph object `g'):

el <- get.edgelist(g, names=F)
el_i <- as.integer(t(el))
n <- as.integer(max(el_i))
m <- as.integer(length(el_i)/2)
np <- mpi.universe.size() - 1
cl <- makeMPIcluster(np)
x <- clusterApply(cl, 0:(np-1), function(rank, el_i, n, m) {
  library(influenceR)
  .Call("snap_bridging_R", el_i, n, m, as.integer(TRUE), as.integer(rank), PACKAGE="influenceR")
}, el_i, n, m) # ensure these values are exported.
stopCluster(cl)
mpi.exit()
bridging_values <- x[[1]]
names(bridging_values) <- V(g)$name


*/

#include "graph_defs.h"

#include <R.h>
#include <Rinternals.h>

#ifdef USE_MPI
#include <mpi.h>
#endif

double bridging_vertex_precomp(graph_t *G, long v, double cls, double *closeness);
double *main_bridging(graph_t *G, int *edgelist, double *scores);
double closeness(graph_t *G, long ignore_edge0, long ignore_edge1);
long BFS_parallel_frontier_expansion_bridging(graph_t* G, long src, long diameter, double *distance, long ignore_edge0, long ignore_edge1);


double *bridging(graph_t *G, int *edgelist, double *scores)
{

  int n = G->n; /* number of nodes */
  int m = G->m; /* number of edge entries in endV (2x the number of undirected edges) */


  long u, v, j, k;

  /* 1) Compute the closeness of G with each edge removed */

  double *closeness_by_edge = (double *) R_alloc(m, sizeof(double));

#ifdef _OPENMP
#pragma omp parallel for private(u, v, j, k)
#endif
  for (int i = 0; i < m/2; i++) {

    u = edgelist[i*2] - 1;
    v = edgelist[i*2+1] - 1;

    /* Find the positions j and k of the two directed copies of edge (u,v)
       in the CSR adjacency arrays (numEdges/endV) */
    for (j=G->numEdges[u]; v != G->endV[j] && j<G->numEdges[u+1]; j++);
    for (k=G->numEdges[v]; u != G->endV[k] && k<G->numEdges[v+1]; k++);
    assert(j != G->numEdges[u+1]);
    assert(k != G->numEdges[v+1]);

    /* Closeness of the graph with this edge removed, recorded under both copies */
    double c = closeness(G, j, k);
    closeness_by_edge[j] = c;
    closeness_by_edge[k] = c;
  }

  /* 2) Compute the bridging score for each vertex */

  double cls = closeness(G, -1, -1); // normal closeness (use all edges)

#ifdef _OPENMP
#pragma omp parallel for private(v)
#endif
  for (v = 0; v < n; v++)
    scores[v] = bridging_vertex_precomp(G, v, cls, closeness_by_edge);

  return scores;
}

#ifdef USE_MPI
double *bridging_MPI(graph_t *G, int *edgelist, double *scores)
{

  // Get the number of processes
  int size, rank;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  #ifdef VERBOSE
  REprintf("hello from bridging_MPI, process %d\n", rank);
  #endif

  int n = G->n; /* number of nodes */
  int m = G->m; /* number of edge entries in endV (2x the number of undirected edges) */

  /* 1) Compute the closeness of G with each edge removed; split the edge list across ranks */

  int bufsize = ceil(((double)m) / size), delta = bufsize/2;
  int start = rank * delta, end = start + delta;
  end = end > m/2 ? m/2 : end;
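  /* Each rank handles delta = bufsize/2 rows of the edge list; every row yields
     two entries (one per directed copy), i.e. up to bufsize values per rank. */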

#ifdef VERBOSE
  REprintf("%d range: %d-%d\n", rank, start, end);
#endif

  double *buf = (double *) R_alloc(bufsize, sizeof(double));
  int *edgeidx = (int *) R_alloc(bufsize, sizeof(int));

  assert(buf);
  assert(edgeidx);

  int i=0, u, v;
  long j, k;


  for (int ii = start; ii < end; ii++) {
    u = edgelist[ii*2] - 1;
    v = edgelist[ii*2+1] - 1;

    /* Find the positions j and k of the two directed copies of edge (u,v) */
    for (j=G->numEdges[u]; v != G->endV[j] && j<G->numEdges[u+1]; j++);
    for (k=G->numEdges[v]; u != G->endV[k] && k<G->numEdges[v+1]; k++);
    assert(j != G->numEdges[u+1]);
    assert(k != G->numEdges[v+1]);

    /* Closeness with this edge removed, stored (with its edge index) for both copies */
    buf[i] = closeness(G, j, k);
    edgeidx[i] = j;
    buf[i+1] = buf[i];
    edgeidx[i+1] = k;
    i+=2;

    //fprintf(stderr, "%d: CBE %d %d %g\n", rank, j, k, buf[i]);
  }

#ifdef VERBOSE
  REprintf("Rank %d done reading edges\n", rank);
#endif


  double *closeness_buf = NULL;
  int *edge_indices = NULL;
  if (rank == 0) {
    closeness_buf = (double *) R_alloc(bufsize*size, sizeof(double));
    edge_indices = (int *) R_alloc(bufsize*size, sizeof(int));
  }
  MPI_Barrier(MPI_COMM_WORLD);

  MPI_Gather(buf, bufsize, MPI_DOUBLE, closeness_buf, bufsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  MPI_Gather(edgeidx, bufsize, MPI_INT, edge_indices, bufsize, MPI_INT, 0, MPI_COMM_WORLD);
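  /* Rank 0 now holds bufsize slots from each rank, concatenated in rank order:
     closeness_buf[p*bufsize + t] is the value rank p computed for the edge whose
     endV index is edge_indices[p*bufsize + t]. */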

  double *closeness_by_edge = (double *) R_alloc(m, sizeof(double));
  /* Fill the full per-edge closeness array on rank 0 */

  if (rank == 0) {
    for (int i = 0; i < m; i++) {
      closeness_by_edge[edge_indices[i]] = closeness_buf[i];
#ifdef VERBOSE
      REprintf("CBE %d %g\n", edge_indices[i], closeness_buf[i]);
#endif
    }

    //free(closeness_buf);
    //free(edge_indices);
  }

  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Bcast(closeness_by_edge, m, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  //free(buf);
  //free(edgeidx);


  /* 2) Compute the bridging score for each node. Parallelization here may be more
     trouble than it's worth, but the MPI resources are already available. */
  delta = ceil(((double)n) / size);
  start = rank * delta, end = start + delta;
  end = end > n ? n : end;

  double cls = closeness(G, -1, -1); // normal closeness (use all edges)

  buf = (double *) R_alloc(delta, sizeof(double));

  for (int v = start; v < end; v++)
    buf[v-start] = bridging_vertex_precomp(G, v, cls, closeness_by_edge);

  MPI_Gather(buf, delta, MPI_DOUBLE, scores, delta, MPI_DOUBLE, 0, MPI_COMM_WORLD);
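  /* Only the root (rank 0) receives the gathered scores; on the other ranks the
     contents of scores are left unchanged. */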


  MPI_Barrier(MPI_COMM_WORLD);


  return scores;
}

#endif

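/* Bridging score of vertex v: the average, over edges incident to v, of the drop
   in closeness caused by removing that edge. cls is the closeness of the full
   graph; closeness[] holds the precomputed per-edge values. */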
double bridging_vertex_precomp(graph_t *G, long v, double cls, double *closeness) {

  int degree = 0;
  double sum = 0;

  for (long j=G->numEdges[v]; j<G->numEdges[v+1]; j++) {
    double cls_ = closeness[j];
    sum += cls - cls_;
    degree++;
  }

  if (degree == 0)
    return 0;

  return sum/((double) degree);
}


/* Closeness (cohesiveness) of the graph: the sum of 1/d(i,j) over distinct vertex
   pairs, normalized by n*(n-1), computed with one BFS per source vertex.
   ignore_edge0 and ignore_edge1 are the two directed copies of the single
   undirected edge to skip; pass -1, -1 to use all edges. */
double closeness(graph_t *G, long ignore_edge0, long ignore_edge1)
{
  int n = G->n;

  // Must be thread-safe for OpenMP. R_alloc is not thread-safe.
  double *distance = (double *) malloc(sizeof(double) * n);

  if (distance == NULL) {
    REprintf("Ran out of memory\n");
    return 0; /* avoid dereferencing NULL below */
  }

  double sum = 0;

  for (int i = 0; i < n; i++) {
    /* reset all distances */
    for (int j = 0; j < n; j++)
      distance[j] = INFINITY;

    BFS_parallel_frontier_expansion_bridging(G, i, 75, distance, ignore_edge0, ignore_edge1);

    for (int j = 0; j < i; j++) { /* sum upper triangular part */
      sum += (1/distance[j]);
    }
  }

  free(distance);
  return sum / (n*n - n);
}


/*
 * OpenMP is disabled inside this BFS because the callers already parallelize over
 * graph edges, so several BFS traversals run at once. To enable it, add the
 * following macro definition:
 *   #define _OPENMP_BRIDGING _OPENMP
 *
 */
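
/* Level-synchronous BFS from src: fills distance[] with hop counts for every
   vertex reachable from src, skipping the adjacency entries ignore_edge0 and
   ignore_edge1 (the two directed copies of the edge being removed). Returns the
   number of vertices visited. */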
long BFS_parallel_frontier_expansion_bridging(graph_t* G, long src, long diameter, double *distance, long ignore_edge0, long ignore_edge1) {

    attr_id_t* S;
    long *start = NULL;
    char* visited;
    long *pSCount;
#ifdef DIAGNOSTIC
    double elapsed_time;
#endif
#ifdef _OPENMP_BRIDGING
    omp_lock_t* vLock;
#endif

    long phase_num = 0, numPhases;
    long count;


#ifdef _OPENMP_BRIDGING

OMP("omp parallel")
    {
#endif

        attr_id_t *pS, *pSt;
        long pCount, pS_size;
        long v, w;
        int tid, nthreads;
        long start_iter, end_iter;
        long j, k, vert, n;
#ifdef _OPENMP_BRIDGING
        int myLock;
#endif

#ifdef _OPENMP_BRIDGING
        long i;
        tid = omp_get_thread_num();
        nthreads = omp_get_num_threads();
#else
        tid = 0;
        nthreads = 1;
#endif


#ifdef DIAGNOSTIC
        if (tid == 0)
            elapsed_time = get_seconds();
#endif

        if (tid == 0)
            numPhases = diameter + 1;
        n = G->n;

        pS_size = n/nthreads + 1;
        pS = (attr_id_t *) malloc(pS_size*sizeof(attr_id_t));
        assert(pS != NULL);

        if (tid == 0) {
            S = (attr_id_t *) malloc(n*sizeof(attr_id_t));
            visited = (char *) calloc(n, sizeof(char));
            start = (long *) calloc((numPhases+2), sizeof(long));
            pSCount = (long *) malloc((nthreads+1)*sizeof(long));
#ifdef _OPENMP_BRIDGING
            vLock = (omp_lock_t *) malloc(n*sizeof(omp_lock_t));
#endif
        }

#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
OMP("omp for")
        for (i=0; i<n; i++) {
            omp_init_lock(&vLock[i]);
        }
#endif

#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif

        if (tid == 0) {
            S[0] = src;
            visited[src] = (char) 1;
            count = 1;
            phase_num = 0;
            start[0] = 0;
            start[1] = 1;
            distance[src] = 0;
        }


#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif

        while (start[phase_num+1] - start[phase_num] > 0) {

            pCount = 0;

            start_iter = start[phase_num];
            end_iter = start[phase_num+1];
#ifdef _OPENMP_BRIDGING
OMP("omp for")
#endif
            for (vert=start_iter; vert<end_iter; vert++) {

                v = S[vert];

                for (j=G->numEdges[v]; j<G->numEdges[v+1]; j++) {

                    if(j == ignore_edge0 || j == ignore_edge1) {
                        continue;
                    }

                    w = G->endV[j];
                    if (v == w)
                        continue;
#ifdef _OPENMP_BRIDGING
                    myLock = omp_test_lock(&vLock[w]);
                    if (myLock) {
#endif
                        if (visited[w] != (char) 1) {
                            distance[w] = distance[v] + 1;
                            visited[w] = (char) 1;
                            if (pCount == pS_size) {
                                /* Resize pS */
                                pSt = (attr_id_t *)
                                    malloc(2*pS_size*sizeof(attr_id_t));
                                memcpy(pSt, pS, pS_size*sizeof(attr_id_t));
                                free(pS);
                                pS = pSt;
                                pS_size = 2*pS_size;
                            }
                            pS[pCount++] = w;
                        }
#ifdef _OPENMP_BRIDGING
                        omp_unset_lock(&vLock[w]);
                    }
#endif
                }
            }


#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif
            pSCount[tid+1] = pCount;

#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif

            if (tid == 0) {
                pSCount[0] = start[phase_num+1];
                for(k=1; k<=nthreads; k++) {
                    pSCount[k] = pSCount[k-1] + pSCount[k];
                }
                start[phase_num+2] = pSCount[nthreads];
                count = pSCount[nthreads];
                phase_num++;
            }

#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif
            for (k = pSCount[tid]; k < pSCount[tid+1]; k++) {
                S[k] = pS[k-pSCount[tid]];
            }


#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
#endif
        } /* End of search */

#ifdef DIAGNOSTIC
        if (tid == 0) {
            REprintf("Search from vertex %ld,"
                    " No. of vertices visited: %ld\n", src, count);
        }
#endif

        free(pS);
#ifdef _OPENMP_BRIDGING
OMP("omp barrier")
OMP("omp for")
        for (i=0; i<n; i++) {
            omp_destroy_lock(&vLock[i]);
        }
OMP("omp barrier")
#endif

        if (tid == 0) {
            free(S);
            free(start);
            free(visited);
            free(pSCount);
#ifdef _OPENMP_BRIDGING
            free(vLock);
#endif

        }

#ifdef _OPENMP_BRIDGING
    }
#endif

#ifdef DIAGNOSTIC
    elapsed_time = get_seconds() - elapsed_time;
    REprintf("Time taken for BFS: %lf seconds\n", elapsed_time);
#endif
    return count;
}
