Created new internal to set bld status of descriptor.

Fixed glob_to_loc actions (and their description).
psblas3-type-indexed
Salvatore Filippone 18 years ago
parent 36417f6f21
commit bcb22d2195

@ -777,8 +777,8 @@ Specified as: Integer scalar.\\
% %
\subroutine{psb\_glob\_to\_loc}{Global to local indices convertion} \subroutine{psb\_glob\_to\_loc}{Global to local indices convertion}
\syntax{call psb\_glob\_to\_loc}{x, y, desc\_a, info, iact} \syntax{call psb\_glob\_to\_loc}{x, y, desc\_a, info, iact,owned}
\syntax*{call psb\_glob\_to\_loc}{x, desc\_a, info, iact} \syntax*{call psb\_glob\_to\_loc}{x, desc\_a, info, iact,owned}
\begin{description} \begin{description}
\item[\bf On Entry] \item[\bf On Entry]
@ -793,7 +793,14 @@ Specified as: a structured data of type \descdata.
\item[iact] specifies action to be taken in case of range errors. \item[iact] specifies action to be taken in case of range errors.
Scope: {\bf global} \\ Scope: {\bf global} \\
Type: {\bf optional}\\ Type: {\bf optional}\\
Specified as: a character variable \verb|E|, \verb|W| or \verb|A|. Specified as: a character variable \verb|I|gnore, \verb|W|arning or
\verb|A|bort, default \verb|I|gnore.
\item[owned] Specfies valid range of input
Scope: {\bf global} \\
Type: {\bf optional}\\
If true, then only indices strictly owned by the current process are
considered valid, if false then halo indices are also
accepted. Default: false.
\end{description} \end{description}
\begin{description} \begin{description}
@ -803,7 +810,7 @@ Specified as: a character variable \verb|E|, \verb|W| or \verb|A|.
Scope: {\bf global} \\ Scope: {\bf global} \\
Type: {\bf required}\\ Type: {\bf required}\\
Specified as: a rank one integer array. Specified as: a rank one integer array.
\item[y] If $y$ is not present, \item[y] If $y$ is present,
then $y$ is overwritten with the translated integer indices, and $x$ then $y$ is overwritten with the translated integer indices, and $x$
is left unchanged. is left unchanged.
Scope: {\bf global} \\ Scope: {\bf global} \\
@ -815,6 +822,13 @@ Type: {\bf required}\\
Specified as: an integer variable. Specified as: an integer variable.
\end{description} \end{description}
\section*{Notes}
\begin{enumerate}
\item If an input index is out of range, then the corresponding output
index is set to a negative number;
\item The default \verb|I|gnore means that the negative output is the
only action taken on an out-of-range input.
\end{enumerate}
% %

File diff suppressed because one or more lines are too long

@ -21,7 +21,7 @@ psb_spmat_type.o : psb_realloc_mod.o psb_const_mod.o psb_string_mod.o
psb_error_mod.o: psb_const_mod.o psb_error_mod.o: psb_const_mod.o
psb_penv_mod.o: psb_const_mod.o psb_error_mod.o psb_penv_mod.o: psb_const_mod.o psb_error_mod.o
psi_mod.o: psb_penv_mod.o psb_error_mod.o psb_desc_type.o psi_mod.o: psb_penv_mod.o psb_error_mod.o psb_desc_type.o
psb_desc_type.o: psb_const_mod.o psb_desc_type.o: psb_const_mod.o psb_error_mod.o psb_penv_mod.o
psb_check_mod.o: psb_desc_type.o psb_check_mod.o: psb_desc_type.o
psb_methd_mod.o: psb_serial_mod.o psb_desc_type.o psb_prec_type.o psb_methd_mod.o: psb_serial_mod.o psb_desc_type.o psb_prec_type.o
psb_tools_mod.o: psb_spmat_type.o psb_desc_type.o psi_mod.o psb_tools_mod.o: psb_spmat_type.o psb_desc_type.o psi_mod.o

@ -227,5 +227,64 @@ contains
end function psb_is_large_dec end function psb_is_large_dec
subroutine psb_cd_set_bld(desc,info)
use psb_const_mod
use psb_error_mod
use psb_penv_mod
implicit none
type(psb_desc_type), intent(inout) :: desc
integer :: info
!locals
integer :: np,me,ictxt, isz, err_act,idx,gidx,lidx
logical, parameter :: debug=.false.,debugprt=.false.
character(len=20) :: name, char_err
if (debug) write(0,*) me,'Entered CDCPY'
if (psb_get_errstatus() /= 0) return
info = 0
call psb_erractionsave(err_act)
name = 'psb_cdcpy'
ictxt = psb_cd_get_context(desc)
! check on blacs grid
call psb_info(ictxt, me, np)
if (debug) write(0,*) me,'Entered CDCPY'
if (psb_is_large_desc(desc)) then
if (.not.allocated(desc%ptree)) then
allocate(desc%ptree(2),stat=info)
if (info /= 0) then
info=4000
goto 9999
endif
call InitPairSearchTree(desc%ptree,info)
do idx=1, psb_cd_get_local_cols(desc)
gidx = desc%loc_to_glob(idx)
call SearchInsKeyVal(desc%ptree,gidx,idx,lidx,info)
if (lidx /= idx) then
write(0,*) 'Warning from cdset: mismatch in PTREE ',idx,lidx
endif
enddo
end if
desc%matrix_data(psb_dec_type_) = psb_desc_large_bld_
else
desc%matrix_data(psb_dec_type_) = psb_desc_bld_
end if
call psb_erractionrestore(err_act)
return
9999 continue
call psb_erractionrestore(err_act)
if (err_act == act_ret) then
return
else
call psb_error(ictxt)
end if
return
end subroutine psb_cd_set_bld
end module psb_descriptor_type end module psb_descriptor_type

@ -351,6 +351,8 @@ contains
write (0,'("indices in input array are not within problem dimension ",2(i0,2x))')i_e_d(1:2) write (0,'("indices in input array are not within problem dimension ",2(i0,2x))')i_e_d(1:2)
case(150) case(150)
write (0,'("indices in input array are not belonging to the calling process ",i0)')i_e_d(1) write (0,'("indices in input array are not belonging to the calling process ",i0)')i_e_d(1)
case(151)
write (0,'("indices in input array are not belonging to the calling process ")')
case(290) case(290)
write (0,'("To call this routine you must first call psb_geall on the same matrix")') write (0,'("To call this routine you must first call psb_geall on the same matrix")')
case(295) case(295)

@ -557,20 +557,22 @@ Module psb_tools_mod
interface psb_glob_to_loc interface psb_glob_to_loc
subroutine psb_glob_to_loc2(x,y,desc_a,info,iact) subroutine psb_glob_to_loc2(x,y,desc_a,info,iact,owned)
use psb_descriptor_type use psb_descriptor_type
type(psb_desc_type), intent(in) :: desc_a type(psb_desc_type), intent(in) :: desc_a
integer,intent(in) :: x(:) integer,intent(in) :: x(:)
integer,intent(out) :: y(:) integer,intent(out) :: y(:)
integer, intent(out) :: info integer, intent(out) :: info
character, intent(in), optional :: iact character, intent(in), optional :: iact
logical, intent(in), optional :: owned
end subroutine psb_glob_to_loc2 end subroutine psb_glob_to_loc2
subroutine psb_glob_to_loc(x,desc_a,info,iact) subroutine psb_glob_to_loc(x,desc_a,info,iact,owned)
use psb_descriptor_type use psb_descriptor_type
type(psb_desc_type), intent(in) :: desc_a type(psb_desc_type), intent(in) :: desc_a
integer,intent(inout) :: x(:) integer,intent(inout) :: x(:)
integer, intent(out) :: info integer, intent(out) :: info
character, intent(in), optional :: iact character, intent(in), optional :: iact
logical, intent(in), optional :: owned
end subroutine psb_glob_to_loc end subroutine psb_glob_to_loc
end interface end interface

@ -387,6 +387,7 @@ subroutine psb_cdall(m, n, parts, ictxt, desc_a, info)
! set fields in desc_a%MATRIX_DATA.... ! set fields in desc_a%MATRIX_DATA....
desc_a%matrix_data(psb_n_row_) = loc_row desc_a%matrix_data(psb_n_row_) = loc_row
desc_a%matrix_data(psb_n_col_) = loc_row desc_a%matrix_data(psb_n_col_) = loc_row
call psb_cd_set_bld(desc_a,info)
call psb_realloc(1,desc_a%halo_index, info) call psb_realloc(1,desc_a%halo_index, info)
if (info /= no_err) then if (info /= no_err) then

@ -130,6 +130,16 @@ subroutine psb_cdasb(desc_a,info)
end if end if
if (psb_is_large_dec(dectype) ) then if (psb_is_large_dec(dectype) ) then
if (allocated(desc_a%ptree)) then
call FreePairSearchTree(desc_a%ptree)
deallocate(desc_a%ptree,stat=info)
if (info /= 0) then
info=2059
call psb_errpush(info,name)
goto 9999
end if
end if
desc_a%matrix_data(psb_dec_type_) = psb_desc_large_asb_ desc_a%matrix_data(psb_dec_type_) = psb_desc_large_asb_
!!$ write(0,*) 'Done large dec asmbly',desc_a%matrix_data(psb_dec_type_),& !!$ write(0,*) 'Done large dec asmbly',desc_a%matrix_data(psb_dec_type_),&
!!$ & psb_desc_large_asb_,psb_is_asb_dec(desc_a%matrix_data(psb_dec_type_)) !!$ & psb_desc_large_asb_,psb_is_asb_dec(desc_a%matrix_data(psb_dec_type_))

@ -157,87 +157,164 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info)
l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1) l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1)
l_tmp_halo = novr*(3*Size(desc_a%halo_index)) l_tmp_halo = novr*(3*Size(desc_a%halo_index))
if (psb_is_large_desc(desc_a)) then call psb_cd_set_bld(desc_ov,info)
desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_
else
desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_
end if
If(debug) then If(debug) then
Write(0,*)'Start cdovrbld',me,lworks,lworkr Write(0,*)'Start cdovrbld',me,lworks,lworkr
call psb_barrier(ictxt) call psb_barrier(ictxt)
endif endif
if (.false.) then
!
! The real work goes on in here....
!
Call psb_cdovrbld(novr,desc_ov,desc_a,a,&
& l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info)
if (info /= 0) then
info=4010
ch_err='psb_cdovrbld'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
If(debug) then
Write(0,*)'Done cdovrbld',me,lworks,lworkr
call psb_barrier(ictxt)
endif
else Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),&
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& call psb_sp_all(blk,max(lworks,lworkr),info)
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info) if (info /= 0) then
if (info /= 0) then info=4010
call psb_errpush(4010,name,a_err='Allocate') ch_err='psb_sp_all'
goto 9999 call psb_errpush(info,name,a_err=ch_err)
end if goto 9999
end if
blk%fida='COO'
call psb_sp_all(blk,max(lworks,lworkr),info) Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
if (info /= 0) then & halo(size(desc_a%halo_index)),stat=info)
info=4010 if (info /= 0) then
ch_err='psb_sp_all' call psb_errpush(4010,name,a_err='Allocate')
call psb_errpush(info,name,a_err=ch_err) goto 9999
goto 9999 end if
end if halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
Do j=0,n_elem_recv-1
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
blk%fida='COO' tmp_ovr_idx(counter_o)=proc
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
end Do
counter=counter+n_elem_recv+n_elem_send+2
end Do
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
!
! A picture is in order to understand what goes on here.
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr
!
! At this point, halo contains a valid halo corresponding to the
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
!
bsdindx(:) = 0
sdsz(:) = 0
brvindx(:) = 0
rvsz(:) = 0
idxr = 0
idxs = 0
counter = 1
counter_t = 1
Do While (halo(counter) /= -1)
tot_elem=0
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999
end If
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv
!
!
! The format of the halo vector exists in two forms: 1. Temporary
! 2. Assembled. In this loop we are using the (assembled) halo_in and
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
!
! add recv elements in halo_index into ovrlap_index
!
Do j=0,n_elem_recv-1 Do j=0,n_elem_recv-1
If((counter+psb_elem_recv_+j)>Size(halo)) then
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then If(idx > Size(desc_ov%loc_to_glob)) then
info=-3 info=-3
call psb_errpush(info,name) call psb_errpush(info,name)
@ -258,444 +335,344 @@ Subroutine psb_dcdovr(a,desc_a,novr,desc_ov,info)
tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1 tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3 counter_o=counter_o+3
end Do
counter=counter+n_elem_recv+n_elem_send+2
end Do
if (.not.psb_is_large_desc(desc_ov)) then
call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_halo(counter_h)=proc
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
! counter_h=counter_h+3
! A picture is in order to understand what goes on here. end if
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
! !
! At this point, halo contains a valid halo corresponding to the ! add send elements in halo_index into ovrlap_index
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
! !
bsdindx(:) = 0 Do j=0,n_elem_send-1
sdsz(:) = 0
brvindx(:) = 0 idx = halo(counter+psb_elem_send_+j)
rvsz(:) = 0 gidx = desc_ov%loc_to_glob(idx)
idxr = 0 if (idx > psb_cd_get_local_rows(Desc_a)) &
idxs = 0 & write(0,*) me,i_ovr,'Out of local rows ',&
counter = 1 & idx,psb_cd_get_local_rows(Desc_a)
counter_t = 1
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
Do While (halo(counter) /= -1) info=4010
tot_elem=0 call psb_errpush(info,name,a_err='psb_check_size')
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999 goto 9999
end If end if
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv tmp_ovr_idx(counter_o)=proc
! tmp_ovr_idx(counter_o+1)=1
! tmp_ovr_idx(counter_o+2)=gidx
! The format of the halo vector exists in two forms: 1. Temporary tmp_ovr_idx(counter_o+3)=-1
! 2. Assembled. In this loop we are using the (assembled) halo_in and counter_o=counter_o+3
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
! !
! add recv elements in halo_index into ovrlap_index ! Prepare to exchange the halo rows with the other proc.
! !
Do j=0,n_elem_recv-1 If (i_ovr < (novr)) Then
If((counter+psb_elem_recv_+j)>Size(halo)) then n_elem = psb_sp_get_nnz_row(idx,a)
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx) call psb_check_size((idxs+tot_elem+n_elem),works,info)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') call psb_errpush(info,name,a_err='psb_check_size')
goto 9999 goto 9999
end if end if
tmp_ovr_idx(counter_o)=proc If((n_elem) > size(blk%ia2)) Then
tmp_ovr_idx(counter_o+1)=1 isz = max((3*size(blk%ia2))/2,(n_elem))
tmp_ovr_idx(counter_o+2)=gidx if (debug) write(0,*) me,'Realloc blk',isz
tmp_ovr_idx(counter_o+3)=-1 call psb_sp_reall(blk,isz,info)
counter_o=counter_o+3
if (.not.psb_is_large_desc(desc_ov)) then
call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
End If
tmp_halo(counter_h)=proc call psb_sp_getblk(idx,a,blk,info)
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
counter_h=counter_h+3
end if
Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
!
! add send elements in halo_index into ovrlap_index
!
Do j=0,n_elem_send-1
idx = halo(counter+psb_elem_send_+j)
gidx = desc_ov%loc_to_glob(idx)
if (idx > psb_cd_get_local_rows(Desc_a)) &
& write(0,*) me,i_ovr,'Out of local rows ',&
& idx,psb_cd_get_local_rows(Desc_a)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr
Do jj=1,n_elem
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj))
End Do
tot_elem=tot_elem+n_elem
End If
tmp_ovr_idx(counter_o)=proc Enddo
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
!
! Prepare to exchange the halo rows with the other proc.
!
If (i_ovr < (novr)) Then
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_check_size((idxs+tot_elem+n_elem),works,info)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
If((n_elem) > size(blk%ia2)) Then if (i_ovr < novr) then
isz = max((3*size(blk%ia2))/2,(n_elem)) if (tot_elem > 1) then
if (debug) write(0,*) me,'Realloc blk',isz call imsr(tot_elem,works(idxs+1))
call psb_sp_reall(blk,isz,info) lx = works(idxs+1)
if (info /= 0) then i = 1
info=4010 j = 1
ch_err='psb_sp_reall' do
call psb_errpush(info,name,a_err=ch_err) j = j + 1
goto 9999 if (j > tot_elem) exit
end if if (works(idxs+j) /= lx) then
End If i = i + 1
works(idxs+i) = works(idxs+j)
call psb_sp_getblk(idx,a,blk,info) lx = works(idxs+i)
if (info /= 0) then
info=4010
ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr end do
Do jj=1,n_elem tot_elem = i
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) endif
End Do if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10)
tot_elem=tot_elem+n_elem sdsz(proc+1) = tot_elem
End If idxs = idxs + tot_elem
end if
counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
Enddo if (i_ovr < novr) then
!
! Exchange data requests with everybody else: so far we have
if (i_ovr < novr) then ! accumulated RECV requests, we have an all-to-all to build
if (tot_elem > 1) then ! matchings SENDs.
call imsr(tot_elem,works(idxs+1)) !
lx = works(idxs+1) call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
i = 1 if (info /= 0) then
j = 1 info=4010
do ch_err='mpi_alltoall'
j = j + 1 call psb_errpush(info,name,a_err=ch_err)
if (j > tot_elem) exit goto 9999
if (works(idxs+j) /= lx) then end if
i = i + 1 idxs = 0
works(idxs+i) = works(idxs+j) idxr = 0
lx = works(idxs+i) counter = 1
end if Do
end do proc=halo(counter)
tot_elem = i if (proc == -1) exit
endif n_elem_recv = halo(counter+psb_n_elem_recv_)
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) counter = counter+n_elem_recv
sdsz(proc+1) = tot_elem n_elem_send = halo(counter+psb_n_elem_send_)
idxs = idxs + tot_elem
end if bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3 counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
if (i_ovr < novr) then iszr=sum(rvsz)
! if (max(iszr,1) > lworkr) then
! Exchange data requests with everybody else: so far we have call psb_realloc(max(iszr,1),workr,info)
! accumulated RECV requests, we have an all-to-all to build
! matchings SENDs.
!
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoall' ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
idxs = 0 lworkr=max(iszr,1)
idxr = 0 end if
counter = 1
Do
proc=halo(counter)
if (proc == -1) exit
n_elem_recv = halo(counter+psb_n_elem_recv_)
counter = counter+n_elem_recv
n_elem_send = halo(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3
Enddo
iszr=sum(rvsz)
if (max(iszr,1) > lworkr) then
call psb_realloc(max(iszr,1),workr,info)
if (info /= 0) then
info=4010
ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
lworkr=max(iszr,1)
end if
call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,&
& workr,rvsz,brvindx,mpi_integer,icomm,info) & workr,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= 0) then
info=4010
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr
if (psb_is_large_desc(desc_a)) then
call psb_check_size(iszr,maskr,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoallv' call psb_errpush(info,name,a_err='psb_check_size')
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info)
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (psb_is_large_desc(desc_a)) then desc_ov%glob_to_loc(idx)=n_col
call psb_check_size(iszr,maskr,info) desc_ov%loc_to_glob(n_col)=idx
if (info /= 0) then
info=4010 call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
call psb_errpush(info,name,a_err='psb_check_size') if (info /= 0) then
goto 9999 info=4010
end if call psb_errpush(info,name,a_err='psb_check_size')
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info) goto 9999
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
desc_ov%glob_to_loc(idx)=n_col
desc_ov%loc_to_glob(n_col)=idx
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
If (i_ovr < (novr)) Then end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
t_halo_in(counter_t)=-1 If (i_ovr < (novr)) Then
if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10) t_halo_in(counter_t)=-1
if (debug) write(0,*) me,'Calling Crea_Halo'
call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,& if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10)
& nxch,nsnd,nrcv,info) if (debug) write(0,*) me,'Calling Crea_Halo'
if (debug) then call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,&
write(0,*) me,'Done Crea_Index' & nxch,nsnd,nrcv,info)
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
End Do if (debug) then
write(0,*) me,'Done Crea_Index'
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a) End Do
desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1 desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a)
tmp_ovr_idx(counter_o:)=-1 desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1
tmp_ovr_idx(counter_o:)=-1
!
! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call cnv_dsc. This is
! the same routine as gets called inside CDASB.
!
if (debug) then !
write(0,*) 'psb_cdovrbld: converting indexes' ! At this point we have gathered all the indices in the halo at
call psb_barrier(ictxt) ! N levels of overlap. Just call cnv_dsc. This is
end if ! the same routine as gets called inside CDASB.
!.... convert comunication stuctures.... !
! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info) if (debug) then
if (info /= 0) then write(0,*) 'psb_cdovrbld: converting indexes'
ch_err='sp_free' call psb_barrier(ictxt)
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/)) end if
goto 9999 !.... convert comunication stuctures....
end if ! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info)
if (info /= 0) then
ch_err='sp_free'
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/))
goto 9999
end if end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return

@ -40,7 +40,7 @@
! info - integer. Eventually returns an error code. ! info - integer. Eventually returns an error code.
! iact - integer(optional). A character defining the behaviour of this subroutine when is found an index not belonging to the calling process ! iact - integer(optional). A character defining the behaviour of this subroutine when is found an index not belonging to the calling process
! !
subroutine psb_glob_to_loc2(x,y,desc_a,info,iact) subroutine psb_glob_to_loc2(x,y,desc_a,info,iact,owned)
use psb_descriptor_type use psb_descriptor_type
use psb_const_mod use psb_const_mod
@ -52,50 +52,62 @@ subroutine psb_glob_to_loc2(x,y,desc_a,info,iact)
!...parameters.... !...parameters....
type(psb_desc_type), intent(in) :: desc_a type(psb_desc_type), intent(in) :: desc_a
integer, intent(in) :: x(:) integer, intent(in) :: x(:)
integer, intent(out) :: y(:), info integer, intent(out) :: y(:), info
character, intent(in), optional :: iact character, intent(in), optional :: iact
logical, intent(in), optional :: owned
!....locals.... !....locals....
integer :: n, i, tmp integer :: n, i, tmp
character :: act character :: act
integer :: int_err(5), err_act integer :: int_err(5), err_act
real(kind(1.d0)) :: real_val real(kind(1.d0)) :: real_val
integer, parameter :: zero=0 integer, parameter :: zero=0
logical :: owned_
character(len=20) :: name character(len=20) :: name
integer :: ictxt, iam, np
if(psb_get_errstatus() /= 0) return if(psb_get_errstatus() /= 0) return
info=0 info=0
name = 'glob_to_loc' name = 'glob_to_loc'
ictxt = psb_cd_get_context(desc_a)
call psb_info(ictxt,iam,np)
call psb_erractionsave(err_act) call psb_erractionsave(err_act)
if (present(iact)) then if (present(iact)) then
act=iact act=iact
else else
act='A' act='I'
endif endif
act = toupper(act) act = toupper(act)
if (present(owned)) then
owned_ = owned
else
owned_ = .false.
end if
int_err=0 int_err=0
real_val = 0.d0 real_val = 0.d0
n = size(x) n = size(x)
call psi_idx_cnv(n,x,y,desc_a,info) call psi_idx_cnv(n,x,y,desc_a,info,owned=owned_)
select case(act) select case(act)
case('E','I') case('I')
call psb_erractionrestore(err_act)
return
case('W') case('W')
if ((info /= 0).or.(count(y(1:n)<0) >0)) then if (count(y(1:n)<0) >0) then
write(0,'("Error ",i5," in subroutine glob_to_loc")') info write(0,'("Out of bounds input in subroutine glob_to_loc")')
end if end if
case('A')
if ((info /= 0).or.(count(y(1:n)<0) >0)) then case('E','A')
call psb_errpush(info,name) if (count(y(1:n)<0) >0) then
goto 9999 info = 151
end if end if
end select end select
if (info /= 0) then
call psb_errpush(info,name)
goto 9999
end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return
@ -153,7 +165,7 @@ end subroutine psb_glob_to_loc2
! info - integer. Eventually returns an error code. ! info - integer. Eventually returns an error code.
! iact - integer(optional). A character defining the behaviour of this subroutine when is found an index not belonging to the calling process ! iact - integer(optional). A character defining the behaviour of this subroutine when is found an index not belonging to the calling process
! !
subroutine psb_glob_to_loc(x,desc_a,info,iact) subroutine psb_glob_to_loc(x,desc_a,info,iact,owned)
use psb_penv_mod use psb_penv_mod
use psb_descriptor_type use psb_descriptor_type
@ -168,50 +180,57 @@ subroutine psb_glob_to_loc(x,desc_a,info,iact)
integer, intent(inout) :: x(:) integer, intent(inout) :: x(:)
integer, intent(out) :: info integer, intent(out) :: info
character, intent(in), optional :: iact character, intent(in), optional :: iact
logical, intent(in), optional :: owned
!....locals.... !....locals....
integer :: n, i, tmp, nk, key, idx, ih, nh, lb, ub, lm integer :: n, i, tmp, nk, key, idx, ih, nh, lb, ub, lm
character :: act character :: act
integer :: int_err(5), err_act, dectype integer :: int_err(5), err_act
real(kind(1.d0)) :: real_val, t0, t1,t2 real(kind(1.d0)) :: real_val, t0, t1,t2
integer, parameter :: zero=0 integer, parameter :: zero=0
logical :: owned_
character(len=20) :: name character(len=20) :: name
integer :: ictxt, iam, np integer :: ictxt, iam, np
if(psb_get_errstatus() /= 0) return if(psb_get_errstatus() /= 0) return
info=0 info=0
name = 'glob_to_loc' name = 'glob_to_loc'
ictxt = desc_a%matrix_data(psb_ctxt_) ictxt = psb_cd_get_context(desc_a)
call psb_info(ictxt,iam,np) call psb_info(ictxt,iam,np)
call psb_erractionsave(err_act) call psb_erractionsave(err_act)
dectype = desc_a%matrix_data(psb_dec_type_)
if (present(iact)) then if (present(iact)) then
act=iact act=iact
else else
act='A' act='I'
endif endif
act = toupper(act) act = toupper(act)
if (present(owned)) then
owned_ = owned
else
owned_ = .false.
end if
n = size(x) n = size(x)
call psi_idx_cnv(n,x,desc_a,info) call psi_idx_cnv(n,x,desc_a,info,owned=owned_)
select case(act) select case(act)
case('E','I') case('I')
call psb_erractionrestore(err_act)
return
case('W') case('W')
if ((info /= 0).or.(count(x(1:n)<0) >0)) then if (count(x(1:n)<0) >0) then
write(0,'("Error ",i5," in subroutine glob_to_loc")') info write(0,'("Out of bounds input in subroutine glob_to_loc")')
end if end if
case('A')
if ((info /= 0).or.(count(x(1:n)<0) >0)) then case('E','A')
write(0,*) count(x(1:n)<0) if (count(x(1:n)<0) >0) then
call psb_errpush(info,name) info = 151
goto 9999
end if end if
end select end select
if (info /= 0) then
call psb_errpush(info,name)
goto 9999
end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return
@ -226,69 +245,69 @@ subroutine psb_glob_to_loc(x,desc_a,info,iact)
end if end if
return return
contains !!$contains
!!$
subroutine inlbsrch(ipos,key,n,v) !!$ subroutine inlbsrch(ipos,key,n,v)
implicit none !!$ implicit none
integer ipos, key, n !!$ integer ipos, key, n
integer v(n) !!$ integer v(n)
!!$
integer lb, ub, m !!$ integer lb, ub, m
!!$
!!$
lb = 1 !!$ lb = 1
ub = n !!$ ub = n
ipos = -1 !!$ ipos = -1
!!$
do !!$ do
if (lb > ub) return !!$ if (lb > ub) return
m = (lb+ub)/2 !!$ m = (lb+ub)/2
if (key.eq.v(m)) then !!$ if (key.eq.v(m)) then
ipos = m !!$ ipos = m
return !!$ return
else if (key.lt.v(m)) then !!$ else if (key.lt.v(m)) then
ub = m-1 !!$ ub = m-1
else !!$ else
lb = m + 1 !!$ lb = m + 1
end if !!$ end if
enddo !!$ enddo
return !!$ return
end subroutine inlbsrch !!$ end subroutine inlbsrch
!!$
subroutine inner_cnv(n,x,hashsize,hashmask,hashv,glb_lc) !!$ subroutine inner_cnv(n,x,hashsize,hashmask,hashv,glb_lc)
integer :: n, hashsize,hashmask,x(:), hashv(0:),glb_lc(:,:) !!$ integer :: n, hashsize,hashmask,x(:), hashv(0:),glb_lc(:,:)
integer :: i, ih, key, idx,nh,tmp,lb,ub,lm !!$ integer :: i, ih, key, idx,nh,tmp,lb,ub,lm
do i=1, n !!$ do i=1, n
key = x(i) !!$ key = x(i)
ih = iand(key,hashmask) !!$ ih = iand(key,hashmask)
idx = hashv(ih) !!$ idx = hashv(ih)
nh = hashv(ih+1) - hashv(ih) !!$ nh = hashv(ih+1) - hashv(ih)
if (nh > 0) then !!$ if (nh > 0) then
tmp = -1 !!$ tmp = -1
lb = idx !!$ lb = idx
ub = idx+nh-1 !!$ ub = idx+nh-1
do !!$ do
if (lb>ub) exit !!$ if (lb>ub) exit
lm = (lb+ub)/2 !!$ lm = (lb+ub)/2
if (key==glb_lc(lm,1)) then !!$ if (key==glb_lc(lm,1)) then
tmp = lm !!$ tmp = lm
exit !!$ exit
else if (key<glb_lc(lm,1)) then !!$ else if (key<glb_lc(lm,1)) then
ub = lm - 1 !!$ ub = lm - 1
else !!$ else
lb = lm + 1 !!$ lb = lm + 1
end if !!$ end if
end do !!$ end do
else !!$ else
tmp = -1 !!$ tmp = -1
end if !!$ end if
if (tmp > 0) then !!$ if (tmp > 0) then
x(i) = glb_lc(tmp,2) !!$ x(i) = glb_lc(tmp,2)
else !!$ else
x(i) = tmp !!$ x(i) = tmp
end if !!$ end if
end do !!$ end do
end subroutine inner_cnv !!$ end subroutine inner_cnv
end subroutine psb_glob_to_loc end subroutine psb_glob_to_loc

@ -100,12 +100,12 @@ subroutine psb_loc_to_glob2(x,y,desc_a,info,iact)
if (info /= 0) then if (info /= 0) then
select case(act) select case(act)
case('E') case('I')
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return
case('W') case('W')
write(0,'("Error ",i5," in subroutine glob_to_loc")') info write(0,'("Error ",i5," in subroutine glob_to_loc")') info
case('A') case('E','A')
call psb_errpush(info,name) call psb_errpush(info,name)
goto 9999 goto 9999
end select end select
@ -223,12 +223,12 @@ subroutine psb_loc_to_glob(x,desc_a,info,iact)
if (info /= 0) then if (info /= 0) then
select case(act) select case(act)
case('E') case('I')
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return
case('W') case('W')
write(0,'("Error ",i5," in subroutine glob_to_loc")') info write(0,'("Error ",i5," in subroutine glob_to_loc")') info
case('A') case('A','E')
call psb_errpush(info,name) call psb_errpush(info,name)
goto 9999 goto 9999
end select end select

@ -102,11 +102,11 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
If(debug) Write(0,*)'IN CDOVR1',novr ,m,nnzero,n_col If(debug) Write(0,*)'IN CDOVR1',novr ,m,nnzero,n_col
if (novr<0) then if (novr<0) then
info=10 info=10
int_err(1)=1 int_err(1)=1
int_err(2)=novr int_err(2)=novr
call psb_errpush(info,name,i_err=int_err) call psb_errpush(info,name,i_err=int_err)
goto 9999 goto 9999
endif endif
if (debug) write(0,*) 'Calling desccpy' if (debug) write(0,*) 'Calling desccpy'
@ -139,9 +139,9 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
! !
nztot = psb_sp_get_nnzeros(a) nztot = psb_sp_get_nnzeros(a)
if (nztot>0) then if (nztot>0) then
lovr = ((nztot+m-1)/m)*nhalo*novr lovr = ((nztot+m-1)/m)*nhalo*novr
lworks = ((nztot+m-1)/m)*nhalo lworks = ((nztot+m-1)/m)*nhalo
lworkr = ((nztot+m-1)/m)*nhalo lworkr = ((nztot+m-1)/m)*nhalo
else else
info=-1 info=-1
call psb_errpush(info,name) call psb_errpush(info,name)
@ -156,87 +156,167 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1) l_tmp_ovr_idx = novr*(3*Max(2*index_dim,1)+1)
l_tmp_halo = novr*(3*Size(desc_a%halo_index)) l_tmp_halo = novr*(3*Size(desc_a%halo_index))
if (psb_is_large_desc(desc_a)) then call psb_cd_set_bld(desc_ov,info)
desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_ !!$ if (psb_is_large_desc(desc_a)) then
else !!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_large_bld_
desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_ !!$ else
end if !!$ desc_ov%matrix_data(psb_dec_type_) = psb_desc_bld_
!!$ end if
If(debug) then If(debug) then
Write(0,*)'Start cdovrbld',me,lworks,lworkr Write(0,*)'Start cdovrbld',me,lworks,lworkr
call psb_barrier(ictxt) call psb_barrier(ictxt)
endif endif
if (.false.) then
!
! The real work goes on in here....
!
Call psb_cdovrbld(novr,desc_ov,desc_a,a,&
& l_tmp_halo,l_tmp_ovr_idx,lworks,lworkr,info)
if (info /= 0) then
info=4010
ch_err='psb_cdovrbld'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
If(debug) then Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info)
Write(0,*)'Done cdovrbld',me,lworks,lworkr if (info /= 0) then
call psb_barrier(ictxt) call psb_errpush(4010,name,a_err='Allocate')
endif goto 9999
end if
else Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),&
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
Allocate(brvindx(np+1),rvsz(np),sdsz(np),bsdindx(np+1),stat=info) call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate') info=4010
goto 9999 ch_err='psb_sp_all'
end if call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
Allocate(works(lworks),workr(lworkr),t_halo_in(l_tmp_halo),& blk%fida='COO'
& t_halo_out(l_tmp_halo), temp(lworkr),stat=info)
if (info /= 0) then Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
call psb_errpush(4010,name,a_err='Allocate') & halo(size(desc_a%halo_index)),stat=info)
goto 9999 if (info /= 0) then
end if call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
Do j=0,n_elem_recv-1
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_ovr_idx(counter_o)=proc
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
end Do
counter=counter+n_elem_recv+n_elem_send+2
end Do
call psb_sp_all(blk,max(lworks,lworkr),info)
if (info /= 0) then
info=4010
ch_err='psb_sp_all'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
blk%fida='COO'
Allocate(tmp_ovr_idx(l_tmp_ovr_idx),tmp_halo(l_tmp_halo),&
& halo(size(desc_a%halo_index)),stat=info)
if (info /= 0) then
call psb_errpush(4010,name,a_err='Allocate')
goto 9999
end if
halo(:) = desc_a%halo_index(:)
desc_ov%ovrlap_elem(:) = -1
tmp_ovr_idx(:) = -1
tmp_halo(:) = -1
counter_e = 1
tot_recv = 0
counter_h = 1
counter_o = 1
! Init overlap with desc_a%ovrlap (if any)
counter = 1
Do While (desc_a%ovrlap_index(counter) /= -1)
proc = desc_a%ovrlap_index(counter+psb_proc_id_)
n_elem_recv = desc_a%ovrlap_index(counter+psb_n_elem_recv_)
n_elem_send = desc_a%ovrlap_index(counter+n_elem_recv+psb_n_elem_send_)
!
! A picture is in order to understand what goes on here.
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr
!
! At this point, halo contains a valid halo corresponding to the
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
!
bsdindx(:) = 0
sdsz(:) = 0
brvindx(:) = 0
rvsz(:) = 0
idxr = 0
idxs = 0
counter = 1
counter_t = 1
Do While (halo(counter) /= -1)
tot_elem=0
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999
end If
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv
!
!
! The format of the halo vector exists in two forms: 1. Temporary
! 2. Assembled. In this loop we are using the (assembled) halo_in and
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
!
! add recv elements in halo_index into ovrlap_index
!
Do j=0,n_elem_recv-1 Do j=0,n_elem_recv-1
If((counter+psb_elem_recv_+j)>Size(halo)) then
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = desc_a%ovrlap_index(counter+psb_elem_recv_+j) idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then If(idx > Size(desc_ov%loc_to_glob)) then
info=-3 info=-3
call psb_errpush(info,name) call psb_errpush(info,name)
@ -257,444 +337,343 @@ Subroutine psb_zcdovr(a,desc_a,novr,desc_ov,info)
tmp_ovr_idx(counter_o+2)=gidx tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1 tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3 counter_o=counter_o+3
end Do if (.not.psb_is_large_desc(desc_ov)) then
counter=counter+n_elem_recv+n_elem_send+2 call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
end Do if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
tmp_halo(counter_h)=proc
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
! counter_h=counter_h+3
! A picture is in order to understand what goes on here. end if
! I is the internal part; H is halo, R row, C column. The final
! matrix with N levels of overlap looks like this
!
! I | Hc1 | 0 | 0 |
! -------|-----|-----|-----|
! Hr1 | Hd1 | Hc2 | 0 |
! -------|-----|-----|-----|
! 0 | Hr2 | Hd2 | Hc2 |
! -------|-----|-----|-----|
! 0 | 0 | Hr3 | Hd3 | Hc3
!
! At the start we already have I and Hc1, so we know the row
! indices that will make up Hr1, and also who owns them. As we
! actually get those rows, we receive the column indices in Hc2;
! these define the row indices for Hr2, and so on. When we have
! reached the desired level HrN, we may ignore HcN.
!
!
Do i_ovr = 1, novr
if (debug) write(0,*) me,'Running on overlap level ',i_ovr,' of ',novr Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
! !
! At this point, halo contains a valid halo corresponding to the ! add send elements in halo_index into ovrlap_index
! matrix enlarged with the elements in the frontier for I_OVR-1.
! At the start, this is just the halo for A; the rows for indices in
! the first halo will contain column indices defining the second halo
! level and so on.
! !
bsdindx(:) = 0 Do j=0,n_elem_send-1
sdsz(:) = 0
brvindx(:) = 0 idx = halo(counter+psb_elem_send_+j)
rvsz(:) = 0 gidx = desc_ov%loc_to_glob(idx)
idxr = 0 if (idx > psb_cd_get_local_rows(Desc_a)) &
idxs = 0 & write(0,*) me,i_ovr,'Out of local rows ',&
counter = 1 & idx,psb_cd_get_local_rows(Desc_a)
counter_t = 1
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then
Do While (halo(counter) /= -1) info=4010
tot_elem=0 call psb_errpush(info,name,a_err='psb_check_size')
proc=halo(counter+psb_proc_id_)
n_elem_recv=halo(counter+psb_n_elem_recv_)
n_elem_send=halo(counter+n_elem_recv+psb_n_elem_send_)
If ((counter+n_elem_recv+n_elem_send) > Size(halo)) then
info = -1
call psb_errpush(info,name)
goto 9999 goto 9999
end If end if
tot_recv=tot_recv+n_elem_recv
if (debug) write(0,*) me,' CDOVRBLD tot_recv:',proc,n_elem_recv,tot_recv tmp_ovr_idx(counter_o)=proc
! tmp_ovr_idx(counter_o+1)=1
! tmp_ovr_idx(counter_o+2)=gidx
! The format of the halo vector exists in two forms: 1. Temporary tmp_ovr_idx(counter_o+3)=-1
! 2. Assembled. In this loop we are using the (assembled) halo_in and counter_o=counter_o+3
! copying it into (temporary) halo_out; this is because tmp_halo will
! be enlarged with the new column indices received, and will reassemble
! everything for the next iteration.
!
! !
! add recv elements in halo_index into ovrlap_index ! Prepare to exchange the halo rows with the other proc.
! !
Do j=0,n_elem_recv-1 If (i_ovr < (novr)) Then
If((counter+psb_elem_recv_+j)>Size(halo)) then n_elem = psb_sp_get_nnz_row(idx,a)
info=-2
call psb_errpush(info,name)
goto 9999
end If
idx = halo(counter+psb_elem_recv_+j)
If(idx > Size(desc_ov%loc_to_glob)) then
info=-3
call psb_errpush(info,name)
goto 9999
endif
gidx = desc_ov%loc_to_glob(idx) call psb_check_size((idxs+tot_elem+n_elem),works,info)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') call psb_errpush(info,name,a_err='psb_check_size')
goto 9999 goto 9999
end if end if
tmp_ovr_idx(counter_o)=proc If((n_elem) > size(blk%ia2)) Then
tmp_ovr_idx(counter_o+1)=1 isz = max((3*size(blk%ia2))/2,(n_elem))
tmp_ovr_idx(counter_o+2)=gidx if (debug) write(0,*) me,'Realloc blk',isz
tmp_ovr_idx(counter_o+3)=-1 call psb_sp_reall(blk,isz,info)
counter_o=counter_o+3
if (.not.psb_is_large_desc(desc_ov)) then
call psb_check_size((counter_h+3),tmp_halo,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_reall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
End If
tmp_halo(counter_h)=proc call psb_sp_getblk(idx,a,blk,info)
tmp_halo(counter_h+1)=1
tmp_halo(counter_h+2)=idx
tmp_halo(counter_h+3)=-1
counter_h=counter_h+3
end if
Enddo
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid1',tmp_ovr_idx(1:10)
counter = counter+n_elem_recv
!
! add send elements in halo_index into ovrlap_index
!
Do j=0,n_elem_send-1
idx = halo(counter+psb_elem_send_+j)
gidx = desc_ov%loc_to_glob(idx)
if (idx > psb_cd_get_local_rows(Desc_a)) &
& write(0,*) me,i_ovr,'Out of local rows ',&
& idx,psb_cd_get_local_rows(Desc_a)
call psb_check_size((counter_o+3),tmp_ovr_idx,info,pad=-1)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
call psb_errpush(info,name,a_err='psb_check_size') ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr
Do jj=1,n_elem
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj))
End Do
tot_elem=tot_elem+n_elem
End If
tmp_ovr_idx(counter_o)=proc Enddo
tmp_ovr_idx(counter_o+1)=1
tmp_ovr_idx(counter_o+2)=gidx
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
!
! Prepare to exchange the halo rows with the other proc.
!
If (i_ovr < (novr)) Then
n_elem = psb_sp_get_nnz_row(idx,a)
call psb_check_size((idxs+tot_elem+n_elem),works,info)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
If((n_elem) > size(blk%ia2)) Then if (i_ovr < novr) then
isz = max((3*size(blk%ia2))/2,(n_elem)) if (tot_elem > 1) then
if (debug) write(0,*) me,'Realloc blk',isz call imsr(tot_elem,works(idxs+1))
call psb_sp_reall(blk,isz,info) lx = works(idxs+1)
if (info /= 0) then i = 1
info=4010 j = 1
ch_err='psb_sp_reall' do
call psb_errpush(info,name,a_err=ch_err) j = j + 1
goto 9999 if (j > tot_elem) exit
end if if (works(idxs+j) /= lx) then
End If i = i + 1
works(idxs+i) = works(idxs+j)
call psb_sp_getblk(idx,a,blk,info) lx = works(idxs+i)
if (info /= 0) then
info=4010
ch_err='psb_sp_getblk'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if end if
!!$ write(0,*) me,'Iteration: ',j,i_ovr end do
Do jj=1,n_elem tot_elem = i
works(idxs+tot_elem+jj)=desc_ov%loc_to_glob(blk%ia2(jj)) endif
End Do if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10)
tot_elem=tot_elem+n_elem sdsz(proc+1) = tot_elem
End If idxs = idxs + tot_elem
end if
counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
Enddo if (i_ovr < novr) then
!
! Exchange data requests with everybody else: so far we have
if (i_ovr < novr) then ! accumulated RECV requests, we have an all-to-all to build
if (tot_elem > 1) then ! matchings SENDs.
call imsr(tot_elem,works(idxs+1)) !
lx = works(idxs+1) call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
i = 1 if (info /= 0) then
j = 1 info=4010
do ch_err='mpi_alltoall'
j = j + 1 call psb_errpush(info,name,a_err=ch_err)
if (j > tot_elem) exit goto 9999
if (works(idxs+j) /= lx) then end if
i = i + 1 idxs = 0
works(idxs+i) = works(idxs+j) idxr = 0
lx = works(idxs+i) counter = 1
end if Do
end do proc=halo(counter)
tot_elem = i if (proc == -1) exit
endif n_elem_recv = halo(counter+psb_n_elem_recv_)
if (debug) write(0,*) me,'Checktmp_o_i Loop Mid2',tmp_ovr_idx(1:10) counter = counter+n_elem_recv
sdsz(proc+1) = tot_elem n_elem_send = halo(counter+psb_n_elem_send_)
idxs = idxs + tot_elem
end if bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3 counter = counter+n_elem_send+3
if (debug) write(0,*) me,'Checktmp_o_i Loop End',tmp_ovr_idx(1:10)
Enddo Enddo
if (debug) write(0,*)me,'End phase 1 CDOVRBLD', m, n_col, tot_recv
if (i_ovr < novr) then iszr=sum(rvsz)
! if (max(iszr,1) > lworkr) then
! Exchange data requests with everybody else: so far we have call psb_realloc(max(iszr,1),workr,info)
! accumulated RECV requests, we have an all-to-all to build
! matchings SENDs.
!
call mpi_alltoall(sdsz,1,mpi_integer,rvsz,1,mpi_integer,icomm,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoall' ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err) call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
idxs = 0 lworkr=max(iszr,1)
idxr = 0 end if
counter = 1
Do
proc=halo(counter)
if (proc == -1) exit
n_elem_recv = halo(counter+psb_n_elem_recv_)
counter = counter+n_elem_recv
n_elem_send = halo(counter+psb_n_elem_send_)
bsdindx(proc+1) = idxs
idxs = idxs + sdsz(proc+1)
brvindx(proc+1) = idxr
idxr = idxr + rvsz(proc+1)
counter = counter+n_elem_send+3
Enddo
iszr=sum(rvsz)
if (max(iszr,1) > lworkr) then
call psb_realloc(max(iszr,1),workr,info)
if (info /= 0) then
info=4010
ch_err='psb_realloc'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
lworkr=max(iszr,1)
end if
call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,& call mpi_alltoallv(works,sdsz,bsdindx,mpi_integer,&
& workr,rvsz,brvindx,mpi_integer,icomm,info) & workr,rvsz,brvindx,mpi_integer,icomm,info)
if (info /= 0) then
info=4010
ch_err='mpi_alltoallv'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr
if (psb_is_large_desc(desc_a)) then
call psb_check_size(iszr,maskr,info)
if (info /= 0) then if (info /= 0) then
info=4010 info=4010
ch_err='mpi_alltoallv' call psb_errpush(info,name,a_err='psb_check_size')
call psb_errpush(info,name,a_err=ch_err)
goto 9999 goto 9999
end if end if
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info)
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (debug) write(0,*) 'ISZR :',iszr t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
if (psb_is_large_desc(desc_a)) then desc_ov%glob_to_loc(idx)=n_col
call psb_check_size(iszr,maskr,info) desc_ov%loc_to_glob(n_col)=idx
if (info /= 0) then
info=4010 call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
call psb_errpush(info,name,a_err='psb_check_size') if (info /= 0) then
goto 9999 info=4010
end if call psb_errpush(info,name,a_err='psb_check_size')
call psi_idx_cnv(iszr,workr,maskr,desc_ov,info) goto 9999
iszs = count(maskr(1:iszr)<=0)
if (iszs > size(works)) call psb_realloc(iszs,works,info)
j = 0
do i=1,iszr
if (maskr(i) < 0) then
j=j+1
works(j) = workr(i)
end if end if
end do
!
! fnd_owner on desc_a because we want the procs who
! owned the rows from the beginning!
!
call psi_fnd_owner(iszs,works,temp,desc_a,info)
n_col=psb_cd_get_local_cols(desc_ov)
do i=1,iszs
idx = works(i)
n_col = psb_cd_get_local_cols(desc_ov)
call psi_idx_ins_cnv(idx,lidx,desc_ov,info)
if (psb_cd_get_local_cols(desc_ov) > n_col ) then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
proc_id = temp(i)
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=lidx
counter_t=counter_t+3
if (.false.) write(0,*) me,' CDOVRBLD: Added t_halo_in ',&
&proc_id,lidx,idx
endif
end Do
else
Do i=1,iszr
idx=workr(i)
if (idx <1) then
write(0,*) me,'Error in CDOVRBLD level',i_ovr,' : ',idx,i,iszr
else If (desc_ov%glob_to_loc(idx) < -np) Then
!
! This is a new index. Assigning a local index as
! we receive them guarantees that all indices for HALO(I)
! will be less than those for HALO(J) whenever I<J
!
n_col=n_col+1
proc_id=-desc_ov%glob_to_loc(idx)-np-1
call psb_check_size(n_col,desc_ov%loc_to_glob,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
desc_ov%glob_to_loc(idx)=n_col
desc_ov%loc_to_glob(n_col)=idx
call psb_check_size((counter_t+3),t_halo_in,info,pad=-1)
if (info /= 0) then
info=4010
call psb_errpush(info,name,a_err='psb_check_size')
goto 9999
end if
t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if t_halo_in(counter_t)=proc_id
t_halo_in(counter_t+1)=1
t_halo_in(counter_t+2)=n_col
counter_t=counter_t+3
if (debug) write(0,*) me,' CDOVRBLD: Added into t_halo_in from recv',&
&proc_id,n_col,idx
else if (desc_ov%glob_to_loc(idx) < 0) Then
if (debug) write(0,*) me,'Wrong input to cdovrbld??',&
&idx,desc_ov%glob_to_loc(idx)
End If
End Do
desc_ov%matrix_data(psb_n_col_)=n_col
end if end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
If (i_ovr < (novr)) Then end if
!!$ desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
!
! Ok, now we have a temporary halo with all the info for the
! next round. If we need to keep going, convert the halo format
! from temporary to final, so that we can work out the next iteration.
! This uses one of the convert_comm internals, i.e. we are doing
! the equivalent of a partial call to convert_comm
!
t_halo_in(counter_t)=-1 If (i_ovr < (novr)) Then
if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10) t_halo_in(counter_t)=-1
if (debug) write(0,*) me,'Calling Crea_Halo'
call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,& if (debug) write(0,*) me,'Checktmp_o_i 1',tmp_ovr_idx(1:10)
& nxch,nsnd,nrcv,info) if (debug) write(0,*) me,'Calling Crea_Halo'
if (debug) then call psi_crea_index(desc_ov,t_halo_in,t_halo_out,.false.,&
write(0,*) me,'Done Crea_Index' & nxch,nsnd,nrcv,info)
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
End Do if (debug) then
write(0,*) me,'Done Crea_Index'
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
if (debug) write(0,*) me,'Done Crea_Halo'
call psb_transfer(t_halo_out,halo,info)
!
! At this point we have built the halo necessary for I_OVR+1.
!
End If
if (debug) write(0,*) me,'Checktmp_o_i ',tmp_ovr_idx(1:10)
desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a) End Do
desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1 desc_ov%matrix_data(psb_m_)=psb_cd_get_global_rows(desc_a)
tmp_ovr_idx(counter_o:)=-1 desc_ov%matrix_data(psb_n_)=psb_cd_get_global_cols(desc_a)
tmp_halo(counter_h:)=-1
tmp_ovr_idx(counter_o:)=-1
!
! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call cnv_dsc. This is
! the same routine as gets called inside CDASB.
!
if (debug) then !
write(0,*) 'psb_cdovrbld: converting indexes' ! At this point we have gathered all the indices in the halo at
call psb_barrier(ictxt) ! N levels of overlap. Just call cnv_dsc. This is
end if ! the same routine as gets called inside CDASB.
!.... convert comunication stuctures.... !
! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info) if (debug) then
if (info /= 0) then write(0,*) 'psb_cdovrbld: converting indexes'
ch_err='sp_free' call psb_barrier(ictxt)
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/)) end if
goto 9999 !.... convert comunication stuctures....
end if ! Note that we have to keep local_rows until the very end,
! because otherwise the halo build mechanism of cdasb
! will not work.
call psb_transfer(tmp_ovr_idx,desc_ov%ovrlap_index,info)
call psb_transfer(tmp_halo,desc_ov%halo_index,info)
call psb_cdasb(desc_ov,info)
desc_ov%matrix_data(psb_n_row_)=desc_ov%matrix_data(psb_n_col_)
if (debug) then
write(0,*) me,'Done CDASB'
call psb_barrier(ictxt)
end if
if (info == 0) call psb_sp_free(blk,info)
if (info /= 0) then
ch_err='sp_free'
call psb_errpush(4013,name,a_err=ch_err,i_err=(/info,0,0,0,0/))
goto 9999
end if end if
call psb_erractionrestore(err_act) call psb_erractionrestore(err_act)
return return

Loading…
Cancel
Save