From 0e235de552f726bd3c2f5ae20440059db2f51e67 Mon Sep 17 00:00:00 2001 From: Alfredo Buttari Date: Thu, 29 Sep 2005 09:02:07 +0000 Subject: [PATCH] *** empty log message *** --- src/internals/psi_dswapdata.f90 | 267 ++++++++++++++++--------------- src/internals/psi_dswaptran.f90 | 271 ++++++++++++++++++-------------- src/modules/psb_const.fh | 1 + src/modules/psb_const_mod.f90 | 1 + src/modules/psb_prec_mod.f90 | 2 +- src/modules/psb_prec_type.f90 | 27 ++++ src/modules/psi_mod.f90 | 6 +- src/prec/psb_dcslu.f90 | 2 +- src/prec/psb_dprec.f90 | 8 +- src/psblas/psb_dspmm.f90 | 30 ++-- src/serial/f77/dswmm.f | 2 +- src/serial/psb_dcsdp.f90 | 2 +- src/serial/psb_dfixcoo.f90 | 4 +- test/pargen/RUNS/ppde.inp | 5 +- test/pargen/ppde90.f90 | 153 +++++++++--------- 15 files changed, 424 insertions(+), 357 deletions(-) diff --git a/src/internals/psi_dswapdata.f90 b/src/internals/psi_dswapdata.f90 index b28db747..8cef990f 100644 --- a/src/internals/psi_dswapdata.f90 +++ b/src/internals/psi_dswapdata.f90 @@ -1,4 +1,4 @@ -subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) +subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info,data) use psb_error_mod use psb_descriptor_type @@ -10,6 +10,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) real(kind(1.d0)) :: y(:,:), beta real(kind(1.d0)), target :: work(:) type(psb_desc_type) :: desc_a + integer, optional :: data ! locals integer :: icontxt, nprow, npcol, myrow,& @@ -20,7 +21,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) & snd_pt, rcv_pt integer :: blacs_pnum, krecvid, ksendid integer, pointer, dimension(:) :: bsdidx, brvidx,& - & sdsz, rvsz, prcid, ptp, rvhd, h_idx + & sdsz, rvsz, prcid, ptp, rvhd, d_idx integer :: int_err(5) logical :: swap_mpi, swap_sync, swap_send, swap_recv real(kind(1.d0)), pointer, dimension(:) :: sndbuf, rcvbuf @@ -95,7 +96,19 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0 - h_idx => desc_a%halo_index + + if(present(data)) then + if(data.eq.psb_comm_halo_) then + d_idx => desc_a%halo_index + else if(data.eq.psb_comm_ovr_) then + d_idx => desc_a%ovrlap_index + else + d_idx => desc_a%halo_index + end if + else + d_idx => desc_a%halo_index + end if + idxs = 1 idxr = 1 totxch = 0 @@ -103,11 +116,11 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) rvhd(:) = mpi_request_null ! prepare info for communications - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm.ne.-1) if(proc_to_comm .ne. myrow) totxch = totxch+1 - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) ptp(proc_to_comm) = point_to_proc @@ -121,7 +134,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) idxs = idxs+sdsz(proc_to_comm) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do if((idxr+idxs).lt.size(work)) then @@ -140,19 +153,19 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) ! gather elements into sendbuffer for swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! swap elements using mpi_alltoallv @@ -168,32 +181,32 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) ! scatter elements from receivebuffer after swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_sync) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if (proc_to_comm .lt. myrow) then ! First I send idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) ! Then I receive @@ -206,50 +219,50 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) ! Then I send idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) else if (proc_to_comm .eq. myrow) then ! I send to myself idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) else idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send .and. swap_recv) then ! First I post all the non blocking receives point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then p2ptag = krecvid(icontxt,proc_to_comm,myrow) @@ -266,20 +279,20 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! Then I post all the blocking sends point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) if(proc_to_comm .ne. myrow) then @@ -295,7 +308,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) end if end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do do i=1, totxch @@ -310,13 +323,13 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) if (ixrec .ne. mpi_undefined) then ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index point_to_proc = ptp(ixrec) - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) else int_err(1) = ixrec @@ -327,20 +340,20 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm .eq. myrow) then idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do @@ -348,45 +361,45 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) else if (swap_send) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd*n-1)) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_recv) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then rcv_pt = brvidx(proc_to_comm) call dgerv2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) else idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do end if @@ -404,12 +417,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) end subroutine psi_dswapdatam - - - - - -subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) +subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info,data) use psb_error_mod use psb_descriptor_type @@ -421,6 +429,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) real(kind(1.d0)) :: y(:), beta real(kind(1.d0)), target :: work(:) type(psb_desc_type) :: desc_a + integer, optional :: data ! locals integer :: icontxt, nprow, npcol, myrow,& @@ -431,7 +440,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) & snd_pt, rcv_pt, n integer, pointer, dimension(:) :: bsdidx, brvidx,& - & sdsz, rvsz, prcid, ptp, rvhd, h_idx + & sdsz, rvsz, prcid, ptp, rvhd, d_idx integer :: blacs_pnum, krecvid, ksendid integer :: int_err(5) logical :: swap_mpi, swap_sync, swap_send, swap_recv @@ -508,7 +517,19 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0 - h_idx => desc_a%halo_index + + if(present(data)) then + if(data.eq.psb_comm_halo_) then + d_idx => desc_a%halo_index + else if(data.eq.psb_comm_ovr_) then + d_idx => desc_a%ovrlap_index + else + d_idx => desc_a%halo_index + end if + else + d_idx => desc_a%halo_index + end if + idxs = 1 idxr = 1 totxch = 0 @@ -517,11 +538,11 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) n=1 ! prepare info for communications - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm.ne.-1) if(proc_to_comm .ne. myrow) totxch = totxch+1 - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) ptp(proc_to_comm) = point_to_proc @@ -535,7 +556,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) idxs = idxs+sdsz(proc_to_comm) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do if((idxr+idxs).lt.size(work)) then @@ -554,18 +575,18 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) ! gather elements into sendbuffer for swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! swap elements using mpi_alltoallv @@ -581,32 +602,32 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) ! scatter elements from receivebuffer after swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_sync) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if (proc_to_comm .lt. myrow) then ! First I send idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) ! Then I receive @@ -619,50 +640,50 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) ! Then I send idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) else if (proc_to_comm .eq. myrow) then ! I send to myself idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) else idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send .and. swap_recv) then ! First I post all the non blocking receives point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then p2ptag = krecvid(icontxt,proc_to_comm,myrow) @@ -679,19 +700,19 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! Then I post all the blocking sends point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) if(proc_to_comm .ne. myrow) then @@ -707,7 +728,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) end if end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do do i=1, totxch @@ -721,13 +742,13 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) if (ixrec .ne. mpi_undefined) then ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index point_to_proc = ptp(ixrec) - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) else int_err(1) = ixrec @@ -738,65 +759,65 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm .eq. myrow) then idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & y,sndbuf(snd_pt:snd_pt+nesd-1)) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_recv) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then rcv_pt = brvidx(proc_to_comm) call dgerv2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) else idx_pt = point_to_proc+psb_elem_recv_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do end if diff --git a/src/internals/psi_dswaptran.f90 b/src/internals/psi_dswaptran.f90 index b90757ea..f5dd92bb 100644 --- a/src/internals/psi_dswaptran.f90 +++ b/src/internals/psi_dswaptran.f90 @@ -1,26 +1,27 @@ -subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) +subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info,data) use psb_error_mod use psb_descriptor_type + use mpi implicit none - include 'mpif.h' integer, intent(in) :: flag, n integer, intent(out) :: info real(kind(1.d0)) :: y(:,:), beta real(kind(1.d0)), target :: work(:) type(psb_desc_type) :: desc_a + integer, optional :: data ! locals integer :: icontxt, nprow, npcol, myrow,& & mycol, point_to_proc, nesd, nerv,& - & proc_to_comm, p2ptag, icomm, p2pstat,& + & proc_to_comm, p2ptag, icomm, p2pstat(mpi_status_size),& & idxs, idxr, iret, errlen, ifcomm, rank,& & err_act, totxch, ixrec, i, lw, idx_pt,& & snd_pt, rcv_pt integer, pointer, dimension(:) :: bsdidx, brvidx,& - & sdsz, rvsz, prcid, ptp, rvhd, h_idx + & sdsz, rvsz, prcid, ptp, rvhd, d_idx integer :: int_err(5) integer :: blacs_pnum, krecvid, ksendid logical :: swap_mpi, swap_sync, swap_send, swap_recv @@ -96,7 +97,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0 - h_idx => desc_a%halo_index + + if(present(data)) then + if(data.eq.psb_comm_halo_) then + d_idx => desc_a%halo_index + else if(data.eq.psb_comm_ovr_) then + d_idx => desc_a%ovrlap_index + else + d_idx => desc_a%halo_index + end if + else + d_idx => desc_a%halo_index + end if + idxs = 0 idxr = 0 totxch = 0 @@ -104,11 +117,11 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) rvhd(:) = mpi_request_null ! prepare info for communications - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm.ne.-1) if(proc_to_comm .ne. myrow) totxch = totxch+1 - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) ptp(proc_to_comm) = point_to_proc @@ -122,7 +135,7 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) idxs = idxs+sdsz(proc_to_comm) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do if((idxr+idxs).lt.size(work)) then @@ -141,18 +154,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) ! gather elements into sendbuffer for swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! swap elements using mpi_alltoallv @@ -168,32 +182,32 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) ! scatter elements from receivebuffer after swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_sync) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if (proc_to_comm .lt. myrow) then ! First I send idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) ! Then I receive @@ -206,51 +220,51 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) ! Then I send idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) else if (proc_to_comm .eq. myrow) then ! I send to myself idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = bsdidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) else idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send .and. swap_recv) then ! First I post all the non blocking receives point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then p2ptag = krecvid(icontxt,proc_to_comm,myrow) @@ -267,19 +281,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! Then I post all the blocking sends point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) if(proc_to_comm .ne. myrow) then @@ -295,7 +309,7 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) end if end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do do i=1, totxch @@ -310,13 +324,13 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) if (ixrec .ne. mpi_undefined) then ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index point_to_proc = ptp(ixrec) - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) else int_err(1) = ixrec @@ -327,63 +341,63 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm .eq. myrow) then idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_recv) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then snd_pt = bsdidx(proc_to_comm) call dgerv2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) idx_pt = point_to_proc+nerv+psb_elem_send_ - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) else idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do end if @@ -406,29 +420,30 @@ end subroutine psi_dswaptranm -subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) +subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info,data) use psb_error_mod use psb_descriptor_type + use mpi implicit none - include 'mpif.h' integer, intent(in) :: flag integer, intent(out) :: info real(kind(1.d0)) :: y(:), beta real(kind(1.d0)), target :: work(:) type(psb_desc_type) :: desc_a + integer, optional :: data ! locals integer :: icontxt, nprow, npcol, myrow,& & mycol, point_to_proc, nesd, nerv,& - & proc_to_comm, p2ptag, icomm, p2pstat,& + & proc_to_comm, p2ptag, icomm, p2pstat(mpi_status_size),& & idxs, idxr, iret, errlen, ifcomm, rank,& & err_act, totxch, ixrec, i, lw, idx_pt,& & snd_pt, rcv_pt, n integer, pointer, dimension(:) :: bsdidx, brvidx,& - & sdsz, rvsz, prcid, ptp, rvhd, h_idx + & sdsz, rvsz, prcid, ptp, rvhd, d_idx integer :: int_err(5) integer :: blacs_pnum, krecvid, ksendid logical :: swap_mpi, swap_sync, swap_send, swap_recv @@ -504,7 +519,19 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0 - h_idx => desc_a%halo_index + + if(present(data)) then + if(data.eq.psb_comm_halo_) then + d_idx => desc_a%halo_index + else if(data.eq.psb_comm_ovr_) then + d_idx => desc_a%ovrlap_index + else + d_idx => desc_a%halo_index + end if + else + d_idx => desc_a%halo_index + end if + idxs = 0 idxr = 0 totxch = 0 @@ -513,11 +540,11 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) n=1 ! prepare info for communications - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm.ne.-1) if(proc_to_comm .ne. myrow) totxch = totxch+1 - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) ptp(proc_to_comm) = point_to_proc @@ -531,7 +558,7 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) idxs = idxs+sdsz(proc_to_comm) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do if((idxr+idxs).lt.size(work)) then @@ -550,18 +577,18 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) ! gather elements into sendbuffer for swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! swap elements using mpi_alltoallv @@ -577,32 +604,32 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) ! scatter elements from receivebuffer after swapping point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_sync) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if (proc_to_comm .lt. myrow) then ! First I send idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) ! Then I receive @@ -615,51 +642,51 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) ! Then I send idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) else if (proc_to_comm .eq. myrow) then ! I send to myself idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = bsdidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then idx_pt = point_to_proc+nerv+psb_elem_send_ snd_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) else idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send .and. swap_recv) then ! First I post all the non blocking receives point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then p2ptag = krecvid(icontxt,proc_to_comm,myrow) @@ -676,19 +703,19 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do ! Then I post all the blocking sends point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) if(proc_to_comm .ne. myrow) then @@ -704,7 +731,7 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) end if end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do do i=1, totxch @@ -719,13 +746,13 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) if (ixrec .ne. mpi_undefined) then ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index point_to_proc = ptp(ixrec) - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = bsdidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) else int_err(1) = ixrec @@ -736,64 +763,64 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) end do point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm .eq. myrow) then idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_send) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& + call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),& & y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do else if (swap_recv) then point_to_proc = 1 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) do while (proc_to_comm .ne. -1) - nerv = h_idx(point_to_proc+psb_n_elem_recv_) - nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) + nerv = d_idx(point_to_proc+psb_n_elem_recv_) + nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_) if(proc_to_comm.ne.myrow) then snd_pt = bsdidx(proc_to_comm) call dgerv2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) idx_pt = point_to_proc+psb_elem_recv_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & sndbuf(snd_pt:snd_pt+nesd-1),beta,y) else idx_pt = point_to_proc+nerv+psb_elem_send_ rcv_pt = brvidx(proc_to_comm) - call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& + call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),& & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) end if point_to_proc = point_to_proc+nerv+nesd+3 - proc_to_comm = h_idx(point_to_proc+psb_proc_id_) + proc_to_comm = d_idx(point_to_proc+psb_proc_id_) end do end if diff --git a/src/modules/psb_const.fh b/src/modules/psb_const.fh index 60179f89..797a38e1 100644 --- a/src/modules/psb_const.fh +++ b/src/modules/psb_const.fh @@ -46,6 +46,7 @@ integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4 integer, parameter :: psb_dbleint_=2 integer, parameter :: act_ret=0, act_abort=1, no_err=0 + integer, parameter :: psb_comm_halo_=0, psb_comm_ovr_=1 real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7 real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0 diff --git a/src/modules/psb_const_mod.f90 b/src/modules/psb_const_mod.f90 index c81f26c1..d49cc0a4 100644 --- a/src/modules/psb_const_mod.f90 +++ b/src/modules/psb_const_mod.f90 @@ -39,6 +39,7 @@ module psb_const_mod integer, parameter :: psb_perm_update_=98765, psb_isrtdcoo_=98764 integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4 integer, parameter :: psb_dbleint_=2 + integer, parameter :: psb_comm_halo_=0, psb_comm_ovr_=1 real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7 real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0 diff --git a/src/modules/psb_prec_mod.f90 b/src/modules/psb_prec_mod.f90 index e710d9fd..52b83977 100644 --- a/src/modules/psb_prec_mod.f90 +++ b/src/modules/psb_prec_mod.f90 @@ -48,7 +48,7 @@ end interface use psb_prec_type implicit none type(psb_dprec_type), intent(inout) :: prec - character(len=10), intent(in) :: ptype + character(len=*), intent(in) :: ptype integer, optional, intent(in) :: iv(:) real(kind(1.d0)), optional, intent(in) :: rs real(kind(1.d0)), optional, intent(in) :: rv(:) diff --git a/src/modules/psb_prec_type.f90 b/src/modules/psb_prec_type.f90 index 947165da..de0476fd 100644 --- a/src/modules/psb_prec_type.f90 +++ b/src/modules/psb_prec_type.f90 @@ -12,6 +12,10 @@ module psb_prec_type & asm_=3, ras_=5, ash_=4, rash_=6, ras2lv_=7, ras2lvm_=8,& & lv2mras_=9, lv2smth_=10, lv2lsm_=11, sl2sm_=12, superlu_=13,& & new_loc_smth_=14, new_glb_smth_=15, max_prec_=15 + integer, parameter :: nohalo_=0, halo_=4 + integer, parameter :: none_=0, sum_=1 + integer, parameter :: avg_=2, square_root_=3 + ! Multilevel stuff. integer, parameter :: no_ml_=0, add_ml_prec_=1, mult_ml_prec_=2 integer, parameter :: new_ml_prec_=3, max_ml_=new_ml_prec_ @@ -369,5 +373,28 @@ contains end subroutine psb_nullify_baseprec + function pr_to_str(iprec) + + integer, intent(in) :: iprec + character(len=10) :: pr_to_str + + select case(iprec) + case(noprec_) + pr_to_str='NOPREC' + case(diagsc_) + pr_to_str='DIAGSC' + case(bja_) + pr_to_str='BJA' + case(asm_) + pr_to_str='ASM' + case(ash_) + pr_to_str='ASM' + case(ras_) + pr_to_str='ASM' + case(rash_) + pr_to_str='ASM' + end select + +end function pr_to_str end module psb_prec_type diff --git a/src/modules/psi_mod.f90 b/src/modules/psi_mod.f90 index 8a6e4617..05984885 100644 --- a/src/modules/psi_mod.f90 +++ b/src/modules/psi_mod.f90 @@ -57,19 +57,21 @@ module psi_mod end interface interface psi_swapdata - subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) + subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info,data) use psb_descriptor_type integer, intent(in) :: flag, n integer, intent(out) :: info real(kind(1.d0)) :: y(:,:), beta, work(:) type(psb_desc_type) :: desc_a + integer, optional :: data end subroutine psi_dswapdatam - subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) + subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info,data) use psb_descriptor_type integer, intent(in) :: flag integer, intent(out) :: info real(kind(1.d0)) :: y(:), beta, work(:) type(psb_desc_type) :: desc_a + integer, optional :: data end subroutine psi_dswapdatav subroutine psi_iswapdatam(flag,n,beta,y,desc_a,work,info) use psb_descriptor_type diff --git a/src/prec/psb_dcslu.f90 b/src/prec/psb_dcslu.f90 index 43cfc03e..e4c12f11 100644 --- a/src/prec/psb_dcslu.f90 +++ b/src/prec/psb_dcslu.f90 @@ -108,7 +108,7 @@ subroutine psb_dcslu(a,desc_a,p,upd,info) call psb_nullify_sp(blck) t1= mpi_wtime() - if(debug) write(0,*)me,': calling psb_csrsetup' + if(debug) write(0,*)me,': calling psb_csrsetup',p%iprcparm(p_type_),p%iprcparm(n_ovr_) call psb_csrsetup(p%iprcparm(p_type_),p%iprcparm(n_ovr_),a,& & blck,desc_a,upd,p%desc_data,info) if(info/=0) then diff --git a/src/prec/psb_dprec.f90 b/src/prec/psb_dprec.f90 index a826f292..be312e51 100644 --- a/src/prec/psb_dprec.f90 +++ b/src/prec/psb_dprec.f90 @@ -243,7 +243,7 @@ subroutine psb_dbaseprcaply(prec,x,beta,y,desc_data,trans,work,info) tx(desc_data%matrix_data(psb_n_row_)+1:isz) = zero if (prec%iprcparm(restr_)==psb_halo_) then - call psb_halo(tx,prec%desc_data,info,work=aux) + call psb_halo(tx,prec%desc_data,info,work=aux) if(info /=0) then info=4010 ch_err='psb_halo' @@ -281,9 +281,9 @@ subroutine psb_dbaseprcaply(prec,x,beta,y,desc_data,trans,work,info) ! call f90_psovrl(ty,prec%desc_data,update_type=prec%a_restrict) case(psb_sum_,psb_avg_) - call psb_ovrl(ty,prec%desc_data,info,& - & update_type=prec%iprcparm(prol_),work=aux) - if(info /=0) then + call psb_ovrl(ty,prec%desc_data,info,& + & update_type=prec%iprcparm(prol_),work=aux) + if(info /=0) then info=4010 ch_err='psb_ovrl' goto 9999 diff --git a/src/psblas/psb_dspmm.f90 b/src/psblas/psb_dspmm.f90 index 6444db0e..b85b8cb5 100644 --- a/src/psblas/psb_dspmm.f90 +++ b/src/psblas/psb_dspmm.f90 @@ -448,16 +448,19 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,& n = desc_a%matrix_data(psb_n_) nrow = desc_a%matrix_data(psb_n_row_) ncol = desc_a%matrix_data(psb_n_col_) - lldx = size(x,1) - lldy = size(y,1) + lldx = size(x) + lldy = size(y) ! check for presence/size of a work area liwork= 2*ncol - if (a%pr(1) /= 0) llwork = liwork + n * ik - if (a%pl(1) /= 0) llwork = liwork + m * ik - if (present(work)) then - if(size(work).lt.liwork) then - call psb_realloc(liwork,work,info) + if (a%pr(1) /= 0) liwork = liwork + n * ik + if (a%pl(1) /= 0) liwork = liwork + m * ik + if (present(work)) then + if(size(work).ge.liwork) then + iwork => work + liwork=size(work) + else + call psb_realloc(liwork,iwork,info) if(info.ne.0) then info=4010 ch_err='psb_realloc' @@ -465,7 +468,6 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,& goto 9999 end if end if - iwork => work else call psb_realloc(liwork,iwork,info) if(info.ne.0) then @@ -516,15 +518,12 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,& x(nrow:ncol)=0.d0 else call psi_swapdata(ior(psb_swap_send_,psb_swap_recv_),& - & dzero,x,desc_a,iwork,info) -!!$ call PSI_dSwapData(ior(SWAP_SEND,SWAP_RECV),1,& -!!$ & dzero,x(iix,jjx),lldx,desc_a%matrix_data,& -!!$ & desc_a%halo_index,iwork,liwork,info) + & dzero,x,desc_a,iwork,info,data=psb_comm_halo_) end if ! local Matrix-vector product - call dcsmm(itrans,nrow,ib,ncol,alpha,a%pr,a%fida,& - & a%descra,a%aspk,a%ia1,a%ia2,a%infoa,a%pl,& + call dcsmm(itrans,nrow,ib,ncol,alpha,a%pl,a%fida,& + & a%descra,a%aspk,a%ia1,a%ia2,a%infoa,a%pr,& & x(iix),lldx,beta,y(iiy),lldy,& & iwork,liwork,info) if(info.ne.0) then @@ -585,9 +584,6 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,& if(idoswap.gt.0)& & call psi_swaptran(ior(psb_swap_send_,psb_swap_recv_),& & done,yp,desc_a,iwork,info) -!!$ & call PSI_dSwapTran(ior(SWAP_SEND,SWAP_RECV),& -!!$ & ik,done,y(iiy,jjy),lldy,desc_a%matrix_data,& -!!$ & desc_a%halo_index),iwork,liwork,info if(info.ne.0) then info = 4010 ch_err='PSI_dSwapTran' diff --git a/src/serial/f77/dswmm.f b/src/serial/f77/dswmm.f index e7ad1a5f..0da78133 100644 --- a/src/serial/f77/dswmm.f +++ b/src/serial/f77/dswmm.f @@ -153,7 +153,7 @@ C C C CALL DCOOMM(TRANS,M,N,K,ALPHA,DESCRA,A,IA1, - + IA2,INFOA,B,LDB,BETA,C,LDC,WORK) + + IA2,INFOA,B,LDB,BETA,C,LDC,WORK,LWORK,IERROR) ELSE C C This data structure not yet considered diff --git a/src/serial/psb_dcsdp.f90 b/src/serial/psb_dcsdp.f90 index ef220a4c..a318c07c 100644 --- a/src/serial/psb_dcsdp.f90 +++ b/src/serial/psb_dcsdp.f90 @@ -266,7 +266,7 @@ subroutine psb_dcsdp(a, b,info,ifc,check,trans,unitd) case ('COO') call dcoco(trans_, a%m, a%k, unitd_, d, a%descra, a%aspk,& - & a%ia2, a%ia1, a%infoa, b%pl, b%descra, b%aspk, b%ia1,& + & a%ia1, a%ia2, a%infoa, b%pl, b%descra, b%aspk, b%ia1,& & b%ia2, b%infoa, b%pr, size(b%aspk), size(b%ia1),& & size(b%ia2), work, 2*size(work), info) if (info/=0) then diff --git a/src/serial/psb_dfixcoo.f90 b/src/serial/psb_dfixcoo.f90 index 4f1d8b72..221436e3 100644 --- a/src/serial/psb_dfixcoo.f90 +++ b/src/serial/psb_dfixcoo.f90 @@ -41,9 +41,9 @@ Subroutine psb_dfixcoo(A,INFO) if (j > nza) exit enddo nzl = j - i - call mrgsrt(nzl,a%ia2(i:i+nzl-1),iaux,iret) + call mrgsrt(nzl,a%ia2(i),iaux,iret) if (iret.eq.0) & - & call reordvn(nzl,a%aspk(i:i+nzl-1),a%ia1(i:i+nzl-1),a%ia2(i:i+nzl-1),iaux) + & call reordvn(nzl,a%aspk(i),a%ia1(i),a%ia2(i),iaux) i = j enddo diff --git a/test/pargen/RUNS/ppde.inp b/test/pargen/RUNS/ppde.inp index b0e8eeaf..65f32a65 100644 --- a/test/pargen/RUNS/ppde.inp +++ b/test/pargen/RUNS/ppde.inp @@ -1,7 +1,8 @@ 7 Number of entries below this BICGSTAB Iterative method BICGSTAB CGS BICG BICGSTABL -NONE Preconditioner ILU DIAGSC NONE -CSR A Storage format CSR COO JAD +2 Preconditioner ILU DIAGSC NONE +2 Number ov overlapping levels +COO A Storage format CSR COO JAD 20 Domain size (acutal sistem is this**3) 1 Stopping criterion 80 MAXIT diff --git a/test/pargen/ppde90.f90 b/test/pargen/ppde90.f90 index 2d5646a5..225e7d1d 100644 --- a/test/pargen/ppde90.f90 +++ b/test/pargen/ppde90.f90 @@ -1,5 +1,6 @@ - +! File: ppde90.f90 ! +! Program: ppde90 ! This sample program shows how to build and solve a sparse linear ! ! The program solves a linear system based on the partial differential @@ -7,7 +8,8 @@ ! ! ! -! the equation generated is: +! The equation generated is +! ! b1 d d (u) b2 d d (u) a1 d (u)) a2 d (u))) ! - ------ - ------ + ----- + ------ + a3 u = 0 ! dx dx dy dy dx dy @@ -41,6 +43,7 @@ program pde90 use psb_sparse_mod use psb_error_mod + use psb_prec_mod implicit none interface @@ -75,7 +78,7 @@ program pde90 ! solver parameters integer :: iter, itmax,ierr,itrace, methd,iprec, istopc,& - & iparm(20), ml + & iparm(20), ml, novr real(kind(1.d0)) :: err, eps, rparm(20) ! other variables @@ -101,7 +104,7 @@ program pde90 ! ! get parameters ! - call get_parms(icontxt,cmethd,prec,afmt,idim,istopc,itmax,itrace,ml) + call get_parms(icontxt,cmethd,iprec,novr,afmt,idim,istopc,itmax,itrace,ml) ! ! allocate and fill in the coefficient matrix, rhs and initial guess @@ -120,50 +123,30 @@ program pde90 dim=size(a%aspk) -!!$ allocate(h%aspk(dim),h%ia1(dim),h%ia2(dim),h%pl(size(a%pl)),& -!!$ & h%pl(size(a%pl)),d(size(a%pl)),& -!!$ & desc_a_out%matrix_data(size(desc_a%matrix_data)),& -!!$ & desc_a_out%halo_index(size(desc_a%halo_index)),& -!!$ & desc_a_out%ovrlap_index(size(desc_a%ovrlap_index)),& -!!$ & desc_a_out%ovrlap_elem(size(desc_a%ovrlap_elem)),& -!!$ & desc_a_out%loc_to_glob(size(desc_a%loc_to_glob)),& -!!$ & desc_a_out%glob_to_loc(size(desc_a%glob_to_loc)), work(1024)) -!!$ check_descr=15 -! work(5)=9 -!!$ write(0,*)'calling verify' -!!$ call f90_psverify(d,a,desc_a,check_descr,convert_descr,h,& -!!$ & desc_a_out,work) -!!$ write(0,*)'verify done',convert_descr - -! deallocate(work) - call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1) - if (iam.eq.0) write(6,*) 'matrix creation time : ',t2 + if (iam.eq.0) write(*,'("Overall matrix creation time : ",es10.4)')t2 + if (iam.eq.0) write(*,'(" ")') ! ! prepare the preconditioner. ! - write(0,*)'precondizionatore=',prec - select case (prec) - case ('ILU') - iprec = 2 - ptype='bja' - call psb_precset(pre,ptype) - case ('DIAGSC') - iprec = 1 - ptype='diagsc' - call psb_precset(pre,ptype) - case ('NONE') - iprec = 0 - ptype='noprec' - call psb_precset(pre,ptype) - case default - info=5003 - ch_err(1:3)=prec(1:3) - call psb_errpush(info,name,a_err=ch_err) - goto 9999 + if(iam.eq.psb_root_) write(0,'("Setting preconditioner to : ",a)')pr_to_str(iprec) + select case(iprec) + case(noprec_) + call psb_precset(pre,'noprec') + case(diagsc_) + call psb_precset(pre,'diagsc') + case(bja_) + call psb_precset(pre,'ilu') + case(asm_) + call psb_precset(pre,'asm',iv=(/novr,halo_,sum_/)) + case(ash_) + call psb_precset(pre,'asm',iv=(/novr,nohalo_,sum_/)) + case(ras_) + call psb_precset(pre,'asm',iv=(/novr,halo_,none_/)) + case(rash_) + call psb_precset(pre,'asm',iv=(/novr,nohalo_,none_/)) end select - write(0,*)'preconditioner set' call blacs_barrier(icontxt,'ALL') t1 = mpi_wtime() @@ -179,12 +162,13 @@ program pde90 call dgamx2d(icontxt,'a',' ',ione, ione,tprec,ione,t1,t1,-1,-1,-1) - if (iam.eq.0) write(6,*) 'preconditioner time : ',tprec + if (iam.eq.0) write(*,'("Preconditioner time : ",es10.4)')tprec + if (iam.eq.0) write(*,'(" ")') ! ! iterative method parameters ! - write(*,*) 'calling iterative method', a%ia2(7999:8001) + if(iam.eq.psb_root_) write(*,'("Calling iterative method ",a)')cmethd call blacs_barrier(icontxt,'ALL') t1 = mpi_wtime() eps = 1.d-9 @@ -213,11 +197,12 @@ program pde90 call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1) if (iam.eq.0) then - write(6,*) 'time to solve matrix : ',t2 - write(6,*) 'time per iteration : ',t2/iter - write(6,*) 'number of iterations : ',iter - write(6,*) 'error on exit : ',err - write(6,*) 'info on exit : ',info + write(*,'(" ")') + write(*,'("Time to solve matrix : ",es10.4)')t2 + write(*,'("Time per iteration : ",es10.4)')t2/iter + write(*,'("Number of iterations : ",i)')iter + write(*,'("Error on exit : ",es10.4)')err + write(*,'("Info on exit : ",i)')info end if ! @@ -250,10 +235,10 @@ contains ! ! get iteration parameters from the command line ! - subroutine get_parms(icontxt,cmethd,prec,afmt,idim,istopc,itmax,itrace,ml) + subroutine get_parms(icontxt,cmethd,iprec,novr,afmt,idim,istopc,itmax,itrace,ml) integer :: icontxt - character :: cmethd*10, prec*10, afmt*5 - integer :: idim, iret, istopc,itmax,itrace,ml + character :: cmethd*10, afmt*5 + integer :: idim, iret, istopc,itmax,itrace,ml, iprec, novr character*40 :: charbuf integer :: iargc, nprow, npcol, myprow, mypcol external iargc @@ -265,7 +250,8 @@ contains read(*,*) ip if (ip.ge.3) then read(*,*) cmethd - read(*,*) prec + read(*,*) iprec + read(*,*) novr read(*,*) afmt ! convert strings in array @@ -275,11 +261,11 @@ contains ! broadcast parameters to all processors call igebs2d(icontxt,'ALL',' ',10,1,intbuf,10) - do i = 1, len(prec) - intbuf(i) = iachar(prec(i:i)) - end do ! broadcast parameters to all processors - call igebs2d(icontxt,'ALL',' ',10,1,intbuf,10) + call igebs2d(icontxt,'ALL',' ',1,1,iprec,10) + + ! broadcast parameters to all processors + call igebs2d(icontxt,'ALL',' ',1,1,novr,10) do i = 1, len(afmt) intbuf(i) = iachar(afmt(i:i)) @@ -317,11 +303,14 @@ contains intbuf(5) = ml call igebs2d(icontxt,'ALL',' ',5,1,intbuf,5) - write(6,*)'solving matrix: ell1' - write(6,*)'on grid',idim,'x',idim,'x',idim - write(6,*)' with block data distribution, np=',np,& - & ' preconditioner=',prec,& - & ' iterative methd=',cmethd + write(*,'("Solving matrix : ell1")') + write(*,'("Grid dimensions : ",i4,"x",i4,"x",i4)')idim,idim,idim + write(*,'("Number of processors : ",i)')nprow + write(*,'("Data distribution : BLOCK")') + write(*,'("Preconditioner : ",a)')pr_to_str(iprec) + if(iprec.gt.2) write(*,'("Overlapping levels : ",i)')novr + write(*,'("Iterative method : ",a)')cmethd + write(*,'(" ")') else ! wrong number of parameter, print an error message and exit call pr_usage(0) @@ -334,10 +323,11 @@ contains do i = 1, 10 cmethd(i:i) = achar(intbuf(i)) end do - call igebr2d(icontxt,'ALL',' ',10,1,intbuf,10,0,0) - do i = 1, 10 - prec(i:i) = achar(intbuf(i)) - end do + + call igebr2d(icontxt,'ALL',' ',1,1,iprec,10,0,0) + + call igebr2d(icontxt,'ALL',' ',1,1,novr,10,0,0) + call igebr2d(icontxt,'ALL',' ',10,1,intbuf,10,0,0) do i = 1, 5 afmt(i:i) = achar(intbuf(i)) @@ -429,7 +419,7 @@ contains ! deltat discretization time real(kind(1.d0)) :: deltah real(kind(1.d0)),parameter :: rhs=0.d0,one=1.d0,zero=0.d0 - real(kind(1.d0)) :: mpi_wtime, t1, t2, t3, tins + real(kind(1.d0)) :: mpi_wtime, t1, t2, t3, tins, tasb real(kind(1.d0)) :: a1, a2, a3, a4, b1, b2, b3 external mpi_wtime,a1, a2, a3, a4, b1, b2, b3 integer :: nb, ir1, ir2, ipr, err_act @@ -452,14 +442,12 @@ contains m = idim*idim*idim n = m nnz = ((n*9)/(nprow*npcol)) - write(*,*) 'size: n ',n,myprow + if(myprow.eq.psb_root_) write(0,'("Generating Matrix (size=",i,")...")')n + call psb_dscall(n,n,parts,icontxt,desc_a,info) - write(*,*) 'allocating a. nnz:',nnz,myprow call psb_spalloc(a,desc_a,info,nnz=nnz) ! define rhs from boundary conditions; also build initial guess - write(*,*) 'allocating b', info,myprow call psb_alloc(n,b,desc_a,info) - write(*,*) 'allocating t', info,myprow call psb_alloc(n,t,desc_a,info) if(info.ne.0) then info=4010 @@ -642,7 +630,7 @@ contains end do call blacs_barrier(icontxt,'ALL') - t2 = mpi_wtime() + t2 = mpi_wtime()-t1 if(info.ne.0) then info=4010 @@ -651,23 +639,30 @@ contains goto 9999 end if - write(*,*) ' pspins time',tins - write(*,*) ' insert time',(t2-t1) - deallocate(row_mat%aspk,row_mat%ia1,row_mat%ia2) t1 = mpi_wtime() call psb_dscasb(desc_a,info) call psb_spasb(a,desc_a,info,dup=1,afmt=afmt) call blacs_barrier(icontxt,'ALL') - t2 = mpi_wtime() + tasb = mpi_wtime()-t1 if(info.ne.0) then info=4010 ch_err='asb rout.' call psb_errpush(info,name,a_err=ch_err) goto 9999 end if - write(0,*) ' assembly time',(t2-t1),' ',a%fida(1:4) + + call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1) + call dgamx2d(icontxt,'a',' ',ione, ione,tins,ione,t1,t1,-1,-1,-1) + call dgamx2d(icontxt,'a',' ',ione, ione,tasb,ione,t1,t1,-1,-1,-1) + + if(myprow.eq.psb_root_) then + write(*,'("The matrix has been generated and assembeld in ",a3," format.")')a%fida(1:3) + write(*,'("-pspins time : ",es10.4)')tins + write(*,'("-insert time : ",es10.4)')t2 + write(*,'("-assembly time : ",es10.4)')tasb + end if call psb_asb(b,desc_a,info) call psb_asb(t,desc_a,info) @@ -678,10 +673,6 @@ contains goto 9999 end if - if (myprow.eq.0) then - write(0,*) ' end create_matrix' - endif - call psb_erractionrestore(err_act) return