1!--------------------------------------------------------------------------------------------------!
2!   CP2K: A general program to perform molecular dynamics simulations                              !
3!   Copyright (C) 2000 - 2020  CP2K developers group                                               !
4!--------------------------------------------------------------------------------------------------!
5
6! **************************************************************************************************
7!> \note
8!>      Basic type for real space grid methods
9!> \par History
10!>      JGH (22-May-2002) : New routine rs_grid_zero
11!>      JGH (12-Jun-2002) : Bug fix for mpi groups
12!>      JGH (19-Jun-2003) : Added routine for task distribution
13!>      JGH (23-Nov-2003) : Added routine for task loop separation
14!> \author JGH (18-Mar-2001)
15! **************************************************************************************************
16MODULE realspace_grid_types
17   USE cp_array_utils,                  ONLY: cp_1d_r_p_type
18   USE cp_log_handling,                 ONLY: cp_to_string
19   USE fast,                            ONLY: copy_cr,&
20                                              copy_rc
21   USE kahan_sum,                       ONLY: accurate_sum
22   USE kinds,                           ONLY: dp,&
23                                              int_8
24   USE machine,                         ONLY: m_memory
25   USE mathlib,                         ONLY: det_3x3
26   USE message_passing,                 ONLY: &
27        mp_comm_dup, mp_comm_free, mp_environ, mp_irecv, mp_isend, mp_isendrecv, mp_max, mp_min, &
28        mp_request_null, mp_sum, mp_sync, mp_waitall, mp_waitany
29   USE pw_grid_types,                   ONLY: PW_MODE_LOCAL,&
30                                              pw_grid_type
31   USE pw_grids,                        ONLY: pw_grid_release,&
32                                              pw_grid_retain
33   USE pw_methods,                      ONLY: pw_integrate_function
34   USE pw_types,                        ONLY: COMPLEXDATA3D,&
35                                              REALDATA3D,&
36                                              pw_type
37   USE util,                            ONLY: get_limit
38
39!$ USE OMP_LIB, ONLY: omp_get_max_threads, omp_get_thread_num, omp_get_num_threads
40
41#include "../base/base_uses.f90"
42
43   IMPLICIT NONE
44
45   PRIVATE
46   PUBLIC :: realspace_grid_type, &
47             realspace_grid_desc_type, &
48             realspace_grid_p_type, &
49             realspace_grid_desc_p_type, &
50             realspace_grid_input_type
51
52   PUBLIC :: rs_pw_transfer, &
53             rs_grid_zero, &
54             rs_grid_set_box, &
55             rs_grid_create, &
56             rs_grid_create_descriptor, &
57             rs_grid_retain, &
58             rs_grid_retain_descriptor, &
59             rs_grid_release, &
60             rs_grid_release_descriptor, &
61             rs_grid_reorder_ranks, &
62             rs_grid_print, &
63             rs_grid_locate_rank, &
64             rs_grid_max_ngpts, &
65             rs_grid_mult_and_add
66
67   INTEGER, PARAMETER, PUBLIC               :: rsgrid_distributed = 0, &
68                                               rsgrid_replicated = 1, &
69                                               rsgrid_automatic = 2
70
71   LOGICAL, PRIVATE, PARAMETER :: debug_this_module = .FALSE.
72   CHARACTER(len=*), PARAMETER, PRIVATE :: moduleN = 'realspace_grid_types'
73   INTEGER, SAVE, PRIVATE :: last_rs_id = 0
74   INTEGER, SAVE, PRIVATE :: allocated_rs_grid_count = 0
75   INTEGER, PARAMETER, PUBLIC :: rs2pw = 11, pw2rs = 12
76
77! **************************************************************************************************
78   TYPE realspace_grid_input_type
79      INTEGER       :: distribution_type
80      INTEGER       :: distribution_layout(3)
81      REAL(KIND=dp) :: memory_factor
82      LOGICAL       :: lock_distribution
83      INTEGER       :: nsmax
84      REAL(KIND=dp) :: halo_reduction_factor
85   END TYPE realspace_grid_input_type
86
87! **************************************************************************************************
88   TYPE realspace_grid_desc_type
89      INTEGER :: grid_id ! tag of the pw_grid
90
91      TYPE(pw_grid_type), POINTER   :: pw ! the pw grid
92
93      INTEGER :: ref_count ! reference count
94
95      INTEGER(int_8) :: ngpts ! # grid points
96      INTEGER, DIMENSION(3) :: npts ! # grid points per dimension
97      INTEGER, DIMENSION(3) :: lb ! lower bounds
98      INTEGER, DIMENSION(3) :: ub ! upper bounds
99
100      INTEGER :: border ! border points
101
102      INTEGER, DIMENSION(3) :: perd ! periodicity enforced
103      REAL(KIND=dp), DIMENSION(3, 3) :: dh ! incremental grid matrix
104      REAL(KIND=dp), DIMENSION(3, 3) :: dh_inv ! inverse incremental grid matrix
105      LOGICAL :: orthorhombic ! grid symmetry
106
107      LOGICAL :: parallel ! whether the corresponding pw grid is distributed
108      LOGICAL :: distributed ! whether the rs grid is distributed
109      ! these MPI related quantities are only meaningful depending on how the grid has been laid out
110      ! they are most useful for fully distributed grids, where they reflect the topology of the grid
111      INTEGER :: group
112      INTEGER :: my_pos
113      LOGICAL :: group_head
114      INTEGER :: group_size
115      INTEGER, DIMENSION(3) :: group_dim
116      INTEGER, DIMENSION(3) :: group_coor
117      INTEGER, DIMENSION(3) :: neighbours
118      ! only meaningful on distributed grids
119      ! a list of bounds for each CPU
120      INTEGER, DIMENSION(:, :), POINTER :: lb_global
121      INTEGER, DIMENSION(:, :), POINTER :: ub_global
122      ! a mapping from linear rank to 3d coord
123      INTEGER, DIMENSION(:, :), POINTER :: rank2coord
124      INTEGER, DIMENSION(:, :, :), POINTER :: coord2rank
      ! a mapping from grid index to process coordinate (together with coord2rank this makes it easy to figure out on which rank a given grid point resides)
126      INTEGER, DIMENSION(:), POINTER :: x2coord
127      INTEGER, DIMENSION(:), POINTER :: y2coord
128      INTEGER, DIMENSION(:), POINTER :: z2coord
129
130      INTEGER                :: my_virtual_pos
131      INTEGER, DIMENSION(3) :: virtual_group_coor
132
133      INTEGER, DIMENSION(:), ALLOCATABLE :: virtual2real, real2virtual
134
135   END TYPE realspace_grid_desc_type
136
137   TYPE realspace_grid_type
138
139      TYPE(realspace_grid_desc_type), POINTER :: desc
140
141      INTEGER :: id_nr ! unique identifier of rs
142      INTEGER :: ref_count ! reference count
143
144      INTEGER :: ngpts_local ! local dimensions
145      INTEGER, DIMENSION(3) :: npts_local
146      INTEGER, DIMENSION(3) :: lb_local
147      INTEGER, DIMENSION(3) :: ub_local
148      INTEGER, DIMENSION(3) :: lb_real ! lower bounds of the real local data
149      INTEGER, DIMENSION(3) :: ub_real ! upper bounds of the real local data
150
151      INTEGER, DIMENSION(:), POINTER :: px, py, pz ! index translators
152      REAL(KIND=dp), DIMENSION(:, :, :), POINTER :: r ! the grid
153
154   END TYPE realspace_grid_type
155
156! **************************************************************************************************
157   TYPE realspace_grid_p_type
158      TYPE(realspace_grid_type), POINTER :: rs_grid
159   END TYPE realspace_grid_p_type
160
161   TYPE realspace_grid_desc_p_type
162      TYPE(realspace_grid_desc_type), POINTER :: rs_desc
163   END TYPE realspace_grid_desc_p_type
164
165CONTAINS
166
167! **************************************************************************************************
168!> \brief returns the 1D rank of the task which is a cartesian shift away from 1D rank rank_in
169!>        only possible if rs_grid is a distributed grid
170!> \param rs_desc ...
171!> \param rank_in ...
172!> \param shift ...
173!> \return ...
174! **************************************************************************************************
175   FUNCTION rs_grid_locate_rank(rs_desc, rank_in, shift) RESULT(rank_out)
176      TYPE(realspace_grid_desc_type), POINTER            :: rs_desc
177      INTEGER, INTENT(IN)                                :: rank_in
178      INTEGER, DIMENSION(3), INTENT(IN)                  :: shift
179      INTEGER                                            :: rank_out
180
181      INTEGER                                            :: coord(3)
182
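      ! Illustration (example values, not from any actual run): with group_dim = (/2, 2, 2/)
      ! and rank_in located at coordinate (1, 0, 0), a shift of (/1, 0, 0/) wraps around to
      ! coordinate (0, 0, 0); coord2rank then returns the 1D rank of that task.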
183      coord = MODULO(rs_desc%rank2coord(:, rank_in) + shift, rs_desc%group_dim)
184      rank_out = rs_desc%coord2rank(coord(1), coord(2), coord(3))
185   END FUNCTION rs_grid_locate_rank
186
187! **************************************************************************************************
188!> \brief Determine the setup of real space grids - this is divided up into the
189!>        creation of a descriptor and the actual grid itself (see rs_grid_create)
190!> \param desc ...
191!> \param pw_grid ...
192!> \param input_settings ...
193!> \param border_points ...
194!> \par History
195!>      JGH (08-Jun-2003) : nsmax <= 0 indicates fully replicated grid
196!>      Iain Bethune (05-Sep-2008) : modified cut heuristic
197!>      (c) The Numerical Algorithms Group (NAG) Ltd, 2008 on behalf of the HECToR project
198!>      - Create a descriptor for realspace grids with a number of border
199!>        points as exactly given by the optional argument border_points.
200!>        These grids are always distributed.
201!>        (27.11.2013, Matthias Krack)
202!> \author JGH (18-Mar-2001)
203! **************************************************************************************************
204   SUBROUTINE rs_grid_create_descriptor(desc, pw_grid, input_settings, border_points)
205      TYPE(realspace_grid_desc_type), POINTER            :: desc
206      TYPE(pw_grid_type), POINTER                        :: pw_grid
207      TYPE(realspace_grid_input_type), INTENT(IN)        :: input_settings
208      INTEGER, INTENT(IN), OPTIONAL                      :: border_points
209
210      CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_create_descriptor', &
211         routineP = moduleN//':'//routineN
212
213      INTEGER                                            :: border_size, dir, handle, i, j, k, l, &
214                                                            lb(2), n_slices(3), n_slices_tmp(3), &
215                                                            neighbours(3), nmin
216      LOGICAL                                            :: overlap
217      REAL(KIND=dp)                                      :: ratio, ratio_best, volume, volume_dist
218
219      CALL timeset(routineN, handle)
220
221      CPASSERT(ASSOCIATED(pw_grid))
222
223      IF (PRESENT(border_points)) THEN
224         border_size = border_points
225      ELSE
226         border_size = 0
227      END IF
228
229      ALLOCATE (desc)
230
231      CALL mp_sync(pw_grid%para%group)
232
233      NULLIFY (desc%rank2coord, desc%coord2rank, desc%lb_global, desc%ub_global, desc%x2coord, desc%y2coord, desc%z2coord)
234
235      desc%pw => pw_grid
236      CALL pw_grid_retain(desc%pw)
237
238      desc%dh = pw_grid%dh
239      desc%dh_inv = pw_grid%dh_inv
240      desc%orthorhombic = pw_grid%orthorhombic
241      desc%grid_id = pw_grid%id_nr
242      desc%ref_count = 1
243
244      IF (pw_grid%para%mode == PW_MODE_LOCAL) THEN
245         ! The corresponding group has dimension 1
246         ! All operations will be done locally
247         desc%npts = pw_grid%npts
248         desc%ngpts = PRODUCT(INT(desc%npts, KIND=int_8))
249         desc%lb = pw_grid%bounds(1, :)
250         desc%ub = pw_grid%bounds(2, :)
251         desc%border = border_size
252         IF (border_size == 0) THEN
253            desc%perd = 1
254         ELSE
255            desc%perd = 0
256         END IF
257         desc%parallel = .FALSE.
258         desc%distributed = .FALSE.
259         desc%group = -1
260         desc%group_size = 1
261         desc%group_head = .TRUE.
262         desc%group_dim = 1
263         desc%group_coor = 0
264         desc%my_pos = 0
265      ELSE
         ! group size of the realspace grid
267         ! global grid dimensions are still the same
268         desc%group_size = pw_grid%para%group_size
269         desc%npts = pw_grid%npts
270         desc%ngpts = PRODUCT(INT(desc%npts, KIND=int_8))
271         desc%lb = pw_grid%bounds(1, :)
272         desc%ub = pw_grid%bounds(2, :)
273
274         ! this is the eventual border size
275         IF (border_size == 0) THEN
276            nmin = (input_settings%nsmax + 1)/2
277            nmin = MAX(0, NINT(nmin*input_settings%halo_reduction_factor))
278         ELSE
279            ! Set explicitly the requested border size
280            nmin = border_size
281         END IF
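         ! For illustration (assumed example values): nsmax = 25 and halo_reduction_factor = 1.0
         ! give nmin = (25+1)/2 = 13 border points, while an explicit border_points argument
         ! overrides this choice entirely.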
282
283         IF (input_settings%distribution_type == rsgrid_replicated) THEN
284
285            n_slices = 1
286            IF (border_size > 0) THEN
287               CALL cp_abort(__LOCATION__, &
288                             "An explicit border size > 0 is not yet working for "// &
289                             "replicated realspace grids. Request DISTRIBUTION_TYPE "// &
290                             "distributed for RS_GRID explicitly.")
291            END IF
292
293         ELSE
294
295            n_slices = 1
296            ratio_best = -HUGE(ratio_best)
297
298            ! don't allow distributions with more processors than real grid points
299            DO k = 1, MIN(desc%npts(3), desc%group_size)
300            DO j = 1, MIN(desc%npts(2), desc%group_size)
301               i = MIN(desc%npts(1), desc%group_size/(j*k))
302               n_slices_tmp = (/i, j, k/)
303
               ! skip factorisations that do not match the actual number of CPUs
305               IF (PRODUCT(n_slices_tmp) .NE. desc%group_size) CYCLE
306
               ! check whether an input constraint has been given
               ! i.e. if a layout entry is not -1 we need to fulfil it
309               IF (.NOT. ALL(PACK(n_slices_tmp == input_settings%distribution_layout, &
310                                  (/-1, -1, -1/) /= input_settings%distribution_layout) &
311                             )) CYCLE
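               ! For example (hypothetical input), distribution_layout = (/-1, -1, 4/) constrains
               ! only the Z direction: the PACK keeps just the comparison for Z, so the candidate
               ! layout is accepted only if it uses exactly 4 slices along Z.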
312
               ! we currently cannot work with a grid whose borders overlap with the local data of the grid itself
314               overlap = .FALSE.
315               DO dir = 1, 3
316                  IF (n_slices_tmp(dir) > 1) THEN
317                     neighbours(dir) = HUGE(0)
318                     DO l = 0, n_slices_tmp(dir) - 1
319                        lb = get_limit(desc%npts(dir), n_slices_tmp(dir), l)
320                        neighbours(dir) = MIN(lb(2) - lb(1) + 1, neighbours(dir))
321                     ENDDO
322                     desc%neighbours(dir) = (nmin + neighbours(dir) - 1)/neighbours(dir)
323                     IF (desc%neighbours(dir) .GE. n_slices_tmp(dir)) overlap = .TRUE.
324                  ELSE
325                     neighbours(dir) = 0
326                  ENDIF
327               ENDDO
328               ! XXXXXXX I think the overlap criterion / neighbour calculation is too conservative
329               ! write(*,*) n_slices_tmp,desc % neighbours, overlap
330               IF (overlap) CYCLE
331
               ! a heuristic optimisation to reduce the memory usage
               ! we go for the smallest local to real volume, i.e. we maximise the ratio
               ! volume of the box without the wings / volume of the box with the wings
               ! with prefactors that penalise cuts in X and Y slightly more than cuts in Z
336               ratio = PRODUCT(REAL(desc%npts, KIND=dp)/n_slices_tmp)/ &
337                       PRODUCT(REAL(desc%npts, KIND=dp)/n_slices_tmp + &
338                               MERGE((/0.0, 0.0, 0.0/), 2*(/1.06*nmin, 1.05*nmin, 1.03*nmin/), n_slices_tmp == (/1, 1, 1/)))
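               ! Worked example (assumed values): npts = (/100, 100, 100/), nmin = 13 and
               ! n_slices_tmp = (/1, 1, 4/) give ratio = (100*100*25)/(100*100*(25 + 2*1.03*13)),
               ! roughly 0.48; for these values a layout cutting more directions, e.g. (/1, 2, 2/),
               ! scores lower (about 0.42) and is therefore not selected.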
339               IF (ratio > ratio_best) THEN
340                  ratio_best = ratio
341                  n_slices = n_slices_tmp
342               ENDIF
343
344            ENDDO
345            ENDDO
346
347            ! if automatic we can still decide this is a replicated grid
            ! if the memory gain (or the gain in messages) is too small.
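            ! For illustration (assumed values): npts = (/100, 100, 100/), nmin = 13 and
            ! n_slices = (/1, 1, 4/) give volume_dist = 100*100*(25 + 26) = 510000; with
            ! memory_factor = 2.0 the replicated volume of 10^6 is smaller than 2.0*510000,
            ! so the grid falls back to a replicated layout.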
349            IF (input_settings%distribution_type == rsgrid_automatic) THEN
350               volume = PRODUCT(REAL(desc%npts, KIND=dp))
351               volume_dist = PRODUCT(REAL(desc%npts, KIND=dp)/n_slices + &
352                                     MERGE((/0, 0, 0/), 2*(/nmin, nmin, nmin/), n_slices == (/1, 1, 1/)))
353               IF (volume < volume_dist*input_settings%memory_factor) THEN
354                  n_slices = 1
355               END IF
356            END IF
357
358         END IF
359
360         desc%group_dim(:) = n_slices(:)
361         CALL mp_comm_dup(pw_grid%para%group, desc%group)
362         CALL mp_environ(desc%group_size, desc%my_pos, desc%group)
363
364         IF (ALL(n_slices == 1)) THEN
365            ! CASE 1 : only one slice: we do not need overlapping regions and special
366            !          recombination of the total density
367            desc%border = border_size
368            IF (border_size == 0) THEN
369               desc%perd = 1
370            ELSE
371               desc%perd = 0
372            END IF
373            desc%distributed = .FALSE.
374            desc%parallel = .TRUE.
375            desc%group_head = pw_grid%para%group_head
376            desc%group_coor(:) = 0
377            desc%my_virtual_pos = 0
378
379            ALLOCATE (desc%virtual2real(0:desc%group_size - 1))
380            ALLOCATE (desc%real2virtual(0:desc%group_size - 1))
381            ! Start with no reordering
382            DO i = 0, desc%group_size - 1
383               desc%virtual2real(i) = i
384               desc%real2virtual(i) = i
385            END DO
386         ELSE
387            ! CASE 2 : general case
            ! periodicity is no longer enforced along the distributed directions
            ! (and in no direction if an explicit border is requested)
389            IF (border_size == 0) THEN
390               desc%perd = 1
391               DO dir = 1, 3
392                  IF (n_slices(dir) > 1) desc%perd(dir) = 0
393               END DO
394            ELSE
395               desc%perd(:) = 0
396            END IF
397            ! we keep a border of nmin points
398            desc%border = nmin
399            ! we are going parallel on the real space grid
400            desc%parallel = .TRUE.
401            desc%distributed = .TRUE.
402            desc%group_head = (desc%my_pos == 0)
403
404            ! set up global info about the distribution
405            ALLOCATE (desc%rank2coord(3, 0:desc%group_size - 1))
406            ALLOCATE (desc%coord2rank(0:desc%group_dim(1) - 1, 0:desc%group_dim(2) - 1, 0:desc%group_dim(3) - 1))
407            ALLOCATE (desc%lb_global(3, 0:desc%group_size - 1))
408            ALLOCATE (desc%ub_global(3, 0:desc%group_size - 1))
409            ALLOCATE (desc%x2coord(desc%lb(1):desc%ub(1)))
410            ALLOCATE (desc%y2coord(desc%lb(2):desc%ub(2)))
411            ALLOCATE (desc%z2coord(desc%lb(3):desc%ub(3)))
412
413            DO i = 0, desc%group_size - 1
414               ! Calculate coordinates in a row-major order (to be SMP-friendly)
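               ! For example (illustrative values): with group_dim = (/2, 3, 4/) the rank
               ! i = 13 maps to coordinates (13/12, MODULO(13,12)/4, MODULO(13,4)) = (1, 0, 1),
               ! and coord2rank inverts this as 1*12 + 0*4 + 1 = 13.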
415               desc%rank2coord(1, i) = i/(desc%group_dim(2)*desc%group_dim(3))
416               desc%rank2coord(2, i) = MODULO(i, desc%group_dim(2)*desc%group_dim(3)) &
417                                       /desc%group_dim(3)
418               desc%rank2coord(3, i) = MODULO(i, desc%group_dim(3))
419
420               IF (i == desc%my_pos) THEN
421                  desc%group_coor = desc%rank2coord(:, i)
422               END IF
423
424               desc%coord2rank(desc%rank2coord(1, i), desc%rank2coord(2, i), desc%rank2coord(3, i)) = i
425               ! the lb_global and ub_global correspond to lb_real and ub_real of each task
426               desc%lb_global(:, i) = desc%lb
427               desc%ub_global(:, i) = desc%ub
428               DO dir = 1, 3
429                  IF (desc%group_dim(dir) .GT. 1) THEN
430                     lb = get_limit(desc%npts(dir), desc%group_dim(dir), desc%rank2coord(dir, i))
431                     desc%lb_global(dir, i) = lb(1) + desc%lb(dir) - 1
432                     desc%ub_global(dir, i) = lb(2) + desc%lb(dir) - 1
433                  ENDIF
434               ENDDO
435            ENDDO
436
437            ! map a grid point to a CPU coord
438            DO dir = 1, 3
439               DO l = 0, desc%group_dim(dir) - 1
440                  IF (desc%group_dim(dir) .GT. 1) THEN
441                     lb = get_limit(desc%npts(dir), desc%group_dim(dir), l)
442                     lb = lb + desc%lb(dir) - 1
443                  ELSE
444                     lb(1) = desc%lb(dir)
445                     lb(2) = desc%ub(dir)
446                  ENDIF
447                  SELECT CASE (dir)
448                  CASE (1)
449                     desc%x2coord(lb(1):lb(2)) = l
450                  CASE (2)
451                     desc%y2coord(lb(1):lb(2)) = l
452                  CASE (3)
453                     desc%z2coord(lb(1):lb(2)) = l
454                  END SELECT
455               ENDDO
456            ENDDO
457
458            ! an upper bound for the number of neighbours the border is overlapping with
459            DO dir = 1, 3
460               desc%neighbours(dir) = 0
461               IF ((n_slices(dir) > 1) .OR. (border_size > 0)) THEN
462                  neighbours(dir) = HUGE(0)
463                  DO l = 0, n_slices(dir) - 1
464                     lb = get_limit(desc%npts(dir), n_slices(dir), l)
465                     neighbours(dir) = MIN(lb(2) - lb(1) + 1, neighbours(dir))
466                  END DO
467                  desc%neighbours(dir) = (desc%border + neighbours(dir) - 1)/neighbours(dir)
468               END IF
469            END DO
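            ! For illustration (assumed values): with border = 20 and a thinnest neighbouring
            ! slab of 12 planes, desc%neighbours(dir) = (20 + 12 - 1)/12 = 2, i.e. the halo
            ! may reach into the next-nearest neighbour along that direction.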
470
471            ALLOCATE (desc%virtual2real(0:desc%group_size - 1))
472            ALLOCATE (desc%real2virtual(0:desc%group_size - 1))
473            ! Start with no reordering
474            DO i = 0, desc%group_size - 1
475               desc%virtual2real(i) = i
476               desc%real2virtual(i) = i
477            END DO
478
479            desc%my_virtual_pos = desc%real2virtual(desc%my_pos)
480            desc%virtual_group_coor(:) = desc%rank2coord(:, desc%my_virtual_pos)
481
482         END IF
483      END IF
484
485      CALL timestop(handle)
486
487   END SUBROUTINE rs_grid_create_descriptor
488
489! **************************************************************************************************
490!> \brief ...
491!> \param rs ...
492!> \param desc ...
493! **************************************************************************************************
494   SUBROUTINE rs_grid_create(rs, desc)
495      TYPE(realspace_grid_type), POINTER                 :: rs
496      TYPE(realspace_grid_desc_type), POINTER            :: desc
497
498      CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_create', routineP = moduleN//':'//routineN
499
500      INTEGER                                            :: handle
501
502      CALL timeset(routineN, handle)
503
504      ALLOCATE (rs)
505
506      last_rs_id = last_rs_id + 1
507      rs%id_nr = last_rs_id
508      rs%ref_count = 1
509      rs%desc => desc
510      CALL rs_grid_retain_descriptor(rs%desc)
511
512      IF (desc%pw%para%mode == PW_MODE_LOCAL) THEN
513         ! The corresponding group has dimension 1
514         ! All operations will be done locally
515         rs%lb_real = desc%lb
516         rs%ub_real = desc%ub
517         rs%lb_local = rs%lb_real - desc%border*(1 - desc%perd)
518         rs%ub_local = rs%ub_real + desc%border*(1 - desc%perd)
519         rs%npts_local = rs%ub_local - rs%lb_local + 1
520         rs%ngpts_local = PRODUCT(rs%npts_local)
521      END IF
522
523      IF (ALL(rs%desc%group_dim == 1)) THEN
524         ! CASE 1 : only one slice: we do not need overlapping regions and special
525         !          recombination of the total density
526         rs%lb_real = desc%lb
527         rs%ub_real = desc%ub
528         rs%lb_local = rs%lb_real - desc%border*(1 - desc%perd)
529         rs%ub_local = rs%ub_real + desc%border*(1 - desc%perd)
530         rs%npts_local = rs%ub_local - rs%lb_local + 1
531         rs%ngpts_local = PRODUCT(rs%npts_local)
532      ELSE
533         ! CASE 2 : general case
534         ! extract some more derived quantities about the local grid
535         rs%lb_real = desc%lb_global(:, desc%my_virtual_pos)
536         rs%ub_real = desc%ub_global(:, desc%my_virtual_pos)
537         rs%lb_local = rs%lb_real - desc%border*(1 - desc%perd)
538         rs%ub_local = rs%ub_real + desc%border*(1 - desc%perd)
539         rs%npts_local = rs%ub_local - rs%lb_local + 1
540         rs%ngpts_local = PRODUCT(rs%npts_local)
541      END IF
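      ! Illustration (example values): with lb_real = 1, ub_real = 25, border = 20 and
      ! perd = 0 along a direction, the local box extends from -19 to 45 in that direction
      ! (65 points); with perd = 1 no halo is added there.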
542
543      allocated_rs_grid_count = allocated_rs_grid_count + 1
544
545      ALLOCATE (rs%r(rs%lb_local(1):rs%ub_local(1), &
546                     rs%lb_local(2):rs%ub_local(2), &
547                     rs%lb_local(3):rs%ub_local(3)))
548      ALLOCATE (rs%px(desc%npts(1)))
549      ALLOCATE (rs%py(desc%npts(2)))
550      ALLOCATE (rs%pz(desc%npts(3)))
551
552      CALL timestop(handle)
553
554   END SUBROUTINE rs_grid_create
555
556! **************************************************************************************************
557!> \brief Defines a new ordering of ranks on this realspace grid, recalculating
558!>        the data bounds and reallocating the grid.  As a result, each MPI process
!>        now has a real rank (i.e. its rank in the MPI communicator from the pw grid)
560!>        and a virtual rank (the rank of the process where the data now owned by this
561!>        process would reside in an ordinary cartesian distribution).
562!>        NB. Since the grid size required may change, the caller should be sure to release
563!>        and recreate the corresponding rs_grids
564!>        The desc%real2virtual and desc%virtual2real arrays can be used to map
565!>        a physical rank to the 'rank' of data owned by that process and vice versa
566!> \param desc ...
567!> \param real2virtual ...
568!> \par History
569!>        04-2009 created [Iain Bethune]
570!>          (c) The Numerical Algorithms Group (NAG) Ltd, 2009 on behalf of the HECToR project
571! **************************************************************************************************
572   SUBROUTINE rs_grid_reorder_ranks(desc, real2virtual)
573
574      TYPE(realspace_grid_desc_type), POINTER            :: desc
575      INTEGER, DIMENSION(:), INTENT(IN)                  :: real2virtual
576
577      CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_reorder_ranks', &
578         routineP = moduleN//':'//routineN
579
580      INTEGER                                            :: handle, i
581
582      CALL timeset(routineN, handle)
583
584      desc%real2virtual(:) = real2virtual
585
586      DO i = 0, desc%group_size - 1
587         desc%virtual2real(desc%real2virtual(i)) = i
588      END DO
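      ! For example (hypothetical reordering): real2virtual = (/1, 2, 0/) places the data
      ! of real rank 0 at virtual position 1, so the loop above yields
      ! virtual2real = (/2, 0, 1/).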
589
590      desc%my_virtual_pos = desc%real2virtual(desc%my_pos)
591
592      IF (.NOT. ALL(desc%group_dim == 1)) THEN
593         desc%virtual_group_coor(:) = desc%rank2coord(:, desc%my_virtual_pos)
594      END IF
595
596      CALL timestop(handle)
597
   END SUBROUTINE rs_grid_reorder_ranks
599
600! **************************************************************************************************
601!> \brief Print information on grids to output
602!> \param rs ...
603!> \param iounit ...
604!> \author JGH (17-May-2007)
605! **************************************************************************************************
606   SUBROUTINE rs_grid_print(rs, iounit)
607      TYPE(realspace_grid_type), POINTER                 :: rs
608      INTEGER, INTENT(in)                                :: iounit
609
610      CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_print', routineP = moduleN//':'//routineN
611
612      INTEGER                                            :: dir, i, nn
613      REAL(KIND=dp)                                      :: pp(3)
614
615      IF (rs%desc%parallel) THEN
616         IF (iounit > 0) THEN
617            WRITE (iounit, '(/,A,T71,I10)') &
618               " RS_GRID| Information for grid number ", rs%desc%grid_id
619            DO i = 1, 3
620               WRITE (iounit, '(A,I3,T30,2I8,T62,A,T71,I10)') " RS_GRID|   Bounds ", &
621                  i, rs%desc%lb(i), rs%desc%ub(i), "Points:", rs%desc%npts(i)
622            END DO
623            IF (.NOT. rs%desc%distributed) THEN
624               WRITE (iounit, '(A)') " RS_GRID| Real space fully replicated"
625               WRITE (iounit, '(A,T71,I10)') &
626                  " RS_GRID| Group size ", rs%desc%group_dim(2)
627            ELSE
628               DO dir = 1, 3
629                  IF (rs%desc%perd(dir) /= 1) THEN
630                     WRITE (iounit, '(A,T71,I3,A)') &
631                        " RS_GRID| Real space distribution over ", rs%desc%group_dim(dir), " groups"
632                     WRITE (iounit, '(A,T71,I10)') &
633                        " RS_GRID| Real space distribution along direction ", dir
634                     WRITE (iounit, '(A,T71,I10)') &
635                        " RS_GRID| Border size ", rs%desc%border
636                  END IF
637               END DO
638            END IF
639         END IF
640         IF (rs%desc%distributed) THEN
641            DO dir = 1, 3
642               IF (rs%desc%perd(dir) /= 1) THEN
643                  nn = rs%npts_local(dir)
644                  CALL mp_sum(nn, rs%desc%group)
645                  pp(1) = REAL(nn, KIND=dp)/REAL(PRODUCT(rs%desc%group_dim), KIND=dp)
646                  nn = rs%npts_local(dir)
647                  CALL mp_max(nn, rs%desc%group)
648                  pp(2) = REAL(nn, KIND=dp)
649                  nn = rs%npts_local(dir)
650                  CALL mp_min(nn, rs%desc%group)
651                  pp(3) = REAL(nn, KIND=dp)
652                  IF (iounit > 0) THEN
653                     WRITE (iounit, '(A,T48,A)') " RS_GRID|   Distribution", &
654                        "  Average         Max         Min"
655                     WRITE (iounit, '(A,T45,F12.1,2I12)') " RS_GRID|   Planes   ", &
656                        pp(1), NINT(pp(2)), NINT(pp(3))
657                  END IF
658               END IF
659            END DO
660!          WRITE ( iounit, '(/)' )
661         END IF
662      ELSE
663         IF (iounit > 0) THEN
664            WRITE (iounit, '(/,A,T71,I10)') &
665               " RS_GRID| Information for grid number ", rs%desc%grid_id
666            DO i = 1, 3
667               WRITE (iounit, '(A,I3,T30,2I8,T62,A,T71,I10)') " RS_GRID|   Bounds ", &
668                  i, rs%desc%lb(i), rs%desc%ub(i), "Points:", rs%desc%npts(i)
669            END DO
670!         WRITE ( iounit, '(/)' )
671         ENDIF
672      END IF
673
674   END SUBROUTINE rs_grid_print
675
676! **************************************************************************************************
677!> \brief Copy a function from/to a PW grid type to/from a real
678!>      space type
679!>      dir is the named constant rs2pw or pw2rs
680!> \param rs ...
681!> \param pw ...
682!> \param dir ...
683!> \par History
684!>      JGH (15-Feb-2003) reduced additional memory usage
685!>      Joost VandeVondele (Sep-2003) moved from sum/bcast to shift
686!> \author JGH (18-Mar-2001)
687! **************************************************************************************************
688   SUBROUTINE rs_pw_transfer(rs, pw, dir)
689
690      TYPE(realspace_grid_type), POINTER                 :: rs
691      TYPE(pw_type), POINTER                             :: pw
692      INTEGER, INTENT(IN)                                :: dir
693
694      CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer', routineP = moduleN//':'//routineN
695
696      CHARACTER(LEN=5)                                   :: dirname
697      INTEGER                                            :: handle, handle2, i, im, j, jm, k, km
698
699      CALL timeset(routineN, handle2)
700      IF (dir .EQ. rs2pw) dirname = "RS2PW"
701      IF (dir .EQ. pw2rs) dirname = "PW2RS"
702      CALL timeset(routineN//"_"//TRIM(dirname)//"_"//TRIM(ADJUSTL( &
703                                                           cp_to_string(CEILING(pw%pw_grid%cutoff/10)*10))), handle)
704
705      IF (rs%desc%grid_id /= pw%pw_grid%id_nr) &
         CPABORT("Different rs and pw identifiers")
707      IF ((pw%in_use .NE. REALDATA3D) .AND. (pw%in_use .NE. COMPLEXDATA3D)) &
708         CPABORT("Need REALDATA3D or COMPLEXDATA3D")
709      IF ((dir .NE. rs2pw) .AND. (dir .NE. pw2rs)) &
710         CPABORT("Direction must be rs2pw or pw2rs")
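      ! Dispatch: fully distributed grids go through the halo-exchange + alltoall path,
      ! replicated parallel grids through the ring-shift path, and the purely local case
      ! is handled directly below (including the wrap-around of any border region).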
711
712      IF (rs%desc%distributed) THEN
713         CALL rs_pw_transfer_distributed(rs, pw, dir)
714      ELSE IF (rs%desc%parallel) THEN
715         CALL rs_pw_transfer_replicated(rs, pw, dir)
716      ELSE ! treat simple serial case locally
717         IF (dir == rs2pw) THEN
718            IF (pw%in_use == REALDATA3D) THEN
719               IF (rs%desc%border == 0) THEN
720                  CALL dcopy(SIZE(rs%r), rs%r, 1, pw%cr3d, 1)
721               ELSE
722                  CPASSERT(LBOUND(pw%cr3d, 3) .EQ. rs%lb_real(3))
723!$OMP                 PARALLEL DO DEFAULT(NONE) SHARED(pw,rs)
724                  DO i = rs%lb_real(3), rs%ub_real(3)
725                     pw%cr3d(:, :, i) = rs%r(rs%lb_real(1):rs%ub_real(1), &
726                                             rs%lb_real(2):rs%ub_real(2), i)
727                  END DO
728!$OMP                 END PARALLEL DO
729               END IF
730            ELSEIF (pw%in_use == COMPLEXDATA3D) THEN
731               IF (rs%desc%border == 0) THEN
732                  CALL copy_rc(rs%r, pw%cc3d)
733               ELSE
734                  CPASSERT(LBOUND(pw%cr3d, 3) .EQ. rs%lb_real(3))
735!$OMP                 PARALLEL DO DEFAULT(NONE) SHARED(pw,rs)
736                  DO i = rs%lb_real(3), rs%ub_real(3)
737                     pw%cc3d(:, :, i) = CMPLX(rs%r(rs%lb_real(1):rs%ub_real(1), &
738                                                   rs%lb_real(2):rs%ub_real(2), i), &
739                                              0.0_dp, KIND=dp)
740                  END DO
741!$OMP                 END PARALLEL DO
742               END IF
743            ELSE
744               CPABORT("PW type not compatible")
745            END IF
746         ELSE
747            IF (pw%in_use == REALDATA3D) THEN
748               IF (rs%desc%border == 0) THEN
749                  CALL dcopy(SIZE(rs%r), pw%cr3d, 1, rs%r, 1)
750               ELSE
751!$OMP                 PARALLEL DO DEFAULT(NONE) &
752!$OMP                             PRIVATE(i,im,j,jm,k,km) &
753!$OMP                             SHARED(pw,rs)
754                  DO k = rs%lb_local(3), rs%ub_local(3)
755                     IF (k < rs%lb_real(3)) THEN
756                        km = k + rs%desc%npts(3)
757                     ELSE IF (k > rs%ub_real(3)) THEN
758                        km = k - rs%desc%npts(3)
759                     ELSE
760                        km = k
761                     END IF
762                     DO j = rs%lb_local(2), rs%ub_local(2)
763                        IF (j < rs%lb_real(2)) THEN
764                           jm = j + rs%desc%npts(2)
765                        ELSE IF (j > rs%ub_real(2)) THEN
766                           jm = j - rs%desc%npts(2)
767                        ELSE
768                           jm = j
769                        END IF
770                        DO i = rs%lb_local(1), rs%ub_local(1)
771                           IF (i < rs%lb_real(1)) THEN
772                              im = i + rs%desc%npts(1)
773                           ELSE IF (i > rs%ub_real(1)) THEN
774                              im = i - rs%desc%npts(1)
775                           ELSE
776                              im = i
777                           END IF
778                           rs%r(i, j, k) = pw%cr3d(im, jm, km)
779                        END DO
780                     END DO
781                  END DO
782!$OMP                 END PARALLEL DO
783               END IF
784            ELSEIF (pw%in_use == COMPLEXDATA3D) THEN
785               IF (rs%desc%border == 0) THEN
786                  CALL copy_cr(pw%cc3d, rs%r)
787               ELSE
788!$OMP                 PARALLEL DO DEFAULT(NONE) &
789!$OMP                             PRIVATE(i,im,j,jm,k,km) &
790!$OMP                             SHARED(pw,rs)
791                  DO k = rs%lb_local(3), rs%ub_local(3)
792                     IF (k < rs%lb_real(3)) THEN
793                        km = k + rs%desc%npts(3)
794                     ELSE IF (k > rs%ub_real(3)) THEN
795                        km = k - rs%desc%npts(3)
796                     ELSE
797                        km = k
798                     END IF
799                     DO j = rs%lb_local(2), rs%ub_local(2)
800                        IF (j < rs%lb_real(2)) THEN
801                           jm = j + rs%desc%npts(2)
802                        ELSE IF (j > rs%ub_real(2)) THEN
803                           jm = j - rs%desc%npts(2)
804                        ELSE
805                           jm = j
806                        END IF
807                        DO i = rs%lb_local(1), rs%ub_local(1)
808                           IF (i < rs%lb_real(1)) THEN
809                              im = i + rs%desc%npts(1)
810                           ELSE IF (i > rs%ub_real(1)) THEN
811                              im = i - rs%desc%npts(1)
812                           ELSE
813                              im = i
814                           END IF
815                           rs%r(i, j, k) = REAL(pw%cc3d(im, jm, km), KIND=dp)
816                        END DO
817                     END DO
818                  END DO
819!$OMP                 END PARALLEL DO
820               END IF
821            ELSE
822               CPABORT("PW type not compatible")
823            END IF
824         END IF
825      END IF
826
827      CALL timestop(handle)
828      CALL timestop(handle2)
829
830   END SUBROUTINE rs_pw_transfer
831
832! **************************************************************************************************
833!> \brief transfer from a realspace grid to a planewave grid or the other way around
834!> \param rs ...
835!> \param pw ...
836!> \param dir == rs2pw or dir == pw2rs
837!> \note
838!>      rs2pw sums all data on the rs grid into the respective pw grid
839!>      pw2rs will scatter all pw data to the rs grids
840! **************************************************************************************************
841   SUBROUTINE rs_pw_transfer_replicated(rs, pw, dir)
842      TYPE(realspace_grid_type), POINTER                 :: rs
843      TYPE(pw_type), POINTER                             :: pw
844      INTEGER, INTENT(IN)                                :: dir
845
846      CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer_replicated', &
847         routineP = moduleN//':'//routineN
848
849      INTEGER                                            :: dest, group, i, ii, im, ip, ix, iy, iz, &
850                                                            j, jm, k, km, mepos, nma, nn, np, &
851                                                            req(2), s(3), source
852      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: rcount
853      INTEGER, DIMENSION(3)                              :: lb, ub
854      INTEGER, DIMENSION(:, :), POINTER                  :: pbo
855      INTEGER, DIMENSION(:, :, :), POINTER               :: bo
856      REAL(KIND=dp), DIMENSION(:), POINTER               :: recvbuf, sendbuf, swapptr
857      REAL(KIND=dp), DIMENSION(:, :, :), POINTER         :: grid
858
859      np = pw%pw_grid%para%group_size
860      bo => pw%pw_grid%para%bo(1:2, 1:3, 0:np - 1, 1)
861      pbo => pw%pw_grid%bounds
862      group = pw%pw_grid%para%rs_group
863      mepos = pw%pw_grid%para%rs_mpo
864      ALLOCATE (rcount(0:np - 1))
865      DO ip = 1, np
866         rcount(ip - 1) = PRODUCT(bo(2, :, ip) - bo(1, :, ip) + 1)
867      END DO
868      nma = MAXVAL(rcount(0:np - 1))
869      ALLOCATE (sendbuf(nma), recvbuf(nma))
870      sendbuf = 1.0E99_dp; recvbuf = 1.0E99_dp ! init mpi'ed buffers to silence warnings under valgrind
871      grid => rs%r
872
873      !sample peak memory
874      CALL m_memory()
875
876      IF (dir == rs2pw) THEN
877         dest = MODULO(mepos + 1, np)
878         source = MODULO(mepos - 1, np)
879         sendbuf = 0.0_dp
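         ! Ring algorithm: in each pass a rank adds its local contribution for one
         ! destination block into sendbuf, then the partially summed buffer is shifted
         ! to the next rank; after np-1 shifts every rank holds the fully summed block
         ! of pw data that it owns.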
880
881         DO ip = 1, np
882
883            lb = pbo(1, :) + bo(1, :, MODULO(mepos - ip, np) + 1) - 1
884            ub = pbo(1, :) + bo(2, :, MODULO(mepos - ip, np) + 1) - 1
885            ! this loop takes about the same time as the message passing call
886            ! notice that the range of ix is only a small fraction of the first index of grid
887            ! therefore it seems faster to have the second index as the innermost loop
888            ! if this runs on many cpus
889            ! tested on itanium, pentium4, opteron, ultrasparc...
890            s = ub - lb + 1
891            DO iz = lb(3), ub(3)
892               DO ix = lb(1), ub(1)
893                  ii = (iz - lb(3))*s(1)*s(2) + (ix - lb(1)) + 1
894                  DO iy = lb(2), ub(2)
895                     sendbuf(ii) = sendbuf(ii) + grid(ix, iy, iz)
896                     ii = ii + s(1)
897                  END DO
898               END DO
899            END DO
900            IF (ip .EQ. np) EXIT
901            CALL mp_isendrecv(sendbuf, dest, recvbuf, source, &
902                              group, req(1), req(2), 13)
903            CALL mp_waitall(req)
904            swapptr => sendbuf
905            sendbuf => recvbuf
906            recvbuf => swapptr
907         ENDDO
908         nn = rcount(mepos)
909         IF (pw%in_use == REALDATA3D) THEN
910            CALL dcopy(nn, sendbuf, 1, pw%cr3d, 1)
911         ELSEIF (pw%in_use == COMPLEXDATA3D) THEN
912            CALL copy_rc(RESHAPE(sendbuf, SHAPE(pw%cc3d)), pw%cc3d)
913         ELSE
914            CPABORT("PW type not compatible")
915         END IF
916      ELSE
917         nn = rcount(mepos)
918         IF (pw%in_use == REALDATA3D) THEN
919            CALL dcopy(nn, pw%cr3d, 1, sendbuf, 1)
920         ELSEIF (pw%in_use == COMPLEXDATA3D) THEN
921            CALL dcopy(nn, pw%cc3d, 2, sendbuf, 1)
922         ELSE
923            CPABORT("PW type not compatible")
924         END IF
925
926         dest = MODULO(mepos + 1, np)
927         source = MODULO(mepos - 1, np)
928
929         DO ip = 0, np - 1
930            ! we must shift the buffer only np-1 times around
931            IF (ip .NE. np - 1) THEN
932               CALL mp_isendrecv(sendbuf, dest, recvbuf, source, &
933                                 group, req(1), req(2), 13)
934            ENDIF
935            lb = pbo(1, :) + bo(1, :, MODULO(mepos - ip, np) + 1) - 1
936            ub = pbo(1, :) + bo(2, :, MODULO(mepos - ip, np) + 1) - 1
937            ii = 0
938            ! this loop takes about the same time as the message passing call
            ! If I read the code correctly then:
            ! BUG/FIXME: Unfortunately the MPI standard says we are not allowed
            ! to access sendbuf/recvbuf while the sendrecv has not yet completed.
            ! This EXPLICITLY includes READING sendbuf!
            ! I don't know of any MPI implementation that really enforces this,
            ! but the standard explicitly mandates it.
945            DO iz = lb(3), ub(3)
946               DO iy = lb(2), ub(2)
947                  DO ix = lb(1), ub(1)
948                     ii = ii + 1
949                     grid(ix, iy, iz) = sendbuf(ii)
950                  END DO
951               END DO
952            END DO
953            IF (ip .NE. np - 1) THEN
954               CALL mp_waitall(req)
955            ENDIF
956            swapptr => sendbuf
957            sendbuf => recvbuf
958            recvbuf => swapptr
959         END DO
960         IF (rs%desc%border > 0) THEN
961!$OMP           PARALLEL DO DEFAULT(NONE) &
962!$OMP                       PRIVATE(i,im,j,jm,k,km) &
963!$OMP                       SHARED(rs)
964            DO k = rs%lb_local(3), rs%ub_local(3)
965               IF (k < rs%lb_real(3)) THEN
966                  km = k + rs%desc%npts(3)
967               ELSE IF (k > rs%ub_real(3)) THEN
968                  km = k - rs%desc%npts(3)
969               ELSE
970                  km = k
971               END IF
972               DO j = rs%lb_local(2), rs%ub_local(2)
973                  IF (j < rs%lb_real(2)) THEN
974                     jm = j + rs%desc%npts(2)
975                  ELSE IF (j > rs%ub_real(2)) THEN
976                     jm = j - rs%desc%npts(2)
977                  ELSE
978                     jm = j
979                  END IF
980                  DO i = rs%lb_local(1), rs%ub_local(1)
981                     IF (i < rs%lb_real(1)) THEN
982                        im = i + rs%desc%npts(1)
983                     ELSE IF (i > rs%ub_real(1)) THEN
984                        im = i - rs%desc%npts(1)
985                     ELSE
986                        im = i
987                     END IF
988                     rs%r(i, j, k) = rs%r(im, jm, km)
989                  END DO
990               END DO
991            END DO
992!$OMP           END PARALLEL DO
993         END IF
994      END IF
995
996      DEALLOCATE (rcount)
997      DEALLOCATE (sendbuf)
998      DEALLOCATE (recvbuf)
999
1000   END SUBROUTINE rs_pw_transfer_replicated
1001
1002! **************************************************************************************************
1003!> \brief does the rs2pw and pw2rs transfer in the case where the rs grid is
1004!>       distributed (3D domain decomposition)
1005!> \param rs ...
1006!> \param pw ...
1007!> \param dir ...
1008!> \par History
1009!>      12.2007 created [Matt Watkins]
1010!>      9.2008 reduced amount of halo data sent [Iain Bethune]
1011!>      10.2008 added non-blocking communication [Iain Bethune]
1012!>      4.2009 added support for rank-reordering on the grid [Iain Bethune]
1013!>      12.2009 added OMP and sparse alltoall [Iain Bethune]
1014!>              (c) The Numerical Algorithms Group (NAG) Ltd, 2008-2009 on behalf of the HECToR project
1015!> \note
1016!>       the transfer is a two step procedure. For example, for the rs2pw transfer:
1017!>
1018!>       1) Halo-exchange in 3D so that the local part of the rs_grid contains the full data
1019!>       2) an alltoall communication to redistribute the local rs_grid to the local pw_grid
1020!>
!>       the halo exchange is most expensive on a large number of CPUs. Particular to this halo
!>       exchange is that the border region is rather large (e.g. 20 points) and that it might overlap
!>       with the central domain of several CPUs (i.e. next-nearest neighbors)
1024! **************************************************************************************************
1025   SUBROUTINE rs_pw_transfer_distributed(rs, pw, dir)
1026      TYPE(realspace_grid_type), POINTER                 :: rs
1027      TYPE(pw_type), POINTER                             :: pw
1028      INTEGER, INTENT(IN)                                :: dir
1029
1030      CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer_distributed', &
1031         routineP = moduleN//':'//routineN
1032
1033      CHARACTER(LEN=200)                                 :: error_string
1034      INTEGER :: completed, dest_down, dest_up, i, idir, j, k, lb, my_id, my_pw_rank, my_rs_rank, &
1035         n_shifts, nn, num_threads, position, source_down, source_up, ub, x, y, z
1036      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: dshifts, recv_disps, recv_reqs, &
1037                                                            recv_sizes, send_disps, send_reqs, &
1038                                                            send_sizes, ushifts
1039      INTEGER, ALLOCATABLE, DIMENSION(:, :)              :: bounds, recv_tasks, send_tasks
1040      INTEGER, DIMENSION(2)                              :: neighbours, pos
1041      INTEGER, DIMENSION(3) :: coords, lb_recv, lb_recv_down, lb_recv_up, lb_send, lb_send_down, &
1042         lb_send_up, ub_recv, ub_recv_down, ub_recv_up, ub_send, ub_send_down, ub_send_up
1043      INTEGER, DIMENSION(4)                              :: req
1044      LOGICAL, DIMENSION(3)                              :: halo_swapped
1045      REAL(KIND=dp)                                      :: pw_sum, rs_sum
1046      REAL(KIND=dp), DIMENSION(:, :, :), POINTER         :: recv_buf_3d_down, recv_buf_3d_up, &
1047                                                            send_buf_3d_down, send_buf_3d_up
1048      TYPE(cp_1d_r_p_type), ALLOCATABLE, DIMENSION(:)    :: recv_bufs, send_bufs
1049
1050      num_threads = 1
1051      my_id = 0
1052
1053      IF (dir == rs2pw) THEN
1054
         ! safety check, to be removed once we're absolutely sure the routine is correct
1056         IF (debug_this_module) THEN
1057            rs_sum = accurate_sum(rs%r)*ABS(det_3x3(rs%desc%dh))
1058            CALL mp_sum(rs_sum, rs%desc%group)
1059         ENDIF
1060
1061         halo_swapped = .FALSE.
         ! We don't need to send the 'edges' of the halos that have already been sent.
         ! Halos are contiguous in memory in the z-direction only, so swap these first,
         ! and send less data in the y and x directions, which are more expensive.
1065
1066         DO idir = 3, 1, -1
1067
1068            IF (rs%desc%perd(idir) .NE. 1) THEN
1069
1070               ALLOCATE (dshifts(0:rs%desc%neighbours(idir)))
1071               ALLOCATE (ushifts(0:rs%desc%neighbours(idir)))
1072
1073               ushifts = 0
1074               dshifts = 0
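               ! dshifts(n)/ushifts(n) accumulate the widths of the n nearest neighbours
               ! below/above along idir. For example (assumed widths), neighbours of
               ! 12, 12 and 13 planes give dshifts = (/0, 12, 24, 37/), so a border of
               ! 20 planes needs data from the first two of them only.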
1075
               ! limit the number of shifts so that we never send data to ourselves
1077               DO n_shifts = 1, MIN(rs%desc%neighbours(idir), rs%desc%group_dim(idir) - 1)
1078
1079                  ! need to take into account the possible varying widths of neighbouring cells
1080                  ! offset_up and offset_down hold the real size of the neighbouring cells
1081                  position = MODULO(rs%desc%virtual_group_coor(idir) - n_shifts, rs%desc%group_dim(idir))
1082                  neighbours = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), position)
1083                  dshifts(n_shifts) = dshifts(n_shifts - 1) + (neighbours(2) - neighbours(1) + 1)
1084
1085                  position = MODULO(rs%desc%virtual_group_coor(idir) + n_shifts, rs%desc%group_dim(idir))
1086                  neighbours = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), position)
1087                  ushifts(n_shifts) = ushifts(n_shifts - 1) + (neighbours(2) - neighbours(1) + 1)
1088
                  ! The border data has to be sent to/received from the neighbours
1090                  ! First we calculate the source and destination processes for the shift
1091                  ! We do both shifts at once to allow for more overlap of communication and buffer packing/unpacking
1092
1093                  CALL cart_shift(rs, idir, -1*n_shifts, source_down, dest_down)
1094
1095                  lb_send_down(:) = rs%lb_local(:)
1096                  lb_recv_down(:) = rs%lb_local(:)
1097                  ub_recv_down(:) = rs%ub_local(:)
1098                  ub_send_down(:) = rs%ub_local(:)
1099
1100                  IF (dshifts(n_shifts - 1) .LE. rs%desc%border) THEN
1101                     ub_send_down(idir) = lb_send_down(idir) + rs%desc%border - 1 - dshifts(n_shifts - 1)
1102                     lb_send_down(idir) = MAX(lb_send_down(idir), &
1103                                              lb_send_down(idir) + rs%desc%border - dshifts(n_shifts))
1104
1105                     ub_recv_down(idir) = ub_recv_down(idir) - rs%desc%border
1106                     lb_recv_down(idir) = MAX(lb_recv_down(idir) + rs%desc%border, &
1107                                              ub_recv_down(idir) - rs%desc%border + 1 + ushifts(n_shifts - 1))
1108                  ELSE
1109                     lb_send_down(idir) = 0
1110                     ub_send_down(idir) = -1
1111                     lb_recv_down(idir) = 0
1112                     ub_recv_down(idir) = -1
1113                  ENDIF
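                  ! If the accumulated width of closer neighbours already covers the whole
                  ! border, the send/recv ranges are made empty (lb = 0, ub = -1) so that
                  ! zero-sized buffers are exchanged for this shift.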
1114
1115                  DO i = 1, 3
1116                     IF (halo_swapped(i)) THEN
1117                        lb_send_down(i) = rs%lb_real(i)
1118                        ub_send_down(i) = rs%ub_real(i)
1119                        lb_recv_down(i) = rs%lb_real(i)
1120                        ub_recv_down(i) = rs%ub_real(i)
1121                     ENDIF
1122                  ENDDO
1123
1124                  ! post the receive
1125                  ALLOCATE (recv_buf_3d_down(lb_recv_down(1):ub_recv_down(1), &
1126                                             lb_recv_down(2):ub_recv_down(2), lb_recv_down(3):ub_recv_down(3)))
1127                  CALL mp_irecv(recv_buf_3d_down, source_down, rs%desc%group, req(1))
1128
1129                  ! now allocate, pack and send the send buffer
1130                  nn = PRODUCT(ub_send_down - lb_send_down + 1)
1131                  ALLOCATE (send_buf_3d_down(lb_send_down(1):ub_send_down(1), &
1132                                             lb_send_down(2):ub_send_down(2), lb_send_down(3):ub_send_down(3)))
1133
1134!$OMP PARALLEL DEFAULT(NONE), &
1135!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1136!$OMP          SHARED(send_buf_3d_down,rs,lb_send_down,ub_send_down)
1137!$                num_threads = MIN(omp_get_max_threads(), ub_send_down(3) - lb_send_down(3) + 1)
1138!$                my_id = omp_get_thread_num()
1139                  IF (my_id < num_threads) THEN
1140                     lb = lb_send_down(3) + ((ub_send_down(3) - lb_send_down(3) + 1)*my_id)/num_threads
1141                     ub = lb_send_down(3) + ((ub_send_down(3) - lb_send_down(3) + 1)*(my_id + 1))/num_threads - 1
1142
1143                     send_buf_3d_down(lb_send_down(1):ub_send_down(1), lb_send_down(2):ub_send_down(2), &
1144                                      lb:ub) = rs%r(lb_send_down(1):ub_send_down(1), &
1145                                                    lb_send_down(2):ub_send_down(2), lb:ub)
1146                  END IF
1147!$OMP END PARALLEL
1148
1149                  CALL mp_isend(send_buf_3d_down, dest_down, rs%desc%group, req(3))
1150
1151                  ! Now for the other direction
1152                  CALL cart_shift(rs, idir, n_shifts, source_up, dest_up)
1153
1154                  lb_send_up(:) = rs%lb_local(:)
1155                  lb_recv_up(:) = rs%lb_local(:)
1156                  ub_recv_up(:) = rs%ub_local(:)
1157                  ub_send_up(:) = rs%ub_local(:)
1158
1159                  IF (ushifts(n_shifts - 1) .LE. rs%desc%border) THEN
1160
1161                     lb_send_up(idir) = ub_send_up(idir) - rs%desc%border + 1 + ushifts(n_shifts - 1)
1162                     ub_send_up(idir) = MIN(ub_send_up(idir), &
1163                                            ub_send_up(idir) - rs%desc%border + ushifts(n_shifts))
1164
1165                     lb_recv_up(idir) = lb_recv_up(idir) + rs%desc%border
1166                     ub_recv_up(idir) = MIN(ub_recv_up(idir) - rs%desc%border, &
1167                                            lb_recv_up(idir) + rs%desc%border - 1 - dshifts(n_shifts - 1))
1168                  ELSE
1169                     lb_send_up(idir) = 0
1170                     ub_send_up(idir) = -1
1171                     lb_recv_up(idir) = 0
1172                     ub_recv_up(idir) = -1
1173                  ENDIF
1174
1175                  DO i = 1, 3
1176                     IF (halo_swapped(i)) THEN
1177                        lb_send_up(i) = rs%lb_real(i)
1178                        ub_send_up(i) = rs%ub_real(i)
1179                        lb_recv_up(i) = rs%lb_real(i)
1180                        ub_recv_up(i) = rs%ub_real(i)
1181                     ENDIF
1182                  ENDDO
1183
1184                  ! post the receive
1185                  ALLOCATE (recv_buf_3d_up(lb_recv_up(1):ub_recv_up(1), &
1186                                           lb_recv_up(2):ub_recv_up(2), lb_recv_up(3):ub_recv_up(3)))
1187                  CALL mp_irecv(recv_buf_3d_up, source_up, rs%desc%group, req(2))
1188
                  ! now allocate, pack and send the send buffer
1190                  nn = PRODUCT(ub_send_up - lb_send_up + 1)
1191                  ALLOCATE (send_buf_3d_up(lb_send_up(1):ub_send_up(1), &
1192                                           lb_send_up(2):ub_send_up(2), lb_send_up(3):ub_send_up(3)))
1193
1194!$OMP PARALLEL DEFAULT(NONE), &
1195!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1196!$OMP          SHARED(send_buf_3d_up,rs,lb_send_up,ub_send_up)
1197!$                num_threads = MIN(omp_get_max_threads(), ub_send_up(3) - lb_send_up(3) + 1)
1198!$                my_id = omp_get_thread_num()
1199                  IF (my_id < num_threads) THEN
1200                     lb = lb_send_up(3) + ((ub_send_up(3) - lb_send_up(3) + 1)*my_id)/num_threads
1201                     ub = lb_send_up(3) + ((ub_send_up(3) - lb_send_up(3) + 1)*(my_id + 1))/num_threads - 1
1202
1203                     send_buf_3d_up(lb_send_up(1):ub_send_up(1), lb_send_up(2):ub_send_up(2), &
1204                                    lb:ub) = rs%r(lb_send_up(1):ub_send_up(1), &
1205                                                  lb_send_up(2):ub_send_up(2), lb:ub)
1206                  END IF
1207!$OMP END PARALLEL
1208
1209                  CALL mp_isend(send_buf_3d_up, dest_up, rs%desc%group, req(4))
1210
1211                  ! wait for a recv to complete, then we can unpack
1212
1213                  DO i = 1, 2
1214
1215                     CALL mp_waitany(req(1:2), completed)
1216
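                     ! completed = 1 is the downward receive (req(1)), 2 the upward one (req(2))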
1217                     IF (completed .EQ. 1) THEN
1218
1219                        ! only some procs may need later shifts
1220                        IF (ub_recv_down(idir) .GE. lb_recv_down(idir)) THEN
1221                           ! Sum the data in the RS Grid
1222!$OMP PARALLEL DEFAULT(NONE), &
1223!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1224!$OMP          SHARED(recv_buf_3d_down,rs,lb_recv_down,ub_recv_down)
1225!$                         num_threads = MIN(omp_get_max_threads(), ub_recv_down(3) - lb_recv_down(3) + 1)
1226!$                         my_id = omp_get_thread_num()
1227                           IF (my_id < num_threads) THEN
1228                              lb = lb_recv_down(3) + ((ub_recv_down(3) - lb_recv_down(3) + 1)*my_id)/num_threads
1229                              ub = lb_recv_down(3) + ((ub_recv_down(3) - lb_recv_down(3) + 1)*(my_id + 1))/num_threads - 1
1230
1231                              rs%r(lb_recv_down(1):ub_recv_down(1), &
1232                                   lb_recv_down(2):ub_recv_down(2), lb:ub) = &
1233                                 rs%r(lb_recv_down(1):ub_recv_down(1), &
1234                                      lb_recv_down(2):ub_recv_down(2), lb:ub) + &
1235                                 recv_buf_3d_down(:, :, lb:ub)
1236                           END IF
1237!$OMP END PARALLEL
1238                        END IF
1239                        DEALLOCATE (recv_buf_3d_down)
1240                     ELSE
1241
1242                        ! only some procs may need later shifts
1243                        IF (ub_recv_up(idir) .GE. lb_recv_up(idir)) THEN
1244                           ! Sum the data in the RS Grid
1245!$OMP PARALLEL DEFAULT(NONE), &
1246!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1247!$OMP          SHARED(recv_buf_3d_up,rs,lb_recv_up,ub_recv_up)
1248!$                         num_threads = MIN(omp_get_max_threads(), ub_recv_up(3) - lb_recv_up(3) + 1)
1249!$                         my_id = omp_get_thread_num()
1250                           IF (my_id < num_threads) THEN
1251                              lb = lb_recv_up(3) + ((ub_recv_up(3) - lb_recv_up(3) + 1)*my_id)/num_threads
1252                              ub = lb_recv_up(3) + ((ub_recv_up(3) - lb_recv_up(3) + 1)*(my_id + 1))/num_threads - 1
1253
1254                              rs%r(lb_recv_up(1):ub_recv_up(1), &
1255                                   lb_recv_up(2):ub_recv_up(2), lb:ub) = &
1256                                 rs%r(lb_recv_up(1):ub_recv_up(1), &
1257                                      lb_recv_up(2):ub_recv_up(2), lb:ub) + &
1258                                 recv_buf_3d_up(:, :, lb:ub)
1259                           END IF
1260!$OMP END PARALLEL
1261                        END IF
1262                        DEALLOCATE (recv_buf_3d_up)
1263                     END IF
1264
1265                  END DO
1266
1267                  ! make sure the sends have completed before we deallocate
1268
1269                  CALL mp_waitall(req(3:4))
1270
1271                  DEALLOCATE (send_buf_3d_down)
1272                  DEALLOCATE (send_buf_3d_up)
1273               END DO
1274
1275               DEALLOCATE (dshifts)
1276               DEALLOCATE (ushifts)
1277
1278            END IF
1279
1280            halo_swapped(idir) = .TRUE.
1281
1282         END DO
1283
1284         ! This is the real redistribution
1285         ALLOCATE (bounds(0:pw%pw_grid%para%group_size - 1, 1:4))
1286
1287         ! work out the pw grid points each proc holds
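         ! (shifting by npts/2 + 1 maps 1-based pw indices to the centred rs indexing used below)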
1288         DO i = 0, pw%pw_grid%para%group_size - 1
1289            bounds(i, 1:2) = pw%pw_grid%para%bo(1:2, 1, i, 1)
1290            bounds(i, 3:4) = pw%pw_grid%para%bo(1:2, 2, i, 1)
1291            bounds(i, 1:2) = bounds(i, 1:2) - pw%pw_grid%npts(1)/2 - 1
1292            bounds(i, 3:4) = bounds(i, 3:4) - pw%pw_grid%npts(2)/2 - 1
1293         ENDDO
1294
1295         ALLOCATE (send_tasks(0:pw%pw_grid%para%group_size - 1, 1:6))
1296         ALLOCATE (send_sizes(0:pw%pw_grid%para%group_size - 1))
1297         ALLOCATE (send_disps(0:pw%pw_grid%para%group_size - 1))
1298         ALLOCATE (recv_tasks(0:pw%pw_grid%para%group_size - 1, 1:6))
1299         ALLOCATE (recv_sizes(0:pw%pw_grid%para%group_size - 1))
1300         ALLOCATE (recv_disps(0:pw%pw_grid%para%group_size - 1))
1301         send_tasks(:, 1) = 1
1302         send_tasks(:, 2) = 0
1303         send_tasks(:, 3) = 1
1304         send_tasks(:, 4) = 0
1305         send_tasks(:, 5) = 1
1306         send_tasks(:, 6) = 0
1307         send_sizes = 0
1308         recv_sizes = 0
1309
1310         my_rs_rank = rs%desc%my_pos
1311         my_pw_rank = pw%pw_grid%para%rs_mpo
1312
1313         ! find the processors that should hold our data
1314         ! should be part of the rs grid type
1315         ! this is a loop over real ranks (i.e. the in-order cartesian ranks)
1316         ! do the recv and send tasks in two separate loops which will
1317         ! load balance better for OpenMP with large numbers of MPI tasks
1318
1319!$OMP PARALLEL DO DEFAULT(NONE), &
1320!$OMP             PRIVATE(coords,idir,pos,lb_send,ub_send), &
1321!$OMP             SHARED(rs,bounds,my_rs_rank,recv_tasks,recv_sizes)
1322         DO i = 0, rs%desc%group_size - 1
1323
1324            coords(:) = rs%desc%rank2coord(:, rs%desc%real2virtual(i))
1325            !calculate the rs grid points on each processor
1326            !coords is the part of the grid that rank i actually holds
1327            DO idir = 1, 3
1328               pos(:) = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), coords(idir))
1329               pos(:) = pos(:) - rs%desc%npts(idir)/2 - 1
1330               lb_send(idir) = pos(1)
1331               ub_send(idir) = pos(2)
1332            ENDDO
1333
1334            IF (lb_send(1) .GT. bounds(my_rs_rank, 2)) CYCLE
1335            IF (ub_send(1) .LT. bounds(my_rs_rank, 1)) CYCLE
1336            IF (lb_send(2) .GT. bounds(my_rs_rank, 4)) CYCLE
1337            IF (ub_send(2) .LT. bounds(my_rs_rank, 3)) CYCLE
1338
1339            recv_tasks(i, 1) = MAX(lb_send(1), bounds(my_rs_rank, 1))
1340            recv_tasks(i, 2) = MIN(ub_send(1), bounds(my_rs_rank, 2))
1341            recv_tasks(i, 3) = MAX(lb_send(2), bounds(my_rs_rank, 3))
1342            recv_tasks(i, 4) = MIN(ub_send(2), bounds(my_rs_rank, 4))
1343            recv_tasks(i, 5) = lb_send(3)
1344            recv_tasks(i, 6) = ub_send(3)
1345            recv_sizes(i) = (recv_tasks(i, 2) - recv_tasks(i, 1) + 1)* &
1346                            (recv_tasks(i, 4) - recv_tasks(i, 3) + 1)*(recv_tasks(i, 6) - recv_tasks(i, 5) + 1)
1347
1348         ENDDO
1349!$OMP END PARALLEL DO
1350
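         ! work out the portion of the rs grid held by this process, in the same shifted indexing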
1351         coords(:) = rs%desc%rank2coord(:, rs%desc%real2virtual(my_rs_rank))
1352         DO idir = 1, 3
1353            pos(:) = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), coords(idir))
1354            pos(:) = pos(:) - rs%desc%npts(idir)/2 - 1
1355            lb_send(idir) = pos(1)
1356            ub_send(idir) = pos(2)
1357         ENDDO
1358
1359         lb_recv(:) = lb_send(:)
1360         ub_recv(:) = ub_send(:)
1361!$OMP PARALLEL DO DEFAULT(NONE), &
1362!$OMP             SHARED(pw,lb_send,ub_send,bounds,send_tasks,send_sizes)
1363         DO j = 0, pw%pw_grid%para%group_size - 1
1364
1365            IF (lb_send(1) .GT. bounds(j, 2)) CYCLE
1366            IF (ub_send(1) .LT. bounds(j, 1)) CYCLE
1367            IF (lb_send(2) .GT. bounds(j, 4)) CYCLE
1368            IF (ub_send(2) .LT. bounds(j, 3)) CYCLE
1369
1370            send_tasks(j, 1) = MAX(lb_send(1), bounds(j, 1))
1371            send_tasks(j, 2) = MIN(ub_send(1), bounds(j, 2))
1372            send_tasks(j, 3) = MAX(lb_send(2), bounds(j, 3))
1373            send_tasks(j, 4) = MIN(ub_send(2), bounds(j, 4))
1374            send_tasks(j, 5) = lb_send(3)
1375            send_tasks(j, 6) = ub_send(3)
1376            send_sizes(j) = (send_tasks(j, 2) - send_tasks(j, 1) + 1)* &
1377                            (send_tasks(j, 4) - send_tasks(j, 3) + 1)*(send_tasks(j, 6) - send_tasks(j, 5) + 1)
1378
1379         END DO
1380!$OMP END PARALLEL DO
1381
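         ! displacements are a running sum of the message sizes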
1382         send_disps(0) = 0
1383         recv_disps(0) = 0
1384         DO i = 1, pw%pw_grid%para%group_size - 1
1385            send_disps(i) = send_disps(i - 1) + send_sizes(i - 1)
1386            recv_disps(i) = recv_disps(i - 1) + recv_sizes(i - 1)
1387         ENDDO
1388
1389         CPASSERT(SUM(send_sizes) == PRODUCT(ub_recv - lb_recv + 1))
1390
1391         ALLOCATE (send_bufs(0:rs%desc%group_size - 1))
1392         ALLOCATE (recv_bufs(0:rs%desc%group_size - 1))
1393
1394         DO i = 0, rs%desc%group_size - 1
1395            IF (send_sizes(i) .NE. 0) THEN
1396               ALLOCATE (send_bufs(i)%array(send_sizes(i)))
1397            ELSE
1398               NULLIFY (send_bufs(i)%array)
1399            END IF
1400            IF (recv_sizes(i) .NE. 0) THEN
1401               ALLOCATE (recv_bufs(i)%array(recv_sizes(i)))
1402            ELSE
1403               NULLIFY (recv_bufs(i)%array)
1404            END IF
1405         END DO
1406
1407         ALLOCATE (recv_reqs(0:rs%desc%group_size - 1))
1408         recv_reqs = mp_request_null
1409
1410         DO i = 0, rs%desc%group_size - 1
1411            IF (recv_sizes(i) .NE. 0) THEN
1412               CALL mp_irecv(recv_bufs(i)%array, i, rs%desc%group, recv_reqs(i))
1413            END IF
1414         END DO
1415
1416         ! do packing
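         ! buffers are filled in x-fastest order, matching the unpack loops on the receiving side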
1417!$OMP PARALLEL DO DEFAULT(NONE), &
1418!$OMP             PRIVATE(k,z,y,x), &
1419!$OMP             SHARED(rs,send_tasks,send_bufs,send_disps)
1420         DO i = 0, rs%desc%group_size - 1
1421            k = 0
1422            DO z = send_tasks(i, 5), send_tasks(i, 6)
1423               DO y = send_tasks(i, 3), send_tasks(i, 4)
1424                  DO x = send_tasks(i, 1), send_tasks(i, 2)
1425                     k = k + 1
1426                     send_bufs(i)%array(k) = rs%r(x, y, z)
1427                  ENDDO
1428               ENDDO
1429            ENDDO
1430         ENDDO
1431!$OMP END PARALLEL DO
1432
1433         ALLOCATE (send_reqs(0:rs%desc%group_size - 1))
1434         send_reqs = mp_request_null
1435
1436         DO i = 0, rs%desc%group_size - 1
1437            IF (send_sizes(i) .NE. 0) THEN
1438               CALL mp_isend(send_bufs(i)%array, i, rs%desc%group, send_reqs(i))
1439            END IF
1440         END DO
1441
1442         ! do unpacking
1443         ! no OMP here so we can unpack each message as it arrives
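         ! mp_waitany returns a 1-based index into recv_reqs, so the source rank is completed - 1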
1444         DO i = 0, rs%desc%group_size - 1
1445            IF (recv_sizes(i) .EQ. 0) CYCLE
1446
1447            CALL mp_waitany(recv_reqs, completed)
1448            k = 0
1449            DO z = recv_tasks(completed - 1, 5), recv_tasks(completed - 1, 6)
1450               DO y = recv_tasks(completed - 1, 3), recv_tasks(completed - 1, 4)
1451                  DO x = recv_tasks(completed - 1, 1), recv_tasks(completed - 1, 2)
1452                     k = k + 1
1453                     pw%cr3d(x, y, z) = recv_bufs(completed - 1)%array(k)
1454                  ENDDO
1455               ENDDO
1456            ENDDO
1457         ENDDO
1458
1459         CALL mp_waitall(send_reqs)
1460
1461         DEALLOCATE (recv_reqs)
1462         DEALLOCATE (send_reqs)
1463
1464         DO i = 0, rs%desc%group_size - 1
1465            IF (ASSOCIATED(send_bufs(i)%array)) THEN
1466               DEALLOCATE (send_bufs(i)%array)
1467            END IF
1468            IF (ASSOCIATED(recv_bufs(i)%array)) THEN
1469               DEALLOCATE (recv_bufs(i)%array)
1470            END IF
1471         END DO
1472
1473         DEALLOCATE (send_bufs)
1474         DEALLOCATE (recv_bufs)
1475         DEALLOCATE (send_tasks)
1476         DEALLOCATE (send_sizes)
1477         DEALLOCATE (send_disps)
1478         DEALLOCATE (recv_tasks)
1479         DEALLOCATE (recv_sizes)
1480         DEALLOCATE (recv_disps)
1481
1482         IF (debug_this_module) THEN
            ! safety check, to be removed once we're absolutely sure the routine is correct
1484            pw_sum = pw_integrate_function(pw)
1485            IF (ABS(pw_sum - rs_sum)/MAX(1.0_dp, ABS(pw_sum), ABS(rs_sum)) > EPSILON(rs_sum)*1000) THEN
1486               WRITE (error_string, '(A,6(1X,I4.4),3F25.16)') "rs_pw_transfer_distributed", &
1487                  rs%desc%npts, rs%desc%group_dim, pw_sum, rs_sum, ABS(pw_sum - rs_sum)
1488               CALL cp_abort(__LOCATION__, &
1489                             error_string//" Please report this bug ... quick workaround: use "// &
1490                             "DISTRIBUTION_TYPE REPLICATED")
1491            ENDIF
1492         ENDIF
1493
1494      ELSE
1495
1496         ! pw to rs transfer
1497
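         ! zero the local grid (including its halo) before scattering the incoming pw data into it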
1498         CALL rs_grid_zero(rs)
1499
1500         ! This is the real redistribution
1501
1502         ALLOCATE (bounds(0:pw%pw_grid%para%group_size - 1, 1:4))
1503
1504         DO i = 0, pw%pw_grid%para%group_size - 1
1505            bounds(i, 1:2) = pw%pw_grid%para%bo(1:2, 1, i, 1)
1506            bounds(i, 3:4) = pw%pw_grid%para%bo(1:2, 2, i, 1)
1507            bounds(i, 1:2) = bounds(i, 1:2) - pw%pw_grid%npts(1)/2 - 1
1508            bounds(i, 3:4) = bounds(i, 3:4) - pw%pw_grid%npts(2)/2 - 1
1509         ENDDO
1510
1511         ALLOCATE (send_tasks(0:pw%pw_grid%para%group_size - 1, 1:6))
1512         ALLOCATE (send_sizes(0:pw%pw_grid%para%group_size - 1))
1513         ALLOCATE (send_disps(0:pw%pw_grid%para%group_size - 1))
1514         ALLOCATE (recv_tasks(0:pw%pw_grid%para%group_size - 1, 1:6))
1515         ALLOCATE (recv_sizes(0:pw%pw_grid%para%group_size - 1))
1516         ALLOCATE (recv_disps(0:pw%pw_grid%para%group_size - 1))
1517
1518         send_tasks = 0
1519         send_tasks(:, 1) = 1
1520         send_tasks(:, 2) = 0
1521         send_tasks(:, 3) = 1
1522         send_tasks(:, 4) = 0
1523         send_tasks(:, 5) = 1
1524         send_tasks(:, 6) = 0
1525         send_sizes = 0
1526
1527         recv_tasks = 0
1528         recv_tasks(:, 1) = 1
1529         recv_tasks(:, 2) = 0
         recv_tasks(:, 3) = 1
         recv_tasks(:, 4) = 0
         recv_tasks(:, 5) = 1
         recv_tasks(:, 6) = 0
1534         recv_sizes = 0
1535
1536         my_rs_rank = rs%desc%my_pos
1537         my_pw_rank = pw%pw_grid%para%rs_mpo
1538
1539         ! find the processors that should hold our data
1540         ! should be part of the rs grid type
1541         ! this is a loop over real ranks (i.e. the in-order cartesian ranks)
1542         ! do the recv and send tasks in two separate loops which will
1543         ! load balance better for OpenMP with large numbers of MPI tasks
1544
1545         ! this is the reverse of rs2pw: what were the sends are now the recvs
1546
1547!$OMP PARALLEL DO DEFAULT(NONE), &
1548!$OMP             PRIVATE(coords,idir,pos,lb_send,ub_send), &
1549!$OMP             SHARED(rs,bounds,my_rs_rank,send_tasks,send_sizes,pw)
1550         DO i = 0, pw%pw_grid%para%group_size - 1
1551
1552            coords(:) = rs%desc%rank2coord(:, rs%desc%real2virtual(i))
1553            !calculate the real rs grid points on each processor
1554            !coords is the part of the grid that rank i actually holds
1555            DO idir = 1, 3
1556               pos(:) = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), coords(idir))
1557               pos(:) = pos(:) - rs%desc%npts(idir)/2 - 1
1558               lb_send(idir) = pos(1)
1559               ub_send(idir) = pos(2)
1560            ENDDO
1561
1562            IF (ub_send(1) .LT. bounds(my_rs_rank, 1)) CYCLE
1563            IF (lb_send(1) .GT. bounds(my_rs_rank, 2)) CYCLE
1564            IF (ub_send(2) .LT. bounds(my_rs_rank, 3)) CYCLE
1565            IF (lb_send(2) .GT. bounds(my_rs_rank, 4)) CYCLE
1566
1567            send_tasks(i, 1) = MAX(lb_send(1), bounds(my_rs_rank, 1))
1568            send_tasks(i, 2) = MIN(ub_send(1), bounds(my_rs_rank, 2))
1569            send_tasks(i, 3) = MAX(lb_send(2), bounds(my_rs_rank, 3))
1570            send_tasks(i, 4) = MIN(ub_send(2), bounds(my_rs_rank, 4))
1571            send_tasks(i, 5) = lb_send(3)
1572            send_tasks(i, 6) = ub_send(3)
1573            send_sizes(i) = (send_tasks(i, 2) - send_tasks(i, 1) + 1)* &
1574                            (send_tasks(i, 4) - send_tasks(i, 3) + 1)*(send_tasks(i, 6) - send_tasks(i, 5) + 1)
1575
1576         ENDDO
1577!$OMP END PARALLEL DO
1578
1579         coords(:) = rs%desc%rank2coord(:, rs%desc%real2virtual(my_rs_rank))
1580         DO idir = 1, 3
1581            pos(:) = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), coords(idir))
1582            pos(:) = pos(:) - rs%desc%npts(idir)/2 - 1
1583            lb_send(idir) = pos(1)
1584            ub_send(idir) = pos(2)
1585         ENDDO
1586
1587         lb_recv(:) = lb_send(:)
1588         ub_recv(:) = ub_send(:)
1589
1590!$OMP PARALLEL DO DEFAULT(NONE), &
1591!$OMP             SHARED(pw,lb_send,ub_send,bounds,recv_tasks,recv_sizes)
1592         DO j = 0, pw%pw_grid%para%group_size - 1
1593
1594            IF (ub_send(1) .LT. bounds(j, 1)) CYCLE
1595            IF (lb_send(1) .GT. bounds(j, 2)) CYCLE
1596            IF (ub_send(2) .LT. bounds(j, 3)) CYCLE
1597            IF (lb_send(2) .GT. bounds(j, 4)) CYCLE
1598
1599            recv_tasks(j, 1) = MAX(lb_send(1), bounds(j, 1))
1600            recv_tasks(j, 2) = MIN(ub_send(1), bounds(j, 2))
1601            recv_tasks(j, 3) = MAX(lb_send(2), bounds(j, 3))
1602            recv_tasks(j, 4) = MIN(ub_send(2), bounds(j, 4))
1603            recv_tasks(j, 5) = lb_send(3)
1604            recv_tasks(j, 6) = ub_send(3)
1605            recv_sizes(j) = (recv_tasks(j, 2) - recv_tasks(j, 1) + 1)* &
1606                            (recv_tasks(j, 4) - recv_tasks(j, 3) + 1)*(recv_tasks(j, 6) - recv_tasks(j, 5) + 1)
1607
1608         ENDDO
1609!$OMP END PARALLEL DO
1610
1611         send_disps(0) = 0
1612         recv_disps(0) = 0
1613         DO i = 1, pw%pw_grid%para%group_size - 1
1614            send_disps(i) = send_disps(i - 1) + send_sizes(i - 1)
1615            recv_disps(i) = recv_disps(i - 1) + recv_sizes(i - 1)
1616         ENDDO
1617
1618         CPASSERT(SUM(recv_sizes) == PRODUCT(ub_recv - lb_recv + 1))
1619
1620         ALLOCATE (send_bufs(0:rs%desc%group_size - 1))
1621         ALLOCATE (recv_bufs(0:rs%desc%group_size - 1))
1622
1623         DO i = 0, rs%desc%group_size - 1
1624            IF (send_sizes(i) .NE. 0) THEN
1625               ALLOCATE (send_bufs(i)%array(send_sizes(i)))
1626            ELSE
1627               NULLIFY (send_bufs(i)%array)
1628            END IF
1629            IF (recv_sizes(i) .NE. 0) THEN
1630               ALLOCATE (recv_bufs(i)%array(recv_sizes(i)))
1631            ELSE
1632               NULLIFY (recv_bufs(i)%array)
1633            END IF
1634         END DO
1635
1636         ALLOCATE (recv_reqs(0:rs%desc%group_size - 1))
1637         recv_reqs = mp_request_null
1638
1639         DO i = 0, rs%desc%group_size - 1
1640            IF (recv_sizes(i) .NE. 0) THEN
1641               CALL mp_irecv(recv_bufs(i)%array, i, rs%desc%group, recv_reqs(i))
1642            END IF
1643         END DO
1644
1645         ! do packing
1646!$OMP PARALLEL DO DEFAULT(NONE), &
1647!$OMP             PRIVATE(k,z,y,x), &
1648!$OMP             SHARED(pw,rs,send_tasks,send_bufs,send_disps)
1649         DO i = 0, rs%desc%group_size - 1
1650            k = 0
1651            DO z = send_tasks(i, 5), send_tasks(i, 6)
1652               DO y = send_tasks(i, 3), send_tasks(i, 4)
1653                  DO x = send_tasks(i, 1), send_tasks(i, 2)
1654                     k = k + 1
1655                     send_bufs(i)%array(k) = pw%cr3d(x, y, z)
1656                  ENDDO
1657               ENDDO
1658            ENDDO
1659         ENDDO
1660!$OMP END PARALLEL DO
1661
1662         ALLOCATE (send_reqs(0:rs%desc%group_size - 1))
1663         send_reqs = mp_request_null
1664
1665         DO i = 0, rs%desc%group_size - 1
1666            IF (send_sizes(i) .NE. 0) THEN
1667               CALL mp_isend(send_bufs(i)%array, i, rs%desc%group, send_reqs(i))
1668            END IF
1669         END DO
1670
1671         ! do unpacking
1672         ! no OMP here so we can unpack each message as it arrives
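         ! as above, mp_waitany returns a 1-based index, so the source rank is completed - 1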
1673
1674         DO i = 0, rs%desc%group_size - 1
1675            IF (recv_sizes(i) .EQ. 0) CYCLE
1676
1677            CALL mp_waitany(recv_reqs, completed)
1678            k = 0
1679            DO z = recv_tasks(completed - 1, 5), recv_tasks(completed - 1, 6)
1680               DO y = recv_tasks(completed - 1, 3), recv_tasks(completed - 1, 4)
1681                  DO x = recv_tasks(completed - 1, 1), recv_tasks(completed - 1, 2)
1682                     k = k + 1
1683                     rs%r(x, y, z) = recv_bufs(completed - 1)%array(k)
1684                  ENDDO
1685               ENDDO
1686            ENDDO
1687         ENDDO
1688
1689         CALL mp_waitall(send_reqs)
1690
1691         DEALLOCATE (recv_reqs)
1692         DEALLOCATE (send_reqs)
1693
1694         DO i = 0, rs%desc%group_size - 1
1695            IF (ASSOCIATED(send_bufs(i)%array)) THEN
1696               DEALLOCATE (send_bufs(i)%array)
1697            END IF
1698            IF (ASSOCIATED(recv_bufs(i)%array)) THEN
1699               DEALLOCATE (recv_bufs(i)%array)
1700            END IF
1701         END DO
1702
1703         DEALLOCATE (send_bufs)
1704         DEALLOCATE (recv_bufs)
1705         DEALLOCATE (send_tasks)
1706         DEALLOCATE (send_sizes)
1707         DEALLOCATE (send_disps)
1708         DEALLOCATE (recv_tasks)
1709         DEALLOCATE (recv_sizes)
1710         DEALLOCATE (recv_disps)
1711
         ! now pass the boundary (wing) data around to fill the halo regions
1713         halo_swapped = .FALSE.
1714
1715         DO idir = 1, 3
1716
1717            IF (rs%desc%perd(idir) /= 1) THEN
1718
1719               ALLOCATE (dshifts(0:rs%desc%neighbours(idir)))
1720               ALLOCATE (ushifts(0:rs%desc%neighbours(idir)))
1721               ushifts = 0
1722               dshifts = 0
1723
1724               DO n_shifts = 1, rs%desc%neighbours(idir)
1725
1726                  ! need to take into account the possible varying widths of neighbouring cells
1727                  ! ushifts and dshifts hold the real size of the neighbouring cells
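                  ! (accumulated over all neighbours up to the current shift distance)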
1728
1729                  position = MODULO(rs%desc%virtual_group_coor(idir) - n_shifts, rs%desc%group_dim(idir))
1730                  neighbours = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), position)
1731                  dshifts(n_shifts) = dshifts(n_shifts - 1) + (neighbours(2) - neighbours(1) + 1)
1732
1733                  position = MODULO(rs%desc%virtual_group_coor(idir) + n_shifts, rs%desc%group_dim(idir))
1734                  neighbours = get_limit(rs%desc%npts(idir), rs%desc%group_dim(idir), position)
1735                  ushifts(n_shifts) = ushifts(n_shifts - 1) + (neighbours(2) - neighbours(1) + 1)
1736
                  ! The border data has to be sent to / received from the neighbours
1738                  ! First we calculate the source and destination processes for the shift
1739                  ! The first shift is "downwards"
1740
1741                  CALL cart_shift(rs, idir, -1*n_shifts, source_down, dest_down)
1742
1743                  lb_send_down(:) = rs%lb_local(:)
1744                  ub_send_down(:) = rs%ub_local(:)
1745                  lb_recv_down(:) = rs%lb_local(:)
1746                  ub_recv_down(:) = rs%ub_local(:)
1747
1748                  IF (dshifts(n_shifts - 1) .LE. rs%desc%border) THEN
1749                     lb_send_down(idir) = lb_send_down(idir) + rs%desc%border
1750                     ub_send_down(idir) = MIN(ub_send_down(idir) - rs%desc%border, &
1751                                              lb_send_down(idir) + rs%desc%border - 1 - dshifts(n_shifts - 1))
1752
1753                     lb_recv_down(idir) = ub_recv_down(idir) - rs%desc%border + 1 + ushifts(n_shifts - 1)
1754                     ub_recv_down(idir) = MIN(ub_recv_down(idir), &
1755                                              ub_recv_down(idir) - rs%desc%border + ushifts(n_shifts))
1756                  ELSE
1757                     lb_send_down(idir) = 0
1758                     ub_send_down(idir) = -1
1759                     lb_recv_down(idir) = 0
1760                     ub_recv_down(idir) = -1
1761                  ENDIF
1762
1763                  DO i = 1, 3
1764                     IF (.NOT. (halo_swapped(i) .OR. i .EQ. idir)) THEN
1765                        lb_send_down(i) = rs%lb_real(i)
1766                        ub_send_down(i) = rs%ub_real(i)
1767                        lb_recv_down(i) = rs%lb_real(i)
1768                        ub_recv_down(i) = rs%ub_real(i)
1769                     ENDIF
1770                  ENDDO
1771
1772                  ! allocate the recv buffer
1773                  nn = PRODUCT(ub_recv_down - lb_recv_down + 1)
1774                  ALLOCATE (recv_buf_3d_down(lb_recv_down(1):ub_recv_down(1), &
1775                                             lb_recv_down(2):ub_recv_down(2), lb_recv_down(3):ub_recv_down(3)))
1776
1777                  ! recv buffer is now ready, so post the receive
1778                  CALL mp_irecv(recv_buf_3d_down, source_down, rs%desc%group, req(1))
1779
                  ! now allocate, pack and send the send buffer
1781                  nn = PRODUCT(ub_send_down - lb_send_down + 1)
1782                  ALLOCATE (send_buf_3d_down(lb_send_down(1):ub_send_down(1), &
1783                                             lb_send_down(2):ub_send_down(2), lb_send_down(3):ub_send_down(3)))
1784
1785!$OMP PARALLEL DEFAULT(NONE), &
1786!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1787!$OMP          SHARED(send_buf_3d_down,rs,lb_send_down,ub_send_down)
1788!$                num_threads = MIN(omp_get_max_threads(), ub_send_down(3) - lb_send_down(3) + 1)
1789!$                my_id = omp_get_thread_num()
1790                  IF (my_id < num_threads) THEN
1791                     lb = lb_send_down(3) + ((ub_send_down(3) - lb_send_down(3) + 1)*my_id)/num_threads
1792                     ub = lb_send_down(3) + ((ub_send_down(3) - lb_send_down(3) + 1)*(my_id + 1))/num_threads - 1
1793
1794                     send_buf_3d_down(lb_send_down(1):ub_send_down(1), lb_send_down(2):ub_send_down(2), &
1795                                      lb:ub) = rs%r(lb_send_down(1):ub_send_down(1), &
1796                                                    lb_send_down(2):ub_send_down(2), lb:ub)
1797                  END IF
1798!$OMP END PARALLEL
1799
1800                  CALL mp_isend(send_buf_3d_down, dest_down, rs%desc%group, req(3))
1801
1802                  ! Now for the other direction
1803
1804                  CALL cart_shift(rs, idir, n_shifts, source_up, dest_up)
1805
1806                  lb_send_up(:) = rs%lb_local(:)
1807                  ub_send_up(:) = rs%ub_local(:)
1808                  lb_recv_up(:) = rs%lb_local(:)
1809                  ub_recv_up(:) = rs%ub_local(:)
1810
1811                  IF (ushifts(n_shifts - 1) .LE. rs%desc%border) THEN
1812                     ub_send_up(idir) = ub_send_up(idir) - rs%desc%border
1813                     lb_send_up(idir) = MAX(lb_send_up(idir) + rs%desc%border, &
1814                                            ub_send_up(idir) - rs%desc%border + 1 + ushifts(n_shifts - 1))
1815
1816                     ub_recv_up(idir) = lb_recv_up(idir) + rs%desc%border - 1 - dshifts(n_shifts - 1)
1817                     lb_recv_up(idir) = MAX(lb_recv_up(idir), &
1818                                            lb_recv_up(idir) + rs%desc%border - dshifts(n_shifts))
1819                  ELSE
1820                     lb_send_up(idir) = 0
1821                     ub_send_up(idir) = -1
1822                     lb_recv_up(idir) = 0
1823                     ub_recv_up(idir) = -1
1824                  ENDIF
1825
1826                  DO i = 1, 3
1827                     IF (.NOT. (halo_swapped(i) .OR. i .EQ. idir)) THEN
1828                        lb_send_up(i) = rs%lb_real(i)
1829                        ub_send_up(i) = rs%ub_real(i)
1830                        lb_recv_up(i) = rs%lb_real(i)
1831                        ub_recv_up(i) = rs%ub_real(i)
1832                     ENDIF
1833                  ENDDO
1834
1835                  ! allocate the recv buffer
1836                  nn = PRODUCT(ub_recv_up - lb_recv_up + 1)
1837                  ALLOCATE (recv_buf_3d_up(lb_recv_up(1):ub_recv_up(1), &
1838                                           lb_recv_up(2):ub_recv_up(2), lb_recv_up(3):ub_recv_up(3)))
1839
1840                  ! recv buffer is now ready, so post the receive
1841
1842                  CALL mp_irecv(recv_buf_3d_up, source_up, rs%desc%group, req(2))
1843
                  ! now allocate, pack and send the send buffer
1845                  nn = PRODUCT(ub_send_up - lb_send_up + 1)
1846                  ALLOCATE (send_buf_3d_up(lb_send_up(1):ub_send_up(1), &
1847                                           lb_send_up(2):ub_send_up(2), lb_send_up(3):ub_send_up(3)))
1848
1849!$OMP PARALLEL DEFAULT(NONE), &
1850!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1851!$OMP          SHARED(send_buf_3d_up,rs,lb_send_up,ub_send_up)
1852!$                num_threads = MIN(omp_get_max_threads(), ub_send_up(3) - lb_send_up(3) + 1)
1853!$                my_id = omp_get_thread_num()
1854                  IF (my_id < num_threads) THEN
1855                     lb = lb_send_up(3) + ((ub_send_up(3) - lb_send_up(3) + 1)*my_id)/num_threads
1856                     ub = lb_send_up(3) + ((ub_send_up(3) - lb_send_up(3) + 1)*(my_id + 1))/num_threads - 1
1857
1858                     send_buf_3d_up(lb_send_up(1):ub_send_up(1), lb_send_up(2):ub_send_up(2), &
1859                                    lb:ub) = rs%r(lb_send_up(1):ub_send_up(1), &
1860                                                  lb_send_up(2):ub_send_up(2), lb:ub)
1861                  END IF
1862!$OMP END PARALLEL
1863
1864                  CALL mp_isend(send_buf_3d_up, dest_up, rs%desc%group, req(4))
1865
1866                  ! wait for a recv to complete, then we can unpack
1867
1868                  DO i = 1, 2
1869
1870                     CALL mp_waitany(req(1:2), completed)
1871
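                     ! completed = 1 is the downward receive (req(1)), 2 the upward one (req(2))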
1872                     IF (completed .EQ. 1) THEN
1873
1874                        ! only some procs may need later shifts
1875                        IF (ub_recv_down(idir) .GE. lb_recv_down(idir)) THEN
1876
1877                           ! Add the data to the RS Grid
1878!$OMP PARALLEL DEFAULT(NONE), &
1879!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1880!$OMP          SHARED(recv_buf_3d_down,rs,lb_recv_down,ub_recv_down)
1881!$                         num_threads = MIN(omp_get_max_threads(), ub_recv_down(3) - lb_recv_down(3) + 1)
1882!$                         my_id = omp_get_thread_num()
1883                           IF (my_id < num_threads) THEN
1884                              lb = lb_recv_down(3) + ((ub_recv_down(3) - lb_recv_down(3) + 1)*my_id)/num_threads
1885                              ub = lb_recv_down(3) + ((ub_recv_down(3) - lb_recv_down(3) + 1)*(my_id + 1))/num_threads - 1
1886
1887                              rs%r(lb_recv_down(1):ub_recv_down(1), lb_recv_down(2):ub_recv_down(2), &
1888                                   lb:ub) = recv_buf_3d_down(:, :, lb:ub)
1889                           END IF
1890!$OMP END PARALLEL
1891                        END IF
1892
1893                        DEALLOCATE (recv_buf_3d_down)
1894                     ELSE
1895
1896                        ! only some procs may need later shifts
1897                        IF (ub_recv_up(idir) .GE. lb_recv_up(idir)) THEN
1898
1899                           ! Add the data to the RS Grid
1900!$OMP PARALLEL DEFAULT(NONE), &
1901!$OMP          PRIVATE(lb,ub,my_id,NUM_THREADS), &
1902!$OMP          SHARED(recv_buf_3d_up,rs,lb_recv_up,ub_recv_up)
1903!$                         num_threads = MIN(omp_get_max_threads(), ub_recv_up(3) - lb_recv_up(3) + 1)
1904!$                         my_id = omp_get_thread_num()
1905                           IF (my_id < num_threads) THEN
1906                              lb = lb_recv_up(3) + ((ub_recv_up(3) - lb_recv_up(3) + 1)*my_id)/num_threads
1907                              ub = lb_recv_up(3) + ((ub_recv_up(3) - lb_recv_up(3) + 1)*(my_id + 1))/num_threads - 1
1908
1909                              rs%r(lb_recv_up(1):ub_recv_up(1), lb_recv_up(2):ub_recv_up(2), &
1910                                   lb:ub) = recv_buf_3d_up(:, :, lb:ub)
1911                           END IF
1912!$OMP END PARALLEL
1913                        END IF
1914
1915                        DEALLOCATE (recv_buf_3d_up)
1916                     END IF
1917                  END DO
1918
1919                  CALL mp_waitall(req(3:4))
1920
1921                  DEALLOCATE (send_buf_3d_down)
1922                  DEALLOCATE (send_buf_3d_up)
1923               END DO
1924
1925               DEALLOCATE (ushifts)
1926               DEALLOCATE (dshifts)
1927            END IF
1928
1929            halo_swapped(idir) = .TRUE.
1930
1931         END DO
1932      END IF
1933
1934   END SUBROUTINE rs_pw_transfer_distributed
1935
1936! **************************************************************************************************
1937!> \brief Initialize grid to zero
1938!> \param rs ...
1939!> \par History
1940!>      none
1941!> \author JGH (23-Mar-2002)
1942! **************************************************************************************************
1943   SUBROUTINE rs_grid_zero(rs)
1944
1945      TYPE(realspace_grid_type), POINTER                 :: rs
1946
1947      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_zero', routineP = moduleN//':'//routineN
1948
1949      INTEGER                                            :: handle, i, j, k, l(3), u(3)
1950
1951      CALL timeset(routineN, handle)
1952      l(1) = LBOUND(rs%r, 1); l(2) = LBOUND(rs%r, 2); l(3) = LBOUND(rs%r, 3)
1953      u(1) = UBOUND(rs%r, 1); u(2) = UBOUND(rs%r, 2); u(3) = UBOUND(rs%r, 3)
1954!$OMP PARALLEL DO DEFAULT(NONE) COLLAPSE(3) &
1955!$OMP             PRIVATE(i,j,k) &
1956!$OMP             SHARED(rs,l,u)
1957      DO k = l(3), u(3)
1958      DO j = l(2), u(2)
1959      DO i = l(1), u(1)
1960         rs%r(i, j, k) = 0.0_dp
1961      ENDDO
1962      ENDDO
1963      ENDDO
1964!$OMP END PARALLEL DO
1965      CALL timestop(handle)
1966
1967   END SUBROUTINE rs_grid_zero
1968
1969! **************************************************************************************************
!> \brief rs1(i) = rs1(i) + scalar*rs2(i)*rs3(i)
1971!> \param rs1 ...
1972!> \param rs2 ...
1973!> \param rs3 ...
1974!> \param scalar ...
1975!> \par History
1976!>      none
1977!> \author
1978! **************************************************************************************************
1979   SUBROUTINE rs_grid_mult_and_add(rs1, rs2, rs3, scalar)
1980
1981      TYPE(realspace_grid_type), POINTER                 :: rs1, rs2, rs3
1982      REAL(dp), INTENT(IN)                               :: scalar
1983
1984      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_mult_and_add', &
1985         routineP = moduleN//':'//routineN
1986
1987      INTEGER                                            :: handle, i, j, k, l(3), u(3)
1988
1989!-----------------------------------------------------------------------------!
1990
1991      CALL timeset(routineN, handle)
1992      IF (scalar .NE. 0.0_dp) THEN
1993         l(1) = LBOUND(rs1%r, 1); l(2) = LBOUND(rs1%r, 2); l(3) = LBOUND(rs1%r, 3)
1994         u(1) = UBOUND(rs1%r, 1); u(2) = UBOUND(rs1%r, 2); u(3) = UBOUND(rs1%r, 3)
1995!$OMP PARALLEL DO DEFAULT(NONE) COLLAPSE(3) &
1996!$OMP             PRIVATE(i,j,k) &
1997!$OMP             SHARED(rs1,rs2,rs3,scalar,l,u)
1998         DO k = l(3), u(3)
1999         DO j = l(2), u(2)
2000         DO i = l(1), u(1)
2001            rs1%r(i, j, k) = rs1%r(i, j, k) + scalar*rs2%r(i, j, k)*rs3%r(i, j, k)
2002         ENDDO
2003         ENDDO
2004         ENDDO
2005!$OMP END PARALLEL DO
2006      ENDIF
2007      CALL timestop(handle)
2008   END SUBROUTINE rs_grid_mult_and_add
2009
2010! **************************************************************************************************
2011!> \brief Set box matrix info for real space grid
2012!>      This is needed for variable cell simulations
2013!> \param pw_grid ...
2014!> \param rs ...
2015!> \par History
2016!>      none
2017!> \author JGH (15-May-2007)
2018! **************************************************************************************************
2019   SUBROUTINE rs_grid_set_box(pw_grid, rs)
2020
2021      TYPE(pw_grid_type), POINTER                        :: pw_grid
2022      TYPE(realspace_grid_type), POINTER                 :: rs
2023
2024      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_set_box', &
2025         routineP = moduleN//':'//routineN
2026
2027      CPASSERT(ASSOCIATED(pw_grid))
2028      CPASSERT(ASSOCIATED(rs))
2029      CPASSERT(rs%desc%grid_id == pw_grid%id_nr)
2030      rs%desc%dh = pw_grid%dh
2031      rs%desc%dh_inv = pw_grid%dh_inv
2032
2033   END SUBROUTINE rs_grid_set_box
2034
2035! **************************************************************************************************
2036!> \brief retains the given rs grid (see doc/ReferenceCounting.html)
2037!> \param rs_grid the grid to retain
2038!> \par History
2039!>      03.2003 created [fawzi]
2040!> \author fawzi
2041! **************************************************************************************************
2042   SUBROUTINE rs_grid_retain(rs_grid)
2043      TYPE(realspace_grid_type), POINTER                 :: rs_grid
2044
2045      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_retain', routineP = moduleN//':'//routineN
2046
2047      CPASSERT(ASSOCIATED(rs_grid))
2048      CPASSERT(rs_grid%ref_count > 0)
2049      rs_grid%ref_count = rs_grid%ref_count + 1
2050   END SUBROUTINE rs_grid_retain
2051
2052! **************************************************************************************************
2053!> \brief retains the given rs grid descriptor (see doc/ReferenceCounting.html)
2054!> \param rs_desc the grid descriptor to retain
2055!> \par History
2056!>      04.2009 created [Iain Bethune]
2057!>        (c) The Numerical Algorithms Group (NAG) Ltd, 2009 on behalf of the HECToR project
2058! **************************************************************************************************
2059   SUBROUTINE rs_grid_retain_descriptor(rs_desc)
2060      TYPE(realspace_grid_desc_type), POINTER            :: rs_desc
2061
2062      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_retain_descriptor', &
2063         routineP = moduleN//':'//routineN
2064
2065      CPASSERT(ASSOCIATED(rs_desc))
2066      CPASSERT(rs_desc%ref_count > 0)
2067      rs_desc%ref_count = rs_desc%ref_count + 1
2068   END SUBROUTINE rs_grid_retain_descriptor
2069
2070! **************************************************************************************************
2071!> \brief releases the given rs grid (see doc/ReferenceCounting.html)
2072!> \param rs_grid the rs grid to release
2073!> \par History
2074!>      03.2003 created [fawzi]
2075!> \author fawzi
2076! **************************************************************************************************
2077   SUBROUTINE rs_grid_release(rs_grid)
2078      TYPE(realspace_grid_type), POINTER                 :: rs_grid
2079
2080      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_release', &
2081         routineP = moduleN//':'//routineN
2082
2083      IF (ASSOCIATED(rs_grid)) THEN
2084         CPASSERT(rs_grid%ref_count > 0)
2085         rs_grid%ref_count = rs_grid%ref_count - 1
2086         IF (rs_grid%ref_count == 0) THEN
2087
2088            CALL rs_grid_release_descriptor(rs_grid%desc)
2089
2090            allocated_rs_grid_count = allocated_rs_grid_count - 1
2091            DEALLOCATE (rs_grid%r)
2092            DEALLOCATE (rs_grid%px)
2093            DEALLOCATE (rs_grid%py)
2094            DEALLOCATE (rs_grid%pz)
2095            DEALLOCATE (rs_grid)
2096         END IF
2097      END IF
2098   END SUBROUTINE rs_grid_release
2099
2100! **************************************************************************************************
2101!> \brief releases the given rs grid descriptor (see doc/ReferenceCounting.html)
2102!> \param rs_desc the rs grid descriptor to release
2103!> \par History
2104!>      04.2009 created [Iain Bethune]
2105!>        (c) The Numerical Algorithms Group (NAG) Ltd, 2009 on behalf of the HECToR project
2106! **************************************************************************************************
2107   SUBROUTINE rs_grid_release_descriptor(rs_desc)
2108      TYPE(realspace_grid_desc_type), POINTER            :: rs_desc
2109
2110      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_release_descriptor', &
2111         routineP = moduleN//':'//routineN
2112
2113      IF (ASSOCIATED(rs_desc)) THEN
2114         CPASSERT(rs_desc%ref_count > 0)
2115         rs_desc%ref_count = rs_desc%ref_count - 1
2116         IF (rs_desc%ref_count == 0) THEN
2117
2118            CALL pw_grid_release(rs_desc%pw)
2119
2120            IF (rs_desc%parallel) THEN
2121               ! release the group communicator
2122               CALL mp_comm_free(rs_desc%group)
2123
2124               DEALLOCATE (rs_desc%virtual2real)
2125               DEALLOCATE (rs_desc%real2virtual)
2126            END IF
2127
2128            IF (rs_desc%distributed) THEN
2129               DEALLOCATE (rs_desc%rank2coord)
2130               DEALLOCATE (rs_desc%coord2rank)
2131               DEALLOCATE (rs_desc%lb_global)
2132               DEALLOCATE (rs_desc%ub_global)
2133               DEALLOCATE (rs_desc%x2coord)
2134               DEALLOCATE (rs_desc%y2coord)
2135               DEALLOCATE (rs_desc%z2coord)
2136            ENDIF
2137
2138            DEALLOCATE (rs_desc)
2139         END IF
2140      END IF
2141   END SUBROUTINE rs_grid_release_descriptor
2142
2143! **************************************************************************************************
2144!> \brief emulates the function of an MPI_cart_shift operation, but the shift is
2145!>        done in virtual coordinates, and the corresponding real ranks are returned
2146!> \param rs_grid ...
2147!> \param dir ...
2148!> \param disp ...
2149!> \param source ...
2150!> \param dest ...
2151!> \par History
2152!>      04.2009 created [Iain Bethune]
2153!>        (c) The Numerical Algorithms Group (NAG) Ltd, 2009 on behalf of the HECToR project
2154! **************************************************************************************************
2155   SUBROUTINE cart_shift(rs_grid, dir, disp, source, dest)
2156
2157      TYPE(realspace_grid_type), POINTER                 :: rs_grid
2158      INTEGER, INTENT(IN)                                :: dir, disp
2159      INTEGER, INTENT(OUT)                               :: source, dest
2160
2161      CHARACTER(len=*), PARAMETER :: routineN = 'cart_shift', routineP = moduleN//':'//routineN
2162
2163      INTEGER, DIMENSION(3)                              :: shift_coords
2164
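      ! shift our virtual coordinates by +disp (with wrap-around) to find the destination rank,
      ! and by -disp to find the source; the virtual ranks are then mapped back to real ranks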
2165      shift_coords = rs_grid%desc%virtual_group_coor
2166      shift_coords(dir) = MODULO(shift_coords(dir) + disp, rs_grid%desc%group_dim(dir))
2167      dest = rs_grid%desc%virtual2real(rs_grid%desc%coord2rank(shift_coords(1), shift_coords(2), shift_coords(3)))
2168      shift_coords = rs_grid%desc%virtual_group_coor
2169      shift_coords(dir) = MODULO(shift_coords(dir) - disp, rs_grid%desc%group_dim(dir))
2170      source = rs_grid%desc%virtual2real(rs_grid%desc%coord2rank(shift_coords(1), shift_coords(2), shift_coords(3)))
2171
   END SUBROUTINE cart_shift
2173
2174! **************************************************************************************************
2175!> \brief returns the maximum number of points in the local grid of any process
2176!>        to account for the case where the grid may later be reordered
2177!> \param desc ...
2178!> \return ...
2179!> \par History
2180!>      10.2011 created [Iain Bethune]
2181! **************************************************************************************************
2182   FUNCTION rs_grid_max_ngpts(desc) RESULT(max_ngpts)
2183      TYPE(realspace_grid_desc_type), POINTER            :: desc
2184      INTEGER                                            :: max_ngpts
2185
2186      CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_max_ngpts', &
2187         routineP = moduleN//':'//routineN
2188
2189      INTEGER                                            :: handle, i
2190      INTEGER, DIMENSION(3)                              :: lb, ub
2191
2192      CALL timeset(routineN, handle)
2193
2194      max_ngpts = 0
2195      IF ((desc%pw%para%mode == PW_MODE_LOCAL) .OR. &
2196          (ALL(desc%group_dim == 1))) THEN
2197         CPASSERT(PRODUCT(INT(desc%npts, KIND=int_8)) < HUGE(1))
2198         max_ngpts = PRODUCT(desc%npts)
2199      ELSE
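         ! otherwise take the largest local box over all processes, extended by the border
         ! width in the directions where desc%perd is 0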
2200         DO i = 0, desc%group_size - 1
2201            lb = desc%lb_global(:, i)
2202            ub = desc%ub_global(:, i)
2203            lb = lb - desc%border*(1 - desc%perd)
2204            ub = ub + desc%border*(1 - desc%perd)
2205            CPASSERT(PRODUCT(INT(ub - lb + 1, KIND=int_8)) < HUGE(1))
2206            max_ngpts = MAX(max_ngpts, PRODUCT(ub - lb + 1))
2207         END DO
2208      END IF
2209
2210      CALL timestop(handle)
2211
2212   END FUNCTION rs_grid_max_ngpts
2213
2214END MODULE realspace_grid_types
2215