*** empty log message ***

psblas3-type-indexed
Alfredo Buttari 20 years ago
parent 3ac80ee128
commit 0e235de552

@ -1,4 +1,4 @@
subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info,data)
use psb_error_mod use psb_error_mod
use psb_descriptor_type use psb_descriptor_type
@ -10,6 +10,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
real(kind(1.d0)) :: y(:,:), beta real(kind(1.d0)) :: y(:,:), beta
real(kind(1.d0)), target :: work(:) real(kind(1.d0)), target :: work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
! locals ! locals
integer :: icontxt, nprow, npcol, myrow,& integer :: icontxt, nprow, npcol, myrow,&
@ -20,7 +21,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
& snd_pt, rcv_pt & snd_pt, rcv_pt
integer :: blacs_pnum, krecvid, ksendid integer :: blacs_pnum, krecvid, ksendid
integer, pointer, dimension(:) :: bsdidx, brvidx,& integer, pointer, dimension(:) :: bsdidx, brvidx,&
& sdsz, rvsz, prcid, ptp, rvhd, h_idx & sdsz, rvsz, prcid, ptp, rvhd, d_idx
integer :: int_err(5) integer :: int_err(5)
logical :: swap_mpi, swap_sync, swap_send, swap_recv logical :: swap_mpi, swap_sync, swap_send, swap_recv
real(kind(1.d0)), pointer, dimension(:) :: sndbuf, rcvbuf real(kind(1.d0)), pointer, dimension(:) :: sndbuf, rcvbuf
@ -95,7 +96,19 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_sync = iand(flag,psb_swap_sync_).ne.0
swap_send = iand(flag,psb_swap_send_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0
swap_recv = iand(flag,psb_swap_recv_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0
h_idx => desc_a%halo_index
if(present(data)) then
if(data.eq.psb_comm_halo_) then
d_idx => desc_a%halo_index
else if(data.eq.psb_comm_ovr_) then
d_idx => desc_a%ovrlap_index
else
d_idx => desc_a%halo_index
end if
else
d_idx => desc_a%halo_index
end if
idxs = 1 idxs = 1
idxr = 1 idxr = 1
totxch = 0 totxch = 0
@ -103,11 +116,11 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
rvhd(:) = mpi_request_null rvhd(:) = mpi_request_null
! prepare info for communications ! prepare info for communications
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm.ne.-1) do while (proc_to_comm.ne.-1)
if(proc_to_comm .ne. myrow) totxch = totxch+1 if(proc_to_comm .ne. myrow) totxch = totxch+1
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol)
ptp(proc_to_comm) = point_to_proc ptp(proc_to_comm) = point_to_proc
@ -121,7 +134,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
idxs = idxs+sdsz(proc_to_comm) idxs = idxs+sdsz(proc_to_comm)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
if((idxr+idxs).lt.size(work)) then if((idxr+idxs).lt.size(work)) then
@ -140,19 +153,19 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
! gather elements into sendbuffer for swapping ! gather elements into sendbuffer for swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! swap elements using mpi_alltoallv ! swap elements using mpi_alltoallv
@ -168,32 +181,32 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
! scatter elements from receivebuffer after swapping ! scatter elements from receivebuffer after swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_sync) then else if (swap_sync) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if (proc_to_comm .lt. myrow) then if (proc_to_comm .lt. myrow) then
! First I send ! First I send
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0)
! Then I receive ! Then I receive
@ -206,50 +219,50 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
! Then I send ! Then I send
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0)
else if (proc_to_comm .eq. myrow) then else if (proc_to_comm .eq. myrow) then
! I send to myself ! I send to myself
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
else else
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send .and. swap_recv) then else if (swap_send .and. swap_recv) then
! First I post all the non blocking receives ! First I post all the non blocking receives
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
p2ptag = krecvid(icontxt,proc_to_comm,myrow) p2ptag = krecvid(icontxt,proc_to_comm,myrow)
@ -266,20 +279,20 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! Then I post all the blocking sends ! Then I post all the blocking sends
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
if(proc_to_comm .ne. myrow) then if(proc_to_comm .ne. myrow) then
@ -295,7 +308,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
end if end if
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
do i=1, totxch do i=1, totxch
@ -310,13 +323,13 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
if (ixrec .ne. mpi_undefined) then if (ixrec .ne. mpi_undefined) then
ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index
point_to_proc = ptp(ixrec) point_to_proc = ptp(ixrec)
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
else else
int_err(1) = ixrec int_err(1) = ixrec
@ -327,20 +340,20 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm .eq. myrow) then if(proc_to_comm .eq. myrow) then
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
@ -348,45 +361,45 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
else if (swap_send) then else if (swap_send) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd*n-1)) & y,sndbuf(snd_pt:snd_pt+nesd*n-1))
call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_recv) then else if (swap_recv) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call dgerv2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgerv2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
else else
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
end if end if
@ -404,12 +417,7 @@ subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info)
end subroutine psi_dswapdatam end subroutine psi_dswapdatam
subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info,data)
subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
use psb_error_mod use psb_error_mod
use psb_descriptor_type use psb_descriptor_type
@ -421,6 +429,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
real(kind(1.d0)) :: y(:), beta real(kind(1.d0)) :: y(:), beta
real(kind(1.d0)), target :: work(:) real(kind(1.d0)), target :: work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
! locals ! locals
integer :: icontxt, nprow, npcol, myrow,& integer :: icontxt, nprow, npcol, myrow,&
@ -431,7 +440,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
& snd_pt, rcv_pt, n & snd_pt, rcv_pt, n
integer, pointer, dimension(:) :: bsdidx, brvidx,& integer, pointer, dimension(:) :: bsdidx, brvidx,&
& sdsz, rvsz, prcid, ptp, rvhd, h_idx & sdsz, rvsz, prcid, ptp, rvhd, d_idx
integer :: blacs_pnum, krecvid, ksendid integer :: blacs_pnum, krecvid, ksendid
integer :: int_err(5) integer :: int_err(5)
logical :: swap_mpi, swap_sync, swap_send, swap_recv logical :: swap_mpi, swap_sync, swap_send, swap_recv
@ -508,7 +517,19 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_sync = iand(flag,psb_swap_sync_).ne.0
swap_send = iand(flag,psb_swap_send_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0
swap_recv = iand(flag,psb_swap_recv_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0
h_idx => desc_a%halo_index
if(present(data)) then
if(data.eq.psb_comm_halo_) then
d_idx => desc_a%halo_index
else if(data.eq.psb_comm_ovr_) then
d_idx => desc_a%ovrlap_index
else
d_idx => desc_a%halo_index
end if
else
d_idx => desc_a%halo_index
end if
idxs = 1 idxs = 1
idxr = 1 idxr = 1
totxch = 0 totxch = 0
@ -517,11 +538,11 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
n=1 n=1
! prepare info for communications ! prepare info for communications
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm.ne.-1) do while (proc_to_comm.ne.-1)
if(proc_to_comm .ne. myrow) totxch = totxch+1 if(proc_to_comm .ne. myrow) totxch = totxch+1
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol)
ptp(proc_to_comm) = point_to_proc ptp(proc_to_comm) = point_to_proc
@ -535,7 +556,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
idxs = idxs+sdsz(proc_to_comm) idxs = idxs+sdsz(proc_to_comm)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
if((idxr+idxs).lt.size(work)) then if((idxr+idxs).lt.size(work)) then
@ -554,18 +575,18 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
! gather elements into sendbuffer for swapping ! gather elements into sendbuffer for swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! swap elements using mpi_alltoallv ! swap elements using mpi_alltoallv
@ -581,32 +602,32 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
! scatter elements from receivebuffer after swapping ! scatter elements from receivebuffer after swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_sync) then else if (swap_sync) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if (proc_to_comm .lt. myrow) then if (proc_to_comm .lt. myrow) then
! First I send ! First I send
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0)
! Then I receive ! Then I receive
@ -619,50 +640,50 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
! Then I send ! Then I send
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0)
else if (proc_to_comm .eq. myrow) then else if (proc_to_comm .eq. myrow) then
! I send to myself ! I send to myself
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
else else
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send .and. swap_recv) then else if (swap_send .and. swap_recv) then
! First I post all the non blocking receives ! First I post all the non blocking receives
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
p2ptag = krecvid(icontxt,proc_to_comm,myrow) p2ptag = krecvid(icontxt,proc_to_comm,myrow)
@ -679,19 +700,19 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! Then I post all the blocking sends ! Then I post all the blocking sends
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
if(proc_to_comm .ne. myrow) then if(proc_to_comm .ne. myrow) then
@ -707,7 +728,7 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
end if end if
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
do i=1, totxch do i=1, totxch
@ -721,13 +742,13 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
if (ixrec .ne. mpi_undefined) then if (ixrec .ne. mpi_undefined) then
ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index
point_to_proc = ptp(ixrec) point_to_proc = ptp(ixrec)
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
else else
int_err(1) = ixrec int_err(1) = ixrec
@ -738,65 +759,65 @@ subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm .eq. myrow) then if(proc_to_comm .eq. myrow) then
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send) then else if (swap_send) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_gth(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_gth(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& y,sndbuf(snd_pt:snd_pt+nesd-1)) & y,sndbuf(snd_pt:snd_pt+nesd-1))
call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgesd2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_recv) then else if (swap_recv) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call dgerv2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgerv2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
else else
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_sct(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
end if end if

@ -1,26 +1,27 @@
subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info) subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info,data)
use psb_error_mod use psb_error_mod
use psb_descriptor_type use psb_descriptor_type
use mpi
implicit none implicit none
include 'mpif.h'
integer, intent(in) :: flag, n integer, intent(in) :: flag, n
integer, intent(out) :: info integer, intent(out) :: info
real(kind(1.d0)) :: y(:,:), beta real(kind(1.d0)) :: y(:,:), beta
real(kind(1.d0)), target :: work(:) real(kind(1.d0)), target :: work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
! locals ! locals
integer :: icontxt, nprow, npcol, myrow,& integer :: icontxt, nprow, npcol, myrow,&
& mycol, point_to_proc, nesd, nerv,& & mycol, point_to_proc, nesd, nerv,&
& proc_to_comm, p2ptag, icomm, p2pstat,& & proc_to_comm, p2ptag, icomm, p2pstat(mpi_status_size),&
& idxs, idxr, iret, errlen, ifcomm, rank,& & idxs, idxr, iret, errlen, ifcomm, rank,&
& err_act, totxch, ixrec, i, lw, idx_pt,& & err_act, totxch, ixrec, i, lw, idx_pt,&
& snd_pt, rcv_pt & snd_pt, rcv_pt
integer, pointer, dimension(:) :: bsdidx, brvidx,& integer, pointer, dimension(:) :: bsdidx, brvidx,&
& sdsz, rvsz, prcid, ptp, rvhd, h_idx & sdsz, rvsz, prcid, ptp, rvhd, d_idx
integer :: int_err(5) integer :: int_err(5)
integer :: blacs_pnum, krecvid, ksendid integer :: blacs_pnum, krecvid, ksendid
logical :: swap_mpi, swap_sync, swap_send, swap_recv logical :: swap_mpi, swap_sync, swap_send, swap_recv
@ -96,7 +97,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_sync = iand(flag,psb_swap_sync_).ne.0
swap_send = iand(flag,psb_swap_send_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0
swap_recv = iand(flag,psb_swap_recv_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0
h_idx => desc_a%halo_index
if(present(data)) then
if(data.eq.psb_comm_halo_) then
d_idx => desc_a%halo_index
else if(data.eq.psb_comm_ovr_) then
d_idx => desc_a%ovrlap_index
else
d_idx => desc_a%halo_index
end if
else
d_idx => desc_a%halo_index
end if
idxs = 0 idxs = 0
idxr = 0 idxr = 0
totxch = 0 totxch = 0
@ -104,11 +117,11 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
rvhd(:) = mpi_request_null rvhd(:) = mpi_request_null
! prepare info for communications ! prepare info for communications
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm.ne.-1) do while (proc_to_comm.ne.-1)
if(proc_to_comm .ne. myrow) totxch = totxch+1 if(proc_to_comm .ne. myrow) totxch = totxch+1
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol)
ptp(proc_to_comm) = point_to_proc ptp(proc_to_comm) = point_to_proc
@ -122,7 +135,7 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
idxs = idxs+sdsz(proc_to_comm) idxs = idxs+sdsz(proc_to_comm)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
if((idxr+idxs).lt.size(work)) then if((idxr+idxs).lt.size(work)) then
@ -141,18 +154,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
! gather elements into sendbuffer for swapping ! gather elements into sendbuffer for swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),&
call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! swap elements using mpi_alltoallv ! swap elements using mpi_alltoallv
@ -168,32 +182,32 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
! scatter elements from receivebuffer after swapping ! scatter elements from receivebuffer after swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_sync) then else if (swap_sync) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if (proc_to_comm .lt. myrow) then if (proc_to_comm .lt. myrow) then
! First I send ! First I send
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
! Then I receive ! Then I receive
@ -206,51 +220,51 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
! Then I send ! Then I send
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
else if (proc_to_comm .eq. myrow) then else if (proc_to_comm .eq. myrow) then
! I send to myself ! I send to myself
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = bsdidx(proc_to_comm) rcv_pt = bsdidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
else else
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send .and. swap_recv) then else if (swap_send .and. swap_recv) then
! First I post all the non blocking receives ! First I post all the non blocking receives
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
p2ptag = krecvid(icontxt,proc_to_comm,myrow) p2ptag = krecvid(icontxt,proc_to_comm,myrow)
@ -267,19 +281,19 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! Then I post all the blocking sends ! Then I post all the blocking sends
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
if(proc_to_comm .ne. myrow) then if(proc_to_comm .ne. myrow) then
@ -295,7 +309,7 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
end if end if
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
do i=1, totxch do i=1, totxch
@ -310,13 +324,13 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
if (ixrec .ne. mpi_undefined) then if (ixrec .ne. mpi_undefined) then
ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index
point_to_proc = ptp(ixrec) point_to_proc = ptp(ixrec)
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
else else
int_err(1) = ixrec int_err(1) = ixrec
@ -327,63 +341,63 @@ subroutine psi_dswaptranm(flag,n,beta,y,desc_a,work,info)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm .eq. myrow) then if(proc_to_comm .eq. myrow) then
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send) then else if (swap_send) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,n,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,n,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv*n-1))
call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,n,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_recv) then else if (swap_recv) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call dgerv2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgerv2d(icontxt,nesd,n,sndbuf(snd_pt),nesd,proc_to_comm,0)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+n*nesd-1),beta,y)
else else
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,n,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,n,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+n*nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
end if end if
@ -406,29 +420,30 @@ end subroutine psi_dswaptranm
subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info) subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info,data)
use psb_error_mod use psb_error_mod
use psb_descriptor_type use psb_descriptor_type
use mpi
implicit none implicit none
include 'mpif.h'
integer, intent(in) :: flag integer, intent(in) :: flag
integer, intent(out) :: info integer, intent(out) :: info
real(kind(1.d0)) :: y(:), beta real(kind(1.d0)) :: y(:), beta
real(kind(1.d0)), target :: work(:) real(kind(1.d0)), target :: work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
! locals ! locals
integer :: icontxt, nprow, npcol, myrow,& integer :: icontxt, nprow, npcol, myrow,&
& mycol, point_to_proc, nesd, nerv,& & mycol, point_to_proc, nesd, nerv,&
& proc_to_comm, p2ptag, icomm, p2pstat,& & proc_to_comm, p2ptag, icomm, p2pstat(mpi_status_size),&
& idxs, idxr, iret, errlen, ifcomm, rank,& & idxs, idxr, iret, errlen, ifcomm, rank,&
& err_act, totxch, ixrec, i, lw, idx_pt,& & err_act, totxch, ixrec, i, lw, idx_pt,&
& snd_pt, rcv_pt, n & snd_pt, rcv_pt, n
integer, pointer, dimension(:) :: bsdidx, brvidx,& integer, pointer, dimension(:) :: bsdidx, brvidx,&
& sdsz, rvsz, prcid, ptp, rvhd, h_idx & sdsz, rvsz, prcid, ptp, rvhd, d_idx
integer :: int_err(5) integer :: int_err(5)
integer :: blacs_pnum, krecvid, ksendid integer :: blacs_pnum, krecvid, ksendid
logical :: swap_mpi, swap_sync, swap_send, swap_recv logical :: swap_mpi, swap_sync, swap_send, swap_recv
@ -504,7 +519,19 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
swap_sync = iand(flag,psb_swap_sync_).ne.0 swap_sync = iand(flag,psb_swap_sync_).ne.0
swap_send = iand(flag,psb_swap_send_).ne.0 swap_send = iand(flag,psb_swap_send_).ne.0
swap_recv = iand(flag,psb_swap_recv_).ne.0 swap_recv = iand(flag,psb_swap_recv_).ne.0
h_idx => desc_a%halo_index
if(present(data)) then
if(data.eq.psb_comm_halo_) then
d_idx => desc_a%halo_index
else if(data.eq.psb_comm_ovr_) then
d_idx => desc_a%ovrlap_index
else
d_idx => desc_a%halo_index
end if
else
d_idx => desc_a%halo_index
end if
idxs = 0 idxs = 0
idxr = 0 idxr = 0
totxch = 0 totxch = 0
@ -513,11 +540,11 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
n=1 n=1
! prepare info for communications ! prepare info for communications
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm.ne.-1) do while (proc_to_comm.ne.-1)
if(proc_to_comm .ne. myrow) totxch = totxch+1 if(proc_to_comm .ne. myrow) totxch = totxch+1
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol) prcid(proc_to_comm) = blacs_pnum(icontxt,proc_to_comm,mycol)
ptp(proc_to_comm) = point_to_proc ptp(proc_to_comm) = point_to_proc
@ -531,7 +558,7 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
idxs = idxs+sdsz(proc_to_comm) idxs = idxs+sdsz(proc_to_comm)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
if((idxr+idxs).lt.size(work)) then if((idxr+idxs).lt.size(work)) then
@ -550,18 +577,18 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
! gather elements into sendbuffer for swapping ! gather elements into sendbuffer for swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! swap elements using mpi_alltoallv ! swap elements using mpi_alltoallv
@ -577,32 +604,32 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
! scatter elements from receivebuffer after swapping ! scatter elements from receivebuffer after swapping
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_sync) then else if (swap_sync) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if (proc_to_comm .lt. myrow) then if (proc_to_comm .lt. myrow) then
! First I send ! First I send
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
! Then I receive ! Then I receive
@ -615,51 +642,51 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
! Then I send ! Then I send
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
else if (proc_to_comm .eq. myrow) then else if (proc_to_comm .eq. myrow) then
! I send to myself ! I send to myself
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = bsdidx(proc_to_comm) rcv_pt = bsdidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
else else
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send .and. swap_recv) then else if (swap_send .and. swap_recv) then
! First I post all the non blocking receives ! First I post all the non blocking receives
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
p2ptag = krecvid(icontxt,proc_to_comm,myrow) p2ptag = krecvid(icontxt,proc_to_comm,myrow)
@ -676,19 +703,19 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
! Then I post all the blocking sends ! Then I post all the blocking sends
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
if(proc_to_comm .ne. myrow) then if(proc_to_comm .ne. myrow) then
@ -704,7 +731,7 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
end if end if
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
do i=1, totxch do i=1, totxch
@ -719,13 +746,13 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
if (ixrec .ne. mpi_undefined) then if (ixrec .ne. mpi_undefined) then
ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index ixrec=ixrec-1 ! mpi_waitany returns an 1 to nprow index
point_to_proc = ptp(ixrec) point_to_proc = ptp(ixrec)
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = bsdidx(proc_to_comm) rcv_pt = bsdidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
else else
int_err(1) = ixrec int_err(1) = ixrec
@ -736,64 +763,64 @@ subroutine psi_dswaptranv(flag,beta,y,desc_a,work,info)
end do end do
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm .eq. myrow) then if(proc_to_comm .eq. myrow) then
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_send) then else if (swap_send) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_gth(nerv,h_idx(idx_pt:idx_pt+nerv-1),& call psi_gth(nerv,d_idx(idx_pt:idx_pt+nerv-1),&
& y,rcvbuf(rcv_pt:rcv_pt+nerv-1)) & y,rcvbuf(rcv_pt:rcv_pt+nerv-1))
call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0) call dgesd2d(icontxt,nerv,1,rcvbuf(rcv_pt),nerv,proc_to_comm,0)
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
else if (swap_recv) then else if (swap_recv) then
point_to_proc = 1 point_to_proc = 1
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
do while (proc_to_comm .ne. -1) do while (proc_to_comm .ne. -1)
nerv = h_idx(point_to_proc+psb_n_elem_recv_) nerv = d_idx(point_to_proc+psb_n_elem_recv_)
nesd = h_idx(point_to_proc+nerv+psb_n_elem_send_) nesd = d_idx(point_to_proc+nerv+psb_n_elem_send_)
if(proc_to_comm.ne.myrow) then if(proc_to_comm.ne.myrow) then
snd_pt = bsdidx(proc_to_comm) snd_pt = bsdidx(proc_to_comm)
call dgerv2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0) call dgerv2d(icontxt,nesd,1,sndbuf(snd_pt),nesd,proc_to_comm,0)
idx_pt = point_to_proc+psb_elem_recv_ idx_pt = point_to_proc+psb_elem_recv_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& sndbuf(snd_pt:snd_pt+nesd-1),beta,y) & sndbuf(snd_pt:snd_pt+nesd-1),beta,y)
else else
idx_pt = point_to_proc+nerv+psb_elem_send_ idx_pt = point_to_proc+nerv+psb_elem_send_
rcv_pt = brvidx(proc_to_comm) rcv_pt = brvidx(proc_to_comm)
call psi_sct(nesd,h_idx(idx_pt:idx_pt+nesd-1),& call psi_sct(nesd,d_idx(idx_pt:idx_pt+nesd-1),&
& rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y) & rcvbuf(rcv_pt:rcv_pt+nerv-1),beta,y)
end if end if
point_to_proc = point_to_proc+nerv+nesd+3 point_to_proc = point_to_proc+nerv+nesd+3
proc_to_comm = h_idx(point_to_proc+psb_proc_id_) proc_to_comm = d_idx(point_to_proc+psb_proc_id_)
end do end do
end if end if

@ -46,6 +46,7 @@
integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4 integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4
integer, parameter :: psb_dbleint_=2 integer, parameter :: psb_dbleint_=2
integer, parameter :: act_ret=0, act_abort=1, no_err=0 integer, parameter :: act_ret=0, act_abort=1, no_err=0
integer, parameter :: psb_comm_halo_=0, psb_comm_ovr_=1
real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7 real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7
real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0 real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0

@ -39,6 +39,7 @@ module psb_const_mod
integer, parameter :: psb_perm_update_=98765, psb_isrtdcoo_=98764 integer, parameter :: psb_perm_update_=98765, psb_isrtdcoo_=98764
integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4 integer, parameter :: psb_maxjdrows_=8, psb_minjdrows_=4
integer, parameter :: psb_dbleint_=2 integer, parameter :: psb_dbleint_=2
integer, parameter :: psb_comm_halo_=0, psb_comm_ovr_=1
real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7 real(kind(1.d0)), parameter :: psb_colrow_=0.33, psb_percent_=0.7
real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0 real(kind(1.d0)), parameter :: dzero=0.d0, done=1.d0

@ -48,7 +48,7 @@ end interface
use psb_prec_type use psb_prec_type
implicit none implicit none
type(psb_dprec_type), intent(inout) :: prec type(psb_dprec_type), intent(inout) :: prec
character(len=10), intent(in) :: ptype character(len=*), intent(in) :: ptype
integer, optional, intent(in) :: iv(:) integer, optional, intent(in) :: iv(:)
real(kind(1.d0)), optional, intent(in) :: rs real(kind(1.d0)), optional, intent(in) :: rs
real(kind(1.d0)), optional, intent(in) :: rv(:) real(kind(1.d0)), optional, intent(in) :: rv(:)

@ -12,6 +12,10 @@ module psb_prec_type
& asm_=3, ras_=5, ash_=4, rash_=6, ras2lv_=7, ras2lvm_=8,& & asm_=3, ras_=5, ash_=4, rash_=6, ras2lv_=7, ras2lvm_=8,&
& lv2mras_=9, lv2smth_=10, lv2lsm_=11, sl2sm_=12, superlu_=13,& & lv2mras_=9, lv2smth_=10, lv2lsm_=11, sl2sm_=12, superlu_=13,&
& new_loc_smth_=14, new_glb_smth_=15, max_prec_=15 & new_loc_smth_=14, new_glb_smth_=15, max_prec_=15
integer, parameter :: nohalo_=0, halo_=4
integer, parameter :: none_=0, sum_=1
integer, parameter :: avg_=2, square_root_=3
! Multilevel stuff. ! Multilevel stuff.
integer, parameter :: no_ml_=0, add_ml_prec_=1, mult_ml_prec_=2 integer, parameter :: no_ml_=0, add_ml_prec_=1, mult_ml_prec_=2
integer, parameter :: new_ml_prec_=3, max_ml_=new_ml_prec_ integer, parameter :: new_ml_prec_=3, max_ml_=new_ml_prec_
@ -369,5 +373,28 @@ contains
end subroutine psb_nullify_baseprec end subroutine psb_nullify_baseprec
function pr_to_str(iprec)
integer, intent(in) :: iprec
character(len=10) :: pr_to_str
select case(iprec)
case(noprec_)
pr_to_str='NOPREC'
case(diagsc_)
pr_to_str='DIAGSC'
case(bja_)
pr_to_str='BJA'
case(asm_)
pr_to_str='ASM'
case(ash_)
pr_to_str='ASM'
case(ras_)
pr_to_str='ASM'
case(rash_)
pr_to_str='ASM'
end select
end function pr_to_str
end module psb_prec_type end module psb_prec_type

@ -57,19 +57,21 @@ module psi_mod
end interface end interface
interface psi_swapdata interface psi_swapdata
subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info) subroutine psi_dswapdatam(flag,n,beta,y,desc_a,work,info,data)
use psb_descriptor_type use psb_descriptor_type
integer, intent(in) :: flag, n integer, intent(in) :: flag, n
integer, intent(out) :: info integer, intent(out) :: info
real(kind(1.d0)) :: y(:,:), beta, work(:) real(kind(1.d0)) :: y(:,:), beta, work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
end subroutine psi_dswapdatam end subroutine psi_dswapdatam
subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info) subroutine psi_dswapdatav(flag,beta,y,desc_a,work,info,data)
use psb_descriptor_type use psb_descriptor_type
integer, intent(in) :: flag integer, intent(in) :: flag
integer, intent(out) :: info integer, intent(out) :: info
real(kind(1.d0)) :: y(:), beta, work(:) real(kind(1.d0)) :: y(:), beta, work(:)
type(psb_desc_type) :: desc_a type(psb_desc_type) :: desc_a
integer, optional :: data
end subroutine psi_dswapdatav end subroutine psi_dswapdatav
subroutine psi_iswapdatam(flag,n,beta,y,desc_a,work,info) subroutine psi_iswapdatam(flag,n,beta,y,desc_a,work,info)
use psb_descriptor_type use psb_descriptor_type

@ -108,7 +108,7 @@ subroutine psb_dcslu(a,desc_a,p,upd,info)
call psb_nullify_sp(blck) call psb_nullify_sp(blck)
t1= mpi_wtime() t1= mpi_wtime()
if(debug) write(0,*)me,': calling psb_csrsetup' if(debug) write(0,*)me,': calling psb_csrsetup',p%iprcparm(p_type_),p%iprcparm(n_ovr_)
call psb_csrsetup(p%iprcparm(p_type_),p%iprcparm(n_ovr_),a,& call psb_csrsetup(p%iprcparm(p_type_),p%iprcparm(n_ovr_),a,&
& blck,desc_a,upd,p%desc_data,info) & blck,desc_a,upd,p%desc_data,info)
if(info/=0) then if(info/=0) then

@ -243,7 +243,7 @@ subroutine psb_dbaseprcaply(prec,x,beta,y,desc_data,trans,work,info)
tx(desc_data%matrix_data(psb_n_row_)+1:isz) = zero tx(desc_data%matrix_data(psb_n_row_)+1:isz) = zero
if (prec%iprcparm(restr_)==psb_halo_) then if (prec%iprcparm(restr_)==psb_halo_) then
call psb_halo(tx,prec%desc_data,info,work=aux) call psb_halo(tx,prec%desc_data,info,work=aux)
if(info /=0) then if(info /=0) then
info=4010 info=4010
ch_err='psb_halo' ch_err='psb_halo'
@ -281,9 +281,9 @@ subroutine psb_dbaseprcaply(prec,x,beta,y,desc_data,trans,work,info)
! call f90_psovrl(ty,prec%desc_data,update_type=prec%a_restrict) ! call f90_psovrl(ty,prec%desc_data,update_type=prec%a_restrict)
case(psb_sum_,psb_avg_) case(psb_sum_,psb_avg_)
call psb_ovrl(ty,prec%desc_data,info,& call psb_ovrl(ty,prec%desc_data,info,&
& update_type=prec%iprcparm(prol_),work=aux) & update_type=prec%iprcparm(prol_),work=aux)
if(info /=0) then if(info /=0) then
info=4010 info=4010
ch_err='psb_ovrl' ch_err='psb_ovrl'
goto 9999 goto 9999

@ -448,16 +448,19 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,&
n = desc_a%matrix_data(psb_n_) n = desc_a%matrix_data(psb_n_)
nrow = desc_a%matrix_data(psb_n_row_) nrow = desc_a%matrix_data(psb_n_row_)
ncol = desc_a%matrix_data(psb_n_col_) ncol = desc_a%matrix_data(psb_n_col_)
lldx = size(x,1) lldx = size(x)
lldy = size(y,1) lldy = size(y)
! check for presence/size of a work area ! check for presence/size of a work area
liwork= 2*ncol liwork= 2*ncol
if (a%pr(1) /= 0) llwork = liwork + n * ik if (a%pr(1) /= 0) liwork = liwork + n * ik
if (a%pl(1) /= 0) llwork = liwork + m * ik if (a%pl(1) /= 0) liwork = liwork + m * ik
if (present(work)) then if (present(work)) then
if(size(work).lt.liwork) then if(size(work).ge.liwork) then
call psb_realloc(liwork,work,info) iwork => work
liwork=size(work)
else
call psb_realloc(liwork,iwork,info)
if(info.ne.0) then if(info.ne.0) then
info=4010 info=4010
ch_err='psb_realloc' ch_err='psb_realloc'
@ -465,7 +468,6 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,&
goto 9999 goto 9999
end if end if
end if end if
iwork => work
else else
call psb_realloc(liwork,iwork,info) call psb_realloc(liwork,iwork,info)
if(info.ne.0) then if(info.ne.0) then
@ -516,15 +518,12 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,&
x(nrow:ncol)=0.d0 x(nrow:ncol)=0.d0
else else
call psi_swapdata(ior(psb_swap_send_,psb_swap_recv_),& call psi_swapdata(ior(psb_swap_send_,psb_swap_recv_),&
& dzero,x,desc_a,iwork,info) & dzero,x,desc_a,iwork,info,data=psb_comm_halo_)
!!$ call PSI_dSwapData(ior(SWAP_SEND,SWAP_RECV),1,&
!!$ & dzero,x(iix,jjx),lldx,desc_a%matrix_data,&
!!$ & desc_a%halo_index,iwork,liwork,info)
end if end if
! local Matrix-vector product ! local Matrix-vector product
call dcsmm(itrans,nrow,ib,ncol,alpha,a%pr,a%fida,& call dcsmm(itrans,nrow,ib,ncol,alpha,a%pl,a%fida,&
& a%descra,a%aspk,a%ia1,a%ia2,a%infoa,a%pl,& & a%descra,a%aspk,a%ia1,a%ia2,a%infoa,a%pr,&
& x(iix),lldx,beta,y(iiy),lldy,& & x(iix),lldx,beta,y(iiy),lldy,&
& iwork,liwork,info) & iwork,liwork,info)
if(info.ne.0) then if(info.ne.0) then
@ -585,9 +584,6 @@ subroutine psb_dspmv(alpha,a,x,beta,y,desc_a,info,&
if(idoswap.gt.0)& if(idoswap.gt.0)&
& call psi_swaptran(ior(psb_swap_send_,psb_swap_recv_),& & call psi_swaptran(ior(psb_swap_send_,psb_swap_recv_),&
& done,yp,desc_a,iwork,info) & done,yp,desc_a,iwork,info)
!!$ & call PSI_dSwapTran(ior(SWAP_SEND,SWAP_RECV),&
!!$ & ik,done,y(iiy,jjy),lldy,desc_a%matrix_data,&
!!$ & desc_a%halo_index),iwork,liwork,info
if(info.ne.0) then if(info.ne.0) then
info = 4010 info = 4010
ch_err='PSI_dSwapTran' ch_err='PSI_dSwapTran'

@ -153,7 +153,7 @@ C
C C
C C
CALL DCOOMM(TRANS,M,N,K,ALPHA,DESCRA,A,IA1, CALL DCOOMM(TRANS,M,N,K,ALPHA,DESCRA,A,IA1,
+ IA2,INFOA,B,LDB,BETA,C,LDC,WORK) + IA2,INFOA,B,LDB,BETA,C,LDC,WORK,LWORK,IERROR)
ELSE ELSE
C C
C This data structure not yet considered C This data structure not yet considered

@ -266,7 +266,7 @@ subroutine psb_dcsdp(a, b,info,ifc,check,trans,unitd)
case ('COO') case ('COO')
call dcoco(trans_, a%m, a%k, unitd_, d, a%descra, a%aspk,& call dcoco(trans_, a%m, a%k, unitd_, d, a%descra, a%aspk,&
& a%ia2, a%ia1, a%infoa, b%pl, b%descra, b%aspk, b%ia1,& & a%ia1, a%ia2, a%infoa, b%pl, b%descra, b%aspk, b%ia1,&
& b%ia2, b%infoa, b%pr, size(b%aspk), size(b%ia1),& & b%ia2, b%infoa, b%pr, size(b%aspk), size(b%ia1),&
& size(b%ia2), work, 2*size(work), info) & size(b%ia2), work, 2*size(work), info)
if (info/=0) then if (info/=0) then

@ -41,9 +41,9 @@ Subroutine psb_dfixcoo(A,INFO)
if (j > nza) exit if (j > nza) exit
enddo enddo
nzl = j - i nzl = j - i
call mrgsrt(nzl,a%ia2(i:i+nzl-1),iaux,iret) call mrgsrt(nzl,a%ia2(i),iaux,iret)
if (iret.eq.0) & if (iret.eq.0) &
& call reordvn(nzl,a%aspk(i:i+nzl-1),a%ia1(i:i+nzl-1),a%ia2(i:i+nzl-1),iaux) & call reordvn(nzl,a%aspk(i),a%ia1(i),a%ia2(i),iaux)
i = j i = j
enddo enddo

@ -1,7 +1,8 @@
7 Number of entries below this 7 Number of entries below this
BICGSTAB Iterative method BICGSTAB CGS BICG BICGSTABL BICGSTAB Iterative method BICGSTAB CGS BICG BICGSTABL
NONE Preconditioner ILU DIAGSC NONE 2 Preconditioner ILU DIAGSC NONE
CSR A Storage format CSR COO JAD 2 Number ov overlapping levels
COO A Storage format CSR COO JAD
20 Domain size (acutal sistem is this**3) 20 Domain size (acutal sistem is this**3)
1 Stopping criterion 1 Stopping criterion
80 MAXIT 80 MAXIT

@ -1,5 +1,6 @@
! File: ppde90.f90
! !
! Program: ppde90
! This sample program shows how to build and solve a sparse linear ! This sample program shows how to build and solve a sparse linear
! !
! The program solves a linear system based on the partial differential ! The program solves a linear system based on the partial differential
@ -7,7 +8,8 @@
! !
! !
! !
! the equation generated is: ! The equation generated is
!
! b1 d d (u) b2 d d (u) a1 d (u)) a2 d (u))) ! b1 d d (u) b2 d d (u) a1 d (u)) a2 d (u)))
! - ------ - ------ + ----- + ------ + a3 u = 0 ! - ------ - ------ + ----- + ------ + a3 u = 0
! dx dx dy dy dx dy ! dx dx dy dy dx dy
@ -41,6 +43,7 @@
program pde90 program pde90
use psb_sparse_mod use psb_sparse_mod
use psb_error_mod use psb_error_mod
use psb_prec_mod
implicit none implicit none
interface interface
@ -75,7 +78,7 @@ program pde90
! solver parameters ! solver parameters
integer :: iter, itmax,ierr,itrace, methd,iprec, istopc,& integer :: iter, itmax,ierr,itrace, methd,iprec, istopc,&
& iparm(20), ml & iparm(20), ml, novr
real(kind(1.d0)) :: err, eps, rparm(20) real(kind(1.d0)) :: err, eps, rparm(20)
! other variables ! other variables
@ -101,7 +104,7 @@ program pde90
! !
! get parameters ! get parameters
! !
call get_parms(icontxt,cmethd,prec,afmt,idim,istopc,itmax,itrace,ml) call get_parms(icontxt,cmethd,iprec,novr,afmt,idim,istopc,itmax,itrace,ml)
! !
! allocate and fill in the coefficient matrix, rhs and initial guess ! allocate and fill in the coefficient matrix, rhs and initial guess
@ -120,50 +123,30 @@ program pde90
dim=size(a%aspk) dim=size(a%aspk)
!!$ allocate(h%aspk(dim),h%ia1(dim),h%ia2(dim),h%pl(size(a%pl)),&
!!$ & h%pl(size(a%pl)),d(size(a%pl)),&
!!$ & desc_a_out%matrix_data(size(desc_a%matrix_data)),&
!!$ & desc_a_out%halo_index(size(desc_a%halo_index)),&
!!$ & desc_a_out%ovrlap_index(size(desc_a%ovrlap_index)),&
!!$ & desc_a_out%ovrlap_elem(size(desc_a%ovrlap_elem)),&
!!$ & desc_a_out%loc_to_glob(size(desc_a%loc_to_glob)),&
!!$ & desc_a_out%glob_to_loc(size(desc_a%glob_to_loc)), work(1024))
!!$ check_descr=15
! work(5)=9
!!$ write(0,*)'calling verify'
!!$ call f90_psverify(d,a,desc_a,check_descr,convert_descr,h,&
!!$ & desc_a_out,work)
!!$ write(0,*)'verify done',convert_descr
! deallocate(work)
call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1) call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1)
if (iam.eq.0) write(6,*) 'matrix creation time : ',t2 if (iam.eq.0) write(*,'("Overall matrix creation time : ",es10.4)')t2
if (iam.eq.0) write(*,'(" ")')
! !
! prepare the preconditioner. ! prepare the preconditioner.
! !
write(0,*)'precondizionatore=',prec if(iam.eq.psb_root_) write(0,'("Setting preconditioner to : ",a)')pr_to_str(iprec)
select case (prec) select case(iprec)
case ('ILU') case(noprec_)
iprec = 2 call psb_precset(pre,'noprec')
ptype='bja' case(diagsc_)
call psb_precset(pre,ptype) call psb_precset(pre,'diagsc')
case ('DIAGSC') case(bja_)
iprec = 1 call psb_precset(pre,'ilu')
ptype='diagsc' case(asm_)
call psb_precset(pre,ptype) call psb_precset(pre,'asm',iv=(/novr,halo_,sum_/))
case ('NONE') case(ash_)
iprec = 0 call psb_precset(pre,'asm',iv=(/novr,nohalo_,sum_/))
ptype='noprec' case(ras_)
call psb_precset(pre,ptype) call psb_precset(pre,'asm',iv=(/novr,halo_,none_/))
case default case(rash_)
info=5003 call psb_precset(pre,'asm',iv=(/novr,nohalo_,none_/))
ch_err(1:3)=prec(1:3)
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end select end select
write(0,*)'preconditioner set'
call blacs_barrier(icontxt,'ALL') call blacs_barrier(icontxt,'ALL')
t1 = mpi_wtime() t1 = mpi_wtime()
@ -179,12 +162,13 @@ program pde90
call dgamx2d(icontxt,'a',' ',ione, ione,tprec,ione,t1,t1,-1,-1,-1) call dgamx2d(icontxt,'a',' ',ione, ione,tprec,ione,t1,t1,-1,-1,-1)
if (iam.eq.0) write(6,*) 'preconditioner time : ',tprec if (iam.eq.0) write(*,'("Preconditioner time : ",es10.4)')tprec
if (iam.eq.0) write(*,'(" ")')
! !
! iterative method parameters ! iterative method parameters
! !
write(*,*) 'calling iterative method', a%ia2(7999:8001) if(iam.eq.psb_root_) write(*,'("Calling iterative method ",a)')cmethd
call blacs_barrier(icontxt,'ALL') call blacs_barrier(icontxt,'ALL')
t1 = mpi_wtime() t1 = mpi_wtime()
eps = 1.d-9 eps = 1.d-9
@ -213,11 +197,12 @@ program pde90
call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1) call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1)
if (iam.eq.0) then if (iam.eq.0) then
write(6,*) 'time to solve matrix : ',t2 write(*,'(" ")')
write(6,*) 'time per iteration : ',t2/iter write(*,'("Time to solve matrix : ",es10.4)')t2
write(6,*) 'number of iterations : ',iter write(*,'("Time per iteration : ",es10.4)')t2/iter
write(6,*) 'error on exit : ',err write(*,'("Number of iterations : ",i)')iter
write(6,*) 'info on exit : ',info write(*,'("Error on exit : ",es10.4)')err
write(*,'("Info on exit : ",i)')info
end if end if
! !
@ -250,10 +235,10 @@ contains
! !
! get iteration parameters from the command line ! get iteration parameters from the command line
! !
subroutine get_parms(icontxt,cmethd,prec,afmt,idim,istopc,itmax,itrace,ml) subroutine get_parms(icontxt,cmethd,iprec,novr,afmt,idim,istopc,itmax,itrace,ml)
integer :: icontxt integer :: icontxt
character :: cmethd*10, prec*10, afmt*5 character :: cmethd*10, afmt*5
integer :: idim, iret, istopc,itmax,itrace,ml integer :: idim, iret, istopc,itmax,itrace,ml, iprec, novr
character*40 :: charbuf character*40 :: charbuf
integer :: iargc, nprow, npcol, myprow, mypcol integer :: iargc, nprow, npcol, myprow, mypcol
external iargc external iargc
@ -265,7 +250,8 @@ contains
read(*,*) ip read(*,*) ip
if (ip.ge.3) then if (ip.ge.3) then
read(*,*) cmethd read(*,*) cmethd
read(*,*) prec read(*,*) iprec
read(*,*) novr
read(*,*) afmt read(*,*) afmt
! convert strings in array ! convert strings in array
@ -275,11 +261,11 @@ contains
! broadcast parameters to all processors ! broadcast parameters to all processors
call igebs2d(icontxt,'ALL',' ',10,1,intbuf,10) call igebs2d(icontxt,'ALL',' ',10,1,intbuf,10)
do i = 1, len(prec)
intbuf(i) = iachar(prec(i:i))
end do
! broadcast parameters to all processors ! broadcast parameters to all processors
call igebs2d(icontxt,'ALL',' ',10,1,intbuf,10) call igebs2d(icontxt,'ALL',' ',1,1,iprec,10)
! broadcast parameters to all processors
call igebs2d(icontxt,'ALL',' ',1,1,novr,10)
do i = 1, len(afmt) do i = 1, len(afmt)
intbuf(i) = iachar(afmt(i:i)) intbuf(i) = iachar(afmt(i:i))
@ -317,11 +303,14 @@ contains
intbuf(5) = ml intbuf(5) = ml
call igebs2d(icontxt,'ALL',' ',5,1,intbuf,5) call igebs2d(icontxt,'ALL',' ',5,1,intbuf,5)
write(6,*)'solving matrix: ell1' write(*,'("Solving matrix : ell1")')
write(6,*)'on grid',idim,'x',idim,'x',idim write(*,'("Grid dimensions : ",i4,"x",i4,"x",i4)')idim,idim,idim
write(6,*)' with block data distribution, np=',np,& write(*,'("Number of processors : ",i)')nprow
& ' preconditioner=',prec,& write(*,'("Data distribution : BLOCK")')
& ' iterative methd=',cmethd write(*,'("Preconditioner : ",a)')pr_to_str(iprec)
if(iprec.gt.2) write(*,'("Overlapping levels : ",i)')novr
write(*,'("Iterative method : ",a)')cmethd
write(*,'(" ")')
else else
! wrong number of parameter, print an error message and exit ! wrong number of parameter, print an error message and exit
call pr_usage(0) call pr_usage(0)
@ -334,10 +323,11 @@ contains
do i = 1, 10 do i = 1, 10
cmethd(i:i) = achar(intbuf(i)) cmethd(i:i) = achar(intbuf(i))
end do end do
call igebr2d(icontxt,'ALL',' ',10,1,intbuf,10,0,0)
do i = 1, 10 call igebr2d(icontxt,'ALL',' ',1,1,iprec,10,0,0)
prec(i:i) = achar(intbuf(i))
end do call igebr2d(icontxt,'ALL',' ',1,1,novr,10,0,0)
call igebr2d(icontxt,'ALL',' ',10,1,intbuf,10,0,0) call igebr2d(icontxt,'ALL',' ',10,1,intbuf,10,0,0)
do i = 1, 5 do i = 1, 5
afmt(i:i) = achar(intbuf(i)) afmt(i:i) = achar(intbuf(i))
@ -429,7 +419,7 @@ contains
! deltat discretization time ! deltat discretization time
real(kind(1.d0)) :: deltah real(kind(1.d0)) :: deltah
real(kind(1.d0)),parameter :: rhs=0.d0,one=1.d0,zero=0.d0 real(kind(1.d0)),parameter :: rhs=0.d0,one=1.d0,zero=0.d0
real(kind(1.d0)) :: mpi_wtime, t1, t2, t3, tins real(kind(1.d0)) :: mpi_wtime, t1, t2, t3, tins, tasb
real(kind(1.d0)) :: a1, a2, a3, a4, b1, b2, b3 real(kind(1.d0)) :: a1, a2, a3, a4, b1, b2, b3
external mpi_wtime,a1, a2, a3, a4, b1, b2, b3 external mpi_wtime,a1, a2, a3, a4, b1, b2, b3
integer :: nb, ir1, ir2, ipr, err_act integer :: nb, ir1, ir2, ipr, err_act
@ -452,14 +442,12 @@ contains
m = idim*idim*idim m = idim*idim*idim
n = m n = m
nnz = ((n*9)/(nprow*npcol)) nnz = ((n*9)/(nprow*npcol))
write(*,*) 'size: n ',n,myprow if(myprow.eq.psb_root_) write(0,'("Generating Matrix (size=",i,")...")')n
call psb_dscall(n,n,parts,icontxt,desc_a,info) call psb_dscall(n,n,parts,icontxt,desc_a,info)
write(*,*) 'allocating a. nnz:',nnz,myprow
call psb_spalloc(a,desc_a,info,nnz=nnz) call psb_spalloc(a,desc_a,info,nnz=nnz)
! define rhs from boundary conditions; also build initial guess ! define rhs from boundary conditions; also build initial guess
write(*,*) 'allocating b', info,myprow
call psb_alloc(n,b,desc_a,info) call psb_alloc(n,b,desc_a,info)
write(*,*) 'allocating t', info,myprow
call psb_alloc(n,t,desc_a,info) call psb_alloc(n,t,desc_a,info)
if(info.ne.0) then if(info.ne.0) then
info=4010 info=4010
@ -642,7 +630,7 @@ contains
end do end do
call blacs_barrier(icontxt,'ALL') call blacs_barrier(icontxt,'ALL')
t2 = mpi_wtime() t2 = mpi_wtime()-t1
if(info.ne.0) then if(info.ne.0) then
info=4010 info=4010
@ -651,23 +639,30 @@ contains
goto 9999 goto 9999
end if end if
write(*,*) ' pspins time',tins
write(*,*) ' insert time',(t2-t1)
deallocate(row_mat%aspk,row_mat%ia1,row_mat%ia2) deallocate(row_mat%aspk,row_mat%ia1,row_mat%ia2)
t1 = mpi_wtime() t1 = mpi_wtime()
call psb_dscasb(desc_a,info) call psb_dscasb(desc_a,info)
call psb_spasb(a,desc_a,info,dup=1,afmt=afmt) call psb_spasb(a,desc_a,info,dup=1,afmt=afmt)
call blacs_barrier(icontxt,'ALL') call blacs_barrier(icontxt,'ALL')
t2 = mpi_wtime() tasb = mpi_wtime()-t1
if(info.ne.0) then if(info.ne.0) then
info=4010 info=4010
ch_err='asb rout.' ch_err='asb rout.'
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
write(0,*) ' assembly time',(t2-t1),' ',a%fida(1:4)
call dgamx2d(icontxt,'a',' ',ione, ione,t2,ione,t1,t1,-1,-1,-1)
call dgamx2d(icontxt,'a',' ',ione, ione,tins,ione,t1,t1,-1,-1,-1)
call dgamx2d(icontxt,'a',' ',ione, ione,tasb,ione,t1,t1,-1,-1,-1)
if(myprow.eq.psb_root_) then
write(*,'("The matrix has been generated and assembeld in ",a3," format.")')a%fida(1:3)
write(*,'("-pspins time : ",es10.4)')tins
write(*,'("-insert time : ",es10.4)')t2
write(*,'("-assembly time : ",es10.4)')tasb
end if
call psb_asb(b,desc_a,info) call psb_asb(b,desc_a,info)
call psb_asb(t,desc_a,info) call psb_asb(t,desc_a,info)
@ -678,10 +673,6 @@ contains
goto 9999 goto 9999
end if end if
if (myprow.eq.0) then
write(0,*) ' end create_matrix'
endif
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return

Loading…
Cancel
Save