1! MPI 3D Navier-Stokes Solver Test
2!
3! Copyright (c) 2012-2014, Sourcery, Inc.
4! All rights reserved.
5!
6! Redistribution and use in source and binary forms, with or without
7! modification, are permitted provided that the following conditions are met:
8!     * Redistributions of source code must retain the above copyright
9!       notice, this list of conditions and the following disclaimer.
10!     * Redistributions in binary form must reproduce the above copyright
11!       notice, this list of conditions and the following disclaimer in the
12!       documentation and/or other materials provided with the distribution.
13!     * Neither the name of the Sourcery, Inc., nor the
14!       names of its contributors may be used to endorse or promote products
15!       derived from this software without specific prior written permission.
16!
17! THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
18! ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19! WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20! DISCLAIMED. IN NO EVENT SHALL SOURCERY, INC., BE LIABLE FOR ANY
21! DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22! (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23! LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24! ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25! (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26!
27
28!(*----------------------------------------------------------------------------------------------------------------------
29!         basic in-core shear code ( 7 words/node, not threaded, in-core, no file read/write )
30!------------------------------------------------------------------------------------------------------------------------*)
31
32! Define universal constants:
33! In the case of exactly representable numbers, the definitions are useful
34! to ensure subprogram argument type/kind/rank matching without having to
35! repind kind specifiers everywhere.
36module constants_module
37  use iso_fortran_env, only : int64
38  implicit none
39  private
40  public :: one,zero
41  integer(int64), parameter :: one=1_int64,zero=0_int64
42end module
43
44! Initialize the random seed with a varying seed to ensure a different
45! random number sequence for each invocation of subroutine, e.g. for
46! invocations on different images of a coarray parallel program.
47! Setting any seed values to zero is depcretated because it can result
48! in low-quality random number sequences.
49! (Source: https://gcc.gnu.org/onlinedocs/gfortran/RANDOM_005fSEED.html)
50module random_module
51  implicit none
52  private
53  public :: init_random_seed
54contains
55  subroutine init_random_seed()
56            use iso_fortran_env, only: int64
57            implicit none
58            integer, allocatable :: seed(:)
59            integer :: i, n, un, istat, dt(8), pid
60            integer(int64) :: t
61
62            call random_seed(size = n)
63            allocate(seed(n))
64            ! First try if the OS provides a random number generator
65            open(newunit=un, file="/dev/urandom", access="stream", &
66                 form="unformatted", action="read", status="old", iostat=istat)
67            if (istat == 0) then
68               read(un) seed
69               close(un)
70            else
71               ! Fallback to XOR:ing the current time and pid. The PID is
72               ! useful in case one launches multiple instances of the same
73               ! program in parallel.
74               call system_clock(t)
75               if (t == 0) then
76                  call date_and_time(values=dt)
77                  t = (dt(1) - 1970) * 365_int64 * 24 * 60 * 60 * 1000 &
78                       + dt(2) * 31_int64 * 24 * 60 * 60 * 1000 &
79                       + dt(3) * 24_int64 * 60 * 60 * 1000 &
80                       + dt(5) * 60 * 60 * 1000 &
81                       + dt(6) * 60 * 1000 + dt(7) * 1000 &
82                       + dt(8)
83               end if
84               pid = getpid()
85               t = ieor(t, int(pid, kind(t)))
86               do i = 1, n
87                  seed(i) = lcg(t)
88               end do
89            end if
90            call random_seed(put=seed)
91          contains
92            ! This simple PRNG might not be good enough for real work, but is
93            ! sufficient for seeding a better PRNG.
94            function lcg(s)
95              integer :: lcg
96              integer(int64) :: s
97              if (s == 0) then
98                 s = 104729
99              else
100                 s = mod(s, 4294967296_int64)
101              end if
102              s = mod(s * 279470273_int64, 4294967291_int64)
103              lcg = int(mod(s, int(huge(0), int64)), kind(0))
104            end function lcg
105          end subroutine init_random_seed
106end module random_module
107
108module run_size
109    use iso_fortran_env, only : int64,real64
110    implicit none
111      include 'mpif.h'
112        real ::  viscos, shear, b11, b22, b33, b12, velmax, max_velmax
113        integer(int64) ::  nx, ny, nz, nsteps, output_step
114        integer(int64) :: my, mx, first_y, last_y, first_x, last_x
115        integer(int64) :: ierror
116        real(real64) :: cpu_time, tran_time, total_time
117        real(real64) :: max_cpu_time, max_tran_time, max_total_time
118        real(real64) :: min_cpu_time, min_tran_time, min_total_time
119
120        real ::  time, cfl, dt
121        integer(int64) :: my_node, num_nodes
122        real, parameter :: pi = 3.141592653589793
123
124contains
125
126    subroutine global_times()
127        call MPI_REDUCE(total_time, max_total_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD, ierror)
128        call MPI_REDUCE(total_time, min_total_time, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD, ierror)
129        call MPI_REDUCE(tran_time, max_tran_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD, ierror)
130        call MPI_REDUCE(tran_time, min_tran_time, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD, ierror)
131        call MPI_REDUCE(cpu_time, max_cpu_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD, ierror)
132        call MPI_REDUCE(cpu_time, min_cpu_time, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD, ierror)
133    end subroutine global_times
134
135subroutine copy3( A,B, n1, sA1, sB1, n2, sA2, sB2, n3, sA3, sB3 )
136  implicit none
137  complex, intent(in)  :: A(0:*)
138  complex, intent(out) :: B(0:*)
139  integer(int64), intent(in) :: n1, sA1, sB1
140  integer(int64), intent(in) :: n2, sA2, sB2
141  integer(int64), intent(in) :: n3, sA3, sB3
142  integer(int64) i,j,k
143
144  do k=0,n3-1
145     do j=0,n2-1
146        do i=0,n1-1
147           B(i*sB1+j*sB2+k*sB3) = A(i*sA1+j*sA2+k*sA3)
148        end do
149     end do
150  end do
151end subroutine copy3
152
153end module run_size
154
155program mshear
156
157  !(***********************************************************************************************************
158  !                   m a i n   p r o g r a m
159  !***********************************************************************************************************)
160      use run_size
161      implicit none
162
163      interface
164        subroutine solve_navier_stokes
165        end subroutine solve_navier_stokes
166      end interface
167
168  call MPI_INIT(ierror)
169  call MPI_COMM_RANK(MPI_COMM_WORLD, my_node, ierror)
170  call MPI_COMM_SIZE(MPI_COMM_WORLD, num_nodes, ierror)
171
172      if( my_node == 0 ) then
173
174        write(6,*) "nx,ny,nz : ";               read(5,*) nx, ny, nz
175        if ( mod(nx/2,num_nodes) /= 0) then;    write(6,*) "nx/2 not multiple of num_nodes";    stop;   end if
176        if ( mod(ny,num_nodes) /= 0) then;      write(6,*) " ny not multiple of num_nodes";     stop;   end if
177
178        write(6,*) "viscos, shear : ";          read(5,*) viscos, shear
179        write(6,*) "b11 b22 b33 b12 : ";        read(5,*) b11, b22, b33, b12
180        write(6,*) "nsteps, output_step : ";    read(5,*) nsteps, output_step
181
182        write(6,fmt="(3(A,i4))") "nx =",nx,   "   ny =",ny,   "   nz =",nz
183        write(6,fmt="(2(A,f7.3))") "viscos = ", viscos, "      shear = ", shear
184        write(6,fmt="(A,4f7.3)") "b11 b22 b33 b12 = ", b11, b22, b33, b12
185        write(6,fmt="(2(A,i6))") "nsteps = ", nsteps, "       output_step = ", output_step
186
187        write(6,fmt="(A,i4,A)") "----------------- running on ", num_nodes, " images -------------------"
188
189      end if
190
191        call MPI_BCAST( nx, 1, MPI_INT, 0, MPI_COMM_WORLD, ierror )
192        call MPI_BCAST( ny, 1, MPI_INT, 0, MPI_COMM_WORLD, ierror )
193        call MPI_BCAST( nz, 1, MPI_INT, 0, MPI_COMM_WORLD, ierror )
194
195        call MPI_BCAST( viscos, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
196        call MPI_BCAST( shear, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
197        call MPI_BCAST( b11, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
198        call MPI_BCAST( b22, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
199        call MPI_BCAST( b33, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
200        call MPI_BCAST( b12, 1, MPI_FLOAT, 0, MPI_COMM_WORLD, ierror )
201        call MPI_BCAST( nsteps, 1, MPI_INT, 0, MPI_COMM_WORLD, ierror )
202        call MPI_BCAST( output_step, 1, MPI_INT, 0, MPI_COMM_WORLD, ierror )
203        call MPI_BARRIER(MPI_COMM_WORLD, ierror)
204
205 	    mx = nx/2 / num_nodes;  first_x = my_node*mx + 1;   last_x  = my_node*mx + mx
206        my = ny / num_nodes;    first_y = my_node*my + 1;   last_y  = my_node*my + my
207
208    call solve_navier_stokes
209
210end program mshear
211
212!  (***********************************************************************************************************
213!             n a v i e r - s t o k e s   s o l v e r
214!  ************************************************************************************************************)
215
216  subroutine solve_navier_stokes
217      use run_size
218      implicit none
219
220  !(*****************************   declarations     ****************************************)
221
222       integer(int64) ::  stop, rflag, oflag, step, rkstep, nshells, msg_size
223       real ::  k1(nx/2), k2(ny), k3(nz), mk1(nx/2), mk2(ny), mk3(nz) &
224              , kx(nx/2), ky_(nx/2,ny), ky(nx/2,ny), kz(nz)
225       complex :: sx(nx/2,3), sy(ny,3), sz(nz,3)
226       integer(int64) :: trigx, trigy, trigz, trigxy
227
228       complex, allocatable ::  u(:,:,:,:)    ! u(nz,4,first_x:last_x,ny)    !(*-- x-y planes --*)
229       complex, allocatable ::  ur(:,:,:,:)   !ur(nz,4,first_y:last_y,nx/2)  !(*-- x-z planes --*)
230       complex, allocatable ::  un(:,:,:,:)   !un(nz,3,first_x:last_x,ny)    !(*-- x-y planes --*)
231       complex, allocatable :: bufr(:)
232
233interface
234!--------    note: integer(int64)'s required for FFT's and other assembly-coded externals   ------
235
236   function ctrig( len ) bind(C)               !(*-- define complex FFT trig table --*)
237        import int64
238        integer(int64), value, intent(in) :: len
239        integer(int64) :: ctrig     !-- C pointer!
240   end function ctrig
241
242   function rtrig( len ) bind(C)              !(*-- define real FFT trig table --*)
243        import int64
244        integer(int64), value, intent(in):: len
245        integer(int64) :: rtrig     !-- C pointer!
246   end function rtrig
247
248   subroutine cfft( len, lot, data, inc, jmp, ctrig, isign ) bind(C)    !(*-- complex FFT --*)
249        import int64
250        integer(int64), value, intent(in) :: len, lot, inc, jmp, ctrig, isign
251        complex, dimension(0:0), intent(in) :: data
252   end subroutine cfft
253
254   subroutine rfft( len, lot, data, inc, jmp, rtrig, isign ) bind(C)   !(*-- real FFT --*)
255        import int64
256        integer(int64), value, intent(in) :: len, lot, inc, jmp, rtrig, isign
257        complex, dimension(0:0), intent(in) :: data
258   end subroutine rfft
259
260   function WALLTIME() bind(C, name = "WALLTIME")
261       import real64
262       real(real64) :: WALLTIME
263   end function WALLTIME
264
265
266end interface
267
268       trigx = rtrig( nx )
269       trigy = ctrig( ny )
270       trigz = ctrig( nz )
271	   trigxy = ctrig( nx+ny )
272
273       msg_size = nz*4*mx*my     !-- message size (complex data items)
274
275       allocate (  u(nz , 4 , first_x:last_x , ny)   )   !(*-- y-z planes --*)
276       allocate ( ur(nz , 4 , first_y:last_y , nx/2) )   !(*-- x-z planes --*)
277       allocate ( un(nz , 3 , first_x:last_x , ny)   )   !(*-- y-z planes --*)
278       allocate ( bufr(msg_size) )
279
280       stop = 0;   step = 0;  rkstep = 2;  rflag = 0;  cfl = 1;   dt = 0
281       nshells = max( nx,ny,nz )
282
283                        call define_kspace
284                        call define_field
285                        call enforce_conjugate_symmetry
286						call copy_n_s
287                        call define_shifts
288
289       total_time =  -WALLTIME()      !-- start the clock
290
291       tran_time = 0;   cpu_time = -WALLTIME()
292
293  !(*********************************   begin execution loop   *****************************************)
294
295       do while (stop == 0)
296
297                        call phase1
298            rkstep = 1
299                        call transpose_X_Y
300                        call phase2
301                        call transpose_Y_X
302                        call define_step
303                        call define_shifts
304                        call phase3
305                        call pressure
306		if (oflag /= 0) call spectra
307                        call advance
308                        call phase1
309            rkstep = 2
310                        call transpose_X_Y
311                        call phase2
312                        call transpose_Y_X
313                        call phase3
314                        call advance
315                        call pressure
316        if (rflag /= 0) call remesh
317                        call copy_s_n
318
319                		step = step + 1
320                        time = time + dt
321       end do
322
323  !(*********************************   end execution loop   ***********************************************)
324
325       deallocate ( u, ur, un )
326       deallocate ( bufr )
327       call MPI_BARRIER(MPI_COMM_WORLD, ierror)  !-- other nodes wait for broadcast!
328
329       total_time =  total_time + WALLTIME()    !-- stop the clock
330       cpu_time =  cpu_time + WALLTIME()    !-- stop the clock
331       call global_times
332
333       if (my_node == 0 )   write(6,fmt="(3(10X,A,2f7.2))")  &
334                            , "total_time ", min_total_time/step, max_total_time/step &
335                            , "cpu_time ", min_cpu_time/step, max_cpu_time/step &
336                            , "tran_time ", min_tran_time/step, max_tran_time/step
337
338
339!       write(6,fmt="(A,i4,3f7.2)")  "image ", my_node, total_time/step, cpu_time/step, tran_time/step
340
341
342contains
343
344 !(***********************************************************************************************************
345 !                          transpose the Y and Z planes
346 !***********************************************************************************************************)
347
348!-----                   u(nz,4,mx,my*num_nodes) [num_nodes]
349!-----                  ur(nz,4,my,mx*num_nodes) [num_nodes]
350!-----                bufr(nz,4,my,mx) or bufr(nz,4,mx,my)
351
352!-------------   out-of-place transpose data_s --> data_r  ----------------------------
353
354 subroutine transpose_X_Y
355
356    use constants_module, only : one
357    use run_size
358    implicit none
359
360    integer(int64) :: to, from, stage, idr(0:num_nodes-1), ids, send_tag, recv_tag
361    integer(int64) :: send_status(MPI_STATUS_SIZE), recv_status(MPI_STATUS_SIZE)
362
363    cpu_time = cpu_time + WALLTIME()
364 call MPI_BARRIER(MPI_COMM_WORLD, ierror)   !--  wait for other nodes to finish compute
365    tran_time = tran_time - WALLTIME()
366
367!--------------   issue all block receives ... tags = 256*dst_image+src_image  ------------------
368
369    do stage = 1, num_nodes-1
370        from = mod( my_node+stage, num_nodes )
371        recv_tag = 256*my_node + from
372        call MPI_IRECV  ( ur(1,1,first_y,1+from*mx) &
373                        ,   msg_size*2,  MPI_REAL,  from, recv_tag,  MPI_COMM_WORLD,  idr(stage),  ierror)
374    end do
375
376!--------------   transpose my image's block (no communication needed)  ------------------
377
378    call copy3 (    u(1,1,first_x,1+my_node*my) &                   !-- intra-node transpose
379                ,  ur(1,1,first_y,1+my_node*mx) &                   !-- no inter-node transpose needed
380                ,   nz*3, one, one        &                                 !-- note: only 3 of 4 words needed
381                ,   mx, nz*4, nz*4*my &
382                ,   my, nz*4*mx, nz*4 )
383
384!--------------   issue all block sends ... tags = 256*dst_image+src_image  ------------------
385
386
387    do stage = 1, num_nodes-1   !-- process sends in order
388        to = mod( my_node+stage, num_nodes )
389        send_tag = 256*to + my_node
390        call copy3 ( u(1,1,first_x,1+to*my), bufr &                !-- intra-node transpose from buffer
391                ,   nz*3, one, one        &                             !-- note: only 3 of 4 words needed
392                ,   mx, nz*4, nz*4*my &
393                ,   my, nz*4*mx, nz*4 )
394
395        call MPI_SEND ( bufr &
396                        ,   msg_size*2,  MPI_REAL,  to, send_tag,  MPI_COMM_WORLD,  ierror)
397    end do
398
399!--------------   wait on receives   ------------------
400
401    do stage = 1, num_nodes-1
402        call MPI_WAIT( idr(stage),  recv_status,  ierror )
403    end do
404
405call MPI_BARRIER(MPI_COMM_WORLD, ierror)     !--  wait for other nodes to finish transpose (not needed)
406    tran_time = tran_time + WALLTIME()
407    cpu_time = cpu_time - WALLTIME()
408
409 end  subroutine transpose_X_Y
410
411!-------------   out-of-place transpose data_r --> data_s  ----------------------------
412
413 subroutine transpose_Y_X
414
415    use constants_module, only : one
416    use run_size
417    implicit none
418
419    integer(int64) :: to, from, stage, idr(0:num_nodes-1), ids, send_tag, recv_tag
420    integer(int64) :: send_status(MPI_STATUS_SIZE), recv_status(MPI_STATUS_SIZE)
421
422    cpu_time = cpu_time + WALLTIME()
423 call MPI_BARRIER(MPI_COMM_WORLD, ierror)   !--  wait for other nodes to finish compute
424    tran_time = tran_time - WALLTIME()
425
426!--------------   issue all block receives ... tags = 256*dst_image+src_image  ------------------
427
428    do stage = 1, num_nodes-1
429        from = mod( my_node+stage, num_nodes )
430        recv_tag = 256*my_node + from
431        call MPI_IRECV  ( u(1,1,first_x,1+from*my) &
432                        ,   msg_size*2,  MPI_REAL,  from, recv_tag,  MPI_COMM_WORLD,  idr(stage),  ierror)
433    end do
434
435!--------------   transpose my image's block (no communication needed)  ------------------
436
437    call copy3 (   ur(1,1,first_y,1+my_node*mx) &                   !-- intra-node transpose
438                ,   u(1,1,first_x,1+my_node*my) &                   !-- no inter-node transpose needed
439                ,   nz*4, one, one    &                                 !-- note: all 4 words needed
440                ,   my, nz*4, nz*4*mx &
441                ,   mx, nz*4*my, nz*4 )
442
443!--------------   issue all block sends ... tags = 256*dst_image+src_image  ------------------
444
445
446    do stage = 1, num_nodes-1   !-- process sends in order
447        to = mod( my_node+stage, num_nodes )
448        send_tag = 256*to + my_node
449        call copy3 ( ur(1,1,first_y,1+to*mx), bufr &                !-- intra-node transpose from buffer
450                ,   nz*4, one, one    &                                 !-- note: all 4 words needed
451                ,   my, nz*4, nz*4*mx &
452                ,   mx, nz*4*my, nz*4 )
453
454        call MPI_SEND ( bufr &
455                        ,   msg_size*2,  MPI_REAL,  to, send_tag,  MPI_COMM_WORLD,  ierror)
456    end do
457
458!--------------   wait on receives   ------------------
459
460    do stage = 1, num_nodes-1
461        call MPI_WAIT( idr(stage),  recv_status,  ierror )
462    end do
463
464call MPI_BARRIER(MPI_COMM_WORLD, ierror)     !--  wait for other nodes to finish transpose
465    tran_time = tran_time + WALLTIME()
466    cpu_time = cpu_time - WALLTIME()
467
468 end  subroutine transpose_Y_X
469
470
471!(*************************************************************************************************************
472!           enforce conjugate symmetry for plane kx=0 of wavespace  (half of this plane is redundant)
473!***************************************************************************************************************)
474
475  subroutine enforce_conjugate_symmetry
476
477    integer(int64) ::  i, x, y, z
478
479!(*------------------------- un( K ) = conjg( un( -K ) ) ---------------------*)
480
481    if (my_node == 0 ) then     !-- x=1 is in node=1
482        x = 1
483        do i = 1, 3
484            z = 1;              y = 1;              un(z,i,x,y) = 0
485            z = 1;              do y = 2, ny/2;     un(z,i,x,y) = conjg( un(z,i,x,ny+2-y) );        end do
486            do z = 2, nz/2;     y = 1;              un(z,i,x,y) = conjg( un(nz+2-z,i,x,y) );        end do
487            do z = 2, nz/2;     do y = 2, ny;       un(z,i,x,y) = conjg( un(nz+2-z,i,x,ny+2-y) );   end do;	end do
488        end do
489    end if
490end  subroutine enforce_conjugate_symmetry
491
492 !(***********************************************************************************************************
493 !                spectra :  accumulate spectra and other statistics over flow field
494 !***********************************************************************************************************)
495
496 subroutine  spectra
497
498      use run_size
499     implicit none
500
501    integer(int64) :: k, x, y, z
502    real :: kk, ww, uw, uu, uv, duu, factor   &
503          , ek(nshells), dk(nshells), hk(nshells), tk(nshells), sample(nshells)
504    real, save ::  sum_ek, sum_dk, sum_hk, sum_tk, ek_sum, dk_sum, hk_sum, tk_sum
505
506      total_time =  total_time + WALLTIME()     !-- stop the clock!  time/step does not include spectra time
507
508	  oflag = 0
509      ek = 0;       dk = 0;       hk = 0;     tk = 0;   sample = 0
510
511 !(*---------------------   three dimensional spectra  -----------------------*)
512
513      do x = first_x, last_x;   do y = 1, ny;      do  z = 1, nz
514
515            if( mk1(x)+mk2(y)+mk3(z) > 2./9. ) &
516                              then;   factor = 0
517            else if (x == 1)  then;   factor = 1
518                              else;   factor = 2
519            end if
520
521            kk = kx(x)**2 + ky(x,y)**2 + kz(z)**2
522            k = 1 + int( sqrt( kk ) + 0.5 )
523
524			  uu = factor * real( un(z,1,x,y) * conjg( un(z,1,x,y) ) &
525                                + un(z,2,x,y) * conjg( un(z,2,x,y) ) &
526                                + un(z,3,x,y) * conjg( un(z,3,x,y) ) )
527              ww = kk * uu
528              uv = factor * real( un(z,1,x,y) * conjg( un(z,2,x,y) ) )
529
530              uw = factor * 2 * aimag( kx(x) *   un(z,2,x,y) * conjg( un(z,3,x,y) ) &
531                                     + ky(x,y) * un(z,3,x,y) * conjg( un(z,1,x,y) ) &
532                                     + kz(z) *   un(z,1,x,y) * conjg( un(z,2,x,y) ) )
533
534			  duu = factor * real( un(z,1,x,y) * conjg( u(z,1,x,y) ) &
535                                 + un(z,2,x,y) * conjg( u(z,2,x,y) ) &
536								 + un(z,3,x,y) * conjg( u(z,3,x,y) ) ) / (dt/2) + shear * uv
537
538              sample(k) = sample(k) + factor        !(*-- shell sample --*)
539              ek(k) = ek(k) + uu                    !(*-- 2 * energy sum --*)
540              dk(k) = dk(k) + ww                    !(*-- enstrophy sum --*)
541              hk(k) = hk(k) + uw                    !(*-- helicity sum --*)
542              tk(k) = tk(k) + duu                   !(*-- transfer sum --*)
543
544      end do;   end do;  end do
545
546 !(************************     finished accumulation :  compute final statistics     *************************)
547
548    sum_ek = 0;     sum_dk = 0;     sum_hk = 0;     sum_tk = 0
549    do k = nshells, 1, -1
550        sum_ek  = sum_ek + ek(k)
551        sum_dk  = sum_dk + dk(k)
552        sum_hk  = sum_hk + hk(k)
553        sum_tk  = sum_tk + tk(k)
554    end do
555
556    call MPI_BARRIER(MPI_COMM_WORLD, ierror)
557    call MPI_REDUCE(sum_ek, ek_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD, ierror);  sum_ek = ek_sum
558    call MPI_REDUCE(sum_dk, dk_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD, ierror);  sum_dk = dk_sum
559    call MPI_REDUCE(sum_hk, hk_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD, ierror);  sum_hk = hk_sum
560    call MPI_REDUCE(sum_tk, tk_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD, ierror);  sum_tk = tk_sum
561
562    if (my_node == 0 ) then
563        if (step == 0)   write(6,*) "step   time     energy    enstrophy   helicity   transfer"
564        write(6,fmt="(i3, 5e11.3)")  step,  time,    sum_ek,   sum_dk,     sum_hk,    sum_tk
565    end if
566
567     total_time =  total_time - WALLTIME()     !-- restart the clock!
568 end  subroutine  spectra
569
570  !(************************************************************************************************************
571  !        define_field  :   define initial flow field from scratch
572  !************************************************************************************************************)
573
574 subroutine define_field
575
576    use random_module, only : init_random_seed
577    use run_size
578    implicit none
579
580    real ::     k, k12, f, phi, theta1, theta2
581    complex ::  alpha, beta
582    integer(int64) ::  x, y, z
583    real, parameter :: klo=8, khi=16
584
585   time = 0
586   call init_random_seed
587
588   do x = first_x, last_x
589       do  y = 1, ny
590            do z = 1, nz
591                 call random_number(theta1)
592                 call random_number(theta2)
593                 call random_number(phi   )
594                 k   = sqrt( kx(x)**2 + ky(x,y)**2 + kz(z)**2 )
595                 k12 = sqrt( kx(x)**2 + ky(x,y)**2 )
596
597                 if ( k == 0  .or.  mk1(x)+mk2(y)+mk3(z)>2./9.  .or.  k < klo  .or.  k > khi ) &
598                     then;   f = 0
599                     else;   f = sqrt( 1./(2*pi) ) / k
600                 end if
601
602                 alpha = f * exp( (0,2) * pi * theta1 ) * cos( 2*pi * phi )
603                 beta  = f * exp( (0,2) * pi * theta2 ) * sin( 2*pi * phi )
604
605                 if (k12 == 0) &
606                 then; un(z,1,x,y) = alpha
607                       un(z,2,x,y) = beta
608                       un(z,3,x,y) = 0
609
610                 else; un(z,1,x,y) = ( beta * kz(z) * kx(x)   + alpha * k * ky(x,y) ) / ( k * k12 )
611                       un(z,2,x,y) = ( beta * kz(z) * ky(x,y) - alpha * k * kx(x)   ) / ( k * k12 )
612                       un(z,3,x,y) = - beta * k12 / k
613                 end if
614
615   end do;  end do; end do
616 end  subroutine define_field
617
618 !(***********************************************************************************************************
619 !          define_shifts  :    define coordinate shifts for control of 1-d alias errors
620 ! ***********************************************************************************************************)
621
622    subroutine  define_shifts
623    use run_size
624    implicit none
625
626           integer ::  seed_size
627           integer(int64) ::  x, y, z, i
628           integer(int64), save ::  init = 0
629           real :: delta_x, delta_y, delta_z
630
631           if (init == 0) &     !-- Note: delta's not carried over from previous run
632           then;
633                init = 1
634                call random_seed(size=seed_size)
635                call random_seed(put=[(1234567,i=1,seed_size)])!(* same random numbers for each image! *)
636                do  x = 1, nx/2;  sx(x,3) = exp (  (0,1) * ( pi / nx ) * k1(x) ); end do
637                do  y = 1, ny  ;  sy(y,3) = exp (  (0,1) * ( pi / ny ) * k2(y) ); end do
638                do  z = 1, nz  ;  sz(z,3) = exp (  (0,1) * ( pi / nz ) * k3(z) ); end do
639            else;
640                call random_number(delta_x);delta_x = 2*pi / nx * delta_x
641                do  x = 1, nx/2;  sx(x,1) = sx(x,3)
642                                  sx(x,2) = exp (  (0,1) * delta_x * k1(x) )
643                                  sx(x,3) = exp (  (0,1) * ( delta_x + pi / nx ) * k1(x) ); end do
644
645                call random_number(delta_y);delta_y = 2*pi / ny * delta_y
646                do  y = 1, ny  ;  sy(y,1) = sy(y,3)
647                                  sy(y,2) = exp (  (0,1) * delta_y * k2(y) )
648                                  sy(y,3) = exp (  (0,1) * ( delta_y + pi / ny ) * k2(y) ); end do
649
650                call random_number(delta_z);delta_z = 2*pi / nz * delta_z
651                do  z = 1, nz  ;  sz(z,1) = sz(z,3)
652                                  sz(z,2) = exp (  (0,1) * delta_z * k3(z) )
653                                  sz(z,3) = exp (  (0,1) * ( delta_z + pi / nz ) * k3(z) ); end do
654           end if
655
656 end  subroutine  define_shifts
657
658  !(***********************************************************************************************************
659  !       define_step  :   update time, metric, shifts for the next step
660  !**********************************************************************************************************)
661
662  subroutine  define_step
663      use run_size
664      implicit none
665
666    call MPI_BARRIER(MPI_COMM_WORLD, ierror)
667
668    if (cfl /= 0) then
669cpu_time = cpu_time + WALLTIME()
670        call MPI_ALLREDUCE(velmax, max_velmax, 1, MPI_REAL, MPI_MAX, MPI_COMM_WORLD, ierror)
671        velmax = max_velmax
672cpu_time = cpu_time - WALLTIME()
673        dt = cfl / velmax
674    end if
675
676    if (        shear > 0               &
677        .and.  .01*b11*shear*dt < b12   &
678        .and.  b12 <= b11*shear*dt )    then
679            dt = b12 / ( b11 * shear )          !(* limit dt, hit the orthognal mesh *)
680            oflag = 1
681    else if ( mod (step,output_step) == 0 ) then
682            oflag = 1
683    end if
684
685    b12 = b12 - b11 * shear * dt
686
687    if ( b12 < -b22/2 )     rflag = 1                                   !(* remesh at the end of the step? *)
688    if ( step == nsteps )   stop = 1                                    !(* last step? *)
689
690 end   subroutine    define_step
691
692   !(***********************************************************************************************************
693   !      define_kspace  :   define physical wavespace from computational wavespace and metric
694   !**********************************************************************************************************)
695
696  subroutine    define_kspace
697    use run_size
698    implicit none
699
700     integer(int64) ::  x, y, z
701
702       do  x = 1, nx/2   ;   k1(x) = x - 1;     end do
703       do  y = 1, ny/2+1 ;   k2(y) = y - 1;     end do
704       do  z = 1, nz/2+1 ;   k3(z) = z - 1;     end do
705
706       do  y = ny/2+2, ny;   k2(y) = y - 1 - ny;    end do
707       do  z = nz/2+2, nz;   k3(z) = z - 1 - nz;    end do
708
709       do  x = 1, nx/2 ;     mk1(x) = (k1(x)/nx)**2;    kx(x) =   b11 * k1(x);  end do
710       do  z = 1, nz   ;     mk3(z) = (k3(z)/nz)**2;    kz(z) =   b33 * k3(z);  end do
711       do  y = 1, ny   ;     mk2(y) = (k2(y)/ny)**2
712                             do  x = 1, nx/2 ;    ky(x,y) = b22 * k2(y) + b12 * k1(x);  end do; end do
713
714end   subroutine    define_kspace
715
716  !(***********************************************************************************************************
717  !   phase 1 :  on entry, data-plane contains velocity in wave space.  interpolate database, shifted mesh,
718  !              and proceed to physical y space .
719  !************************************************************************************************************)
720
721  subroutine  phase1
722
723      use constants_module, only : one
724      use run_size
725      implicit none
726
727    complex :: shift
728    integer(int64) :: i, x, y, z
729
730   	do  x = first_x, last_x
731
732   		do  y = 1, ny;    do  z = 1, nz
733             shift = sz(z,rkstep+1) * sy(y,rkstep+1) * sx(x,rkstep+1)
734             u(z,1,x,y) = shift * u(z,1,x,y)
735             u(z,2,x,y) = shift * u(z,2,x,y)
736             u(z,3,x,y) = shift * u(z,3,x,y)
737        end do; end do
738
739!(*---------------------------   LEAVING FOURIER WAVE SPACE  --------------------------*)
740
741        do i = 1, 3
742            call cfft ( ny, nz, u(1,i,x,1), nz*4*mx, one, trigy, one );    end do
743	end do
744
745 end   subroutine  phase1
746
747 !(**********************************************************************************************************
748 !     phase 2 :  on entry, data-plane contains velocity in physical y space, and wave x,z space on shifted
749 !                mesh.  Proceed to physical x,z space,  form nonlinear terms, and return to wave x,z space.
750 !***********************************************************************************************************)
751
752 subroutine  phase2
753
754     use constants_module, only : one
755     use run_size
756     implicit none
757
758     complex :: s2(nz,nx/2), vs(nz,nx/2)
759     integer(int64) ::  i, x, y, z
760     real :: v2r, v2i, s2r, s2i, u1r, u1i, u2r, u2i, u3r, u3i, u4r, u4i
761
762  velmax = 0
763
764  do  y = first_y, last_y
765
766      do  x = 1, nx/2 ;  do  z = 1, nz ;   vs(z,x) = ur(z,2,y,x);    end do; end do
767
768      do  i = 1, 3
769           call cfft ( nz, nx/2, ur(1,i,y,1), one, nz*4*my, trigz, one )
770           call rfft ( nx, nz,   ur(1,i,y,1), nz*4*my, one, trigx, one )
771      end do
772
773!(*----------------------------  WELCOME TO PHYSICAL SPACE  --------------------------*)
774
775      do  x = 1, nx/2;  do  z = 1, nz
776           u1r = real(ur(z,1,y,x));  u1i = aimag(ur(z,1,y,x))
777           u2r = real(ur(z,2,y,x));  u2i = aimag(ur(z,2,y,x))
778           u3r = real(ur(z,3,y,x));  u3i = aimag(ur(z,3,y,x))
779
780      if ( rkstep == 1 ) velmax = max( velmax &
781                                     , b11*nx*abs(u1r) + b22*ny*abs(u2r) + b33*nz*abs(u3r) &
782                                     , b11*nx*abs(u1i) + b22*ny*abs(u2i) + b33*nz*abs(u3i) )
783
784           v2r = u2r * u2r;             v2i = u2i * u2i
785           s2r = u1r * u3r;             s2i = u1i * u3i
786           u4r = u2r * u3r;             u4i = u2i * u3i
787           u3r = u3r * u3r  -  v2r;     u3i = u3i * u3i  -  v2i
788           u2r = u1r * u2r;             u2i = u1i * u2i
789           u1r = u1r * u1r  -  v2r;     u1i = u1i * u1i  -  v2i
790
791           s2(z,x)     = cmplx(s2r, s2i)
792           ur(z,1,y,x) = cmplx(u1r, u1i)
793           ur(z,2,y,x) = cmplx(u2r, u2i)
794           ur(z,3,y,x) = cmplx(u3r, u3i)
795           ur(z,4,y,x) = cmplx(u4r, u4i)
796      end do;   end do
797
798!(*----------------------------  LEAVING PHYSICAL SPACE  --------------------------*)
799
800      do  i = 1,  4
801           call rfft ( nx, nz,   ur(1,i,y,1), nz*4*my, one, trigx, -one )
802           do  z = 1, nz ;   ur(z,i,y,1) = cmplx(real(ur(z,i,y,1)),0);   end do
803           call cfft ( nz, nx/2, ur(1,i,y,1), one, nz*4*my, trigz, -one )
804      end do
805
806      call rfft ( nx, nz, s2, nz, one, trigx, -one )
807      do  z = 1, nz ;   s2(z,1) = cmplx(real(s2(z,1)),0);    end do
808      call cfft ( nz, nx/2, s2, one, nz, trigz, -one )
809
810      do  x = 1, nx/2;  do  z = 1, nz
811          ur(z,1,y,x) = kx(x) * ur(z,1,y,x) + kz(z) * s2(z,x)   - (0,1) * 2*nx*nz*shear * vs(z,x)
812          ur(z,3,y,x) = kx(x) * s2(z,x)     + kz(z) * ur(z,3,y,x)
813      end do;   end do
814  end do
815
816 end  subroutine  phase2
817
818  !(***********************************************************************************************************
819  !     phase 3 :  on entry, the data-plane contains the four stresses on a shifted mesh in physical y space,
820  !                wave x,z space.   Return to y  wave space on unshifted mesh and complete time derivative of
821  !                velocity ( not divergence free yet )
822  !***********************************************************************************************************)
823
824  subroutine  phase3
825
826      use constants_module, only : one
827      use run_size
828      implicit none
829
830    integer(int64) :: i, x, y, z
831    complex :: shift
832
833    do  x = first_x, last_x
834
835       do i = 1, 4
836           call cfft ( ny, nz, u(1,i,x,1), nz*4*mx, one, trigy, -one )
837       end do
838
839!(*---------------------------   WELCOME TO FOURIER WAVE SPACE  --------------------------*)
840
841       do  y = 1, ny ;   do  z = 1, nz
842               shift = -dt / (4*nx*ny*nz) * (0,1)*conjg( sy(y,rkstep) * sz(z,rkstep) * sx(x,rkstep) )
843               u(z,1,x,y) = shift * (         u(z,1,x,y) + ky(x,y) * u(z,2,x,y) )
844               u(z,2,x,y) = shift * ( kx(x) * u(z,2,x,y) + kz(z)   * u(z,4,x,y) )
845               u(z,3,x,y) = shift * (         u(z,3,x,y) + ky(x,y) * u(z,4,x,y) )
846       end do;  end do
847    end do
848
849 end   subroutine  phase3
850
851  !(***********************************************************************************************************
852  !   pressure :  add the gradient of a scalar, enforce continuity ( zero divergence )
853  !***********************************************************************************************************)
854
855  subroutine  pressure
856
857      use run_size
858      implicit none
859
860     complex :: psi
861     integer(int64) :: x, y, z
862
863      do x = first_x, last_x ;     do  y = 1, ny
864
865            if ( x /= 1 )  then
866                  do  z = 1, nz
867                        psi = ( kx(x) * u(z,1,x,y) + ky(x,y) * u(z,2,x,y) + kz(z) * u(z,3,x,y) ) &
868                              / ( kx(x)**2 + ky(x,y)**2 + kz(z)**2 )
869                        u(z,1,x,y) = u(z,1,x,y) - kx(x) * psi
870                        u(z,2,x,y) = u(z,2,x,y) - ky(x,y) * psi
871                        u(z,3,x,y) = u(z,3,x,y) - kz(z) * psi
872                   end do
873			else if ( y /= 1 )  then
874                  do  z = 1, nz
875                        psi = ( ky(1,y) * u(z,2,1,y) + kz(z) * u(z,3,1,y) ) &
876                              / ( ky(1,y)**2 + kz(z)**2 )
877                        u(z,2,1,y) = u(z,2,1,y) - ky(1,y) * psi
878                        u(z,3,1,y) = u(z,3,1,y) - kz(z) * psi
879                  end do
880            else
881                  do  z = 1, nz ;    u(z,3,1,1) = 0;     end do
882            end if
883	 end do;    end do
884
885end   subroutine  pressure
886
887!(*****************************************************************************************************************
888!                                remesh  :  remesh the sheared coordinate system
889!*****************************************************************************************************************)
890
891subroutine   remesh
892
893    use constants_module, only : one
894    use run_size
895    implicit none
896
897    complex :: u2(nx+ny,nz), shift(nx+ny)
898    integer(int64) :: i, x, y, z
899
900    write(6,fmt="(A,i4)") "remesh image ", my_node
901
902    total_time =  total_time + WALLTIME()     !-- stop the clock!
903
904    do x = first_x, last_x
905
906        do  y = 1, nx+ny ;   shift(y) =  exp( (0,-2) * pi / (nx+ny) * k1(x) * (y - 1) ) / (nx+ny);    end do
907
908        do  i = 1,  3
909            do  z = 1, nz
910                do  y = 1, ny/2           ;   u2(y,z) = u(z,i,x,y);     end do
911                do  y = ny/2+1, nx+ny/2+1 ;   u2(y,z) = 0;              end do
912                do  y = nx+ny/2+2, nx+ny  ;   u2(y,z) = u(z,i,x,y-nx);  end do
913            end do
914
915            call cfft ( nx+ny, nz, u2, one, nx+ny, trigxy, one )
916
917            do  z = 1, nz ;  do  y = 1, nx+ny ;     u2(y,z) = u2(y,z) * shift(y);  end do;  end do
918
919            call cfft ( nx+ny, nz, u2, one, nx+ny, trigxy, -one )
920
921            do  z = 1, nz
922                do  y = 1, ny/2
923                    if (mk1(x)+mk2(y)+mk3(z) > 2./9.) &
924                        then;  u(z,i,x,y) = 0
925                        else;  u(z,i,x,y) = u2(y,z)
926                    end if
927                end do
928                do  y = ny/2+1, ny
929                    if (mk1(x)+mk2(y)+mk3(z) > 2./9.) &
930                        then;  u(z,i,x,y) = 0
931                        else;  u(z,i,x,y) = u2(y+nx,z)
932                    end if
933                end do
934            end do
935        end do
936
937        do  y = 1, ny ;    ky(x,y) = ky(x,y) + b22 * k1(x);     end do           !(* update ky for this x *)
938
939    end do
940
941    b12 = b12 + b22;     rflag = 0                                               !(*   update metric, account for remesh    *)
942
943    total_time =  total_time - WALLTIME()     !-- restart the clock!
944 end subroutine   remesh
945
946 !(***********************************************************************************************************
947 !         copy_n_s,   copy_s_n :  copy data between data_s and data_n
948 !***********************************************************************************************************)
949
950 subroutine  copy_n_s
951
952     use run_size
953     implicit none
954
955	integer(int64) ::  x, y, z
956
957	do y = 1, ny;    do x = first_x, last_x;   do z = 1, nz
958		u(z,1,x,y) = un(z,1,x,y)
959		u(z,2,x,y) = un(z,2,x,y)
960    	u(z,3,x,y) = un(z,3,x,y)
961	end do; end do; end do
962 end  subroutine  copy_n_s
963
964 subroutine  copy_s_n
965
966     use run_size
967     implicit none
968
969	integer(int64) ::  x, y, z
970
971	do y = 1, ny;	do x = first_x, last_x;     do z = 1, nz
972		un(z,1,x,y) = u(z,1,x,y)
973		un(z,2,x,y) = u(z,2,x,y)
974		un(z,3,x,y) = u(z,3,x,y)
975    end do; end do; end do
976
977 end  subroutine  copy_s_n
978
979 !(***********************************************************************************************************
980 !                         advance :     second-order runge-kutta time step algorithm
981 !***********************************************************************************************************)
982
983 subroutine  advance
984
985     use run_size
986     implicit none
987
988    integer(int64) ::  x, y, z
989    real :: factor, xyfac, zfac(nz)	  !(* viscous integrating factors *)
990
991    if (rkstep == 1) then
992        do z = 1, nz;     zfac(z) = exp( - viscos * dt * kz(z)**2 ); end do
993
994        do x = first_x, last_x
995            do  y = 1, ny
996                ky_(x,y) = ky(x,y)
997                ky(x,y) = b22 * k2(y) + b12 * k1(x)
998
999                do z = 1, nz
1000                    if (mk1(x)+mk2(y)+mk3(z) > 2./9.) then
1001                        u(z,1,x,y) = 0;     u(z,2,x,y) = 0;     u(z,3,x,y) = 0
1002                    else
1003                        factor = zfac(z) * exp( - viscos * dt * ( kx(x)**2 + ( ky_(x,y)**2 + ky_(x,y)*ky(x,y) + ky(x,y)**2 )/3 ) )
1004
1005                        un(z,1,x,y) = factor * ( un(z,1,x,y) + u(z,1,x,y) )
1006                        u(z,1,x,y)  = un(z,1,x,y) + factor * u(z,1,x,y)
1007
1008                        un(z,2,x,y) = factor * ( un(z,2,x,y) + u(z,2,x,y) )
1009                        u(z,2,x,y)  = un(z,2,x,y) + factor * u(z,2,x,y)
1010
1011                        un(z,3,x,y) = factor * ( un(z,3,x,y) + u(z,3,x,y) )
1012                        u(z,3,x,y)  = un(z,3,x,y) + factor * u(z,3,x,y)
1013                    end if
1014        end do;  end do; end do
1015
1016    else if (rkstep == 2) then
1017
1018        do x = first_x, last_x
1019            do  y = 1, ny
1020                do z = 1, nz
1021                    if (mk1(x)+mk2(y)+mk3(z) > 2./9.) then
1022                        u(z,1,x,y) = 0;     u(z,2,x,y) = 0;     u(z,3,x,y) = 0
1023                    else
1024                        u(z,1,x,y) = un(z,1,x,y) + u(z,1,x,y)
1025                        u(z,2,x,y) = un(z,2,x,y) + u(z,2,x,y)
1026                        u(z,3,x,y) = un(z,3,x,y) + u(z,3,x,y)
1027                    end if
1028        end do; end do; end do
1029
1030    end if
1031
1032 end  subroutine  advance
1033
1034 end   subroutine solve_navier_stokes
1035