@ -83,7 +83,7 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
Integer , allocatable :: sdid ( : , : ) , brvindx ( : ) , rvid ( : , : ) , &
& rvsz ( : ) , bsdindx ( : ) , sdsz ( : ) , iasnd ( : ) , jasnd ( : )
real ( psb_dpk_ ) , allocatable :: valsnd ( : )
class( psb_d_base _sparse_mat) , allocatable :: acoo
type( psb_d_coo _sparse_mat) , allocatable :: acoo
integer , pointer :: idxv ( : )
logical :: rowcnv_ , colcnv_ , rowscale_ , colscale_
character ( len = 5 ) :: outfmt_
@ -164,200 +164,190 @@ Subroutine psb_dsphalo(a,desc_a,blk,info,rowcnv,colcnv,&
end select
allocate ( psb_d_coo_sparse_mat :: acoo, stat = info )
allocate ( acoo, stat = info )
if ( info / = psb_success_ ) then
info = psb_err_alloc_dealloc_
call psb_errpush ( info , name )
go to 9999
end if
select type ( acoo )
type is ( psb_d_coo_sparse_mat )
l1 = 0
sdsz ( : ) = 0
rvsz ( : ) = 0
ipx = 1
brvindx ( ipx ) = 0
bsdindx ( ipx ) = 0
counter = 1
idx = 0
idxs = 0
idxr = 0
call acoo % allocate ( 0 , a % get_ncols ( ) , info )
! For all rows in the halo descriptor , extract and send / receive .
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
tot_elem = 0
Do j = 0 , n_el_send - 1
idx = idxv ( counter + psb_elem_send_ + j )
n_elem = a % get_nz_row ( idx )
tot_elem = tot_elem + n_elem
Enddo
sdsz ( proc + 1 ) = tot_elem
call acoo % set_nrows ( acoo % get_nrows ( ) + n_el_recv )
counter = counter + n_el_send + 3
Enddo
call mpi_alltoall ( sdsz , 1 , mpi_integer , rvsz , 1 , mpi_integer , icomm , info )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'mpi_alltoall'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
bsdindx ( proc + 1 ) = idxs
idxs = idxs + sdsz ( proc + 1 )
brvindx ( proc + 1 ) = idxr
idxr = idxr + rvsz ( proc + 1 )
counter = counter + n_el_send + 3
l1 = 0
sdsz ( : ) = 0
rvsz ( : ) = 0
ipx = 1
brvindx ( ipx ) = 0
bsdindx ( ipx ) = 0
counter = 1
idx = 0
idxs = 0
idxr = 0
call acoo % allocate ( 0 , a % get_ncols ( ) , info )
! For all rows in the halo descriptor , extract and send / receive .
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
tot_elem = 0
Do j = 0 , n_el_send - 1
idx = idxv ( counter + psb_elem_send_ + j )
n_elem = a % get_nz_row ( idx )
tot_elem = tot_elem + n_elem
Enddo
sdsz ( proc + 1 ) = tot_elem
call acoo % set_nrows ( acoo % get_nrows ( ) + n_el_recv )
counter = counter + n_el_send + 3
Enddo
iszr = sum ( rvsz )
call acoo % reallocate ( max ( iszr , 1 ) )
if ( debug_level > = psb_debug_outer_ ) &
& write ( debug_unit , * ) me , ' ' , trim ( name ) , ': Sizes:' , acoo % get_size ( ) , &
& ' Send:' , sdsz ( : ) , ' Receive:' , rvsz ( : )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_sp_reall'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
mat_recv = iszr
iszs = sum ( sdsz )
call psb_ensure_size ( max ( iszs , 1 ) , iasnd , info )
if ( info == psb_success_ ) call psb_ensure_size ( max ( iszs , 1 ) , jasnd , info )
if ( info == psb_success_ ) call psb_ensure_size ( max ( iszs , 1 ) , valsnd , info )
l1 = 0
ipx = 1
counter = 1
idx = 0
tot_elem = 0
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
Do j = 0 , n_el_send - 1
idx = idxv ( counter + psb_elem_send_ + j )
n_elem = a % get_nz_row ( idx )
call a % csget ( idx , idx , ngtz , iasnd , jasnd , valsnd , info , &
& append = . true . , nzin = tot_elem )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_sp_getrow'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
tot_elem = tot_elem + n_elem
Enddo
ipx = ipx + 1
counter = counter + n_el_send + 3
Enddo
nz = tot_elem
if ( rowcnv_ ) call psb_loc_to_glob ( iasnd ( 1 : nz ) , desc_a , info , iact = 'I' )
if ( colcnv_ ) call psb_loc_to_glob ( jasnd ( 1 : nz ) , desc_a , info , iact = 'I' )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_loc_to_glob'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
call mpi_alltoallv ( valsnd , sdsz , bsdindx , mpi_double_precision , &
& acoo % val , rvsz , brvindx , mpi_double_precision , icomm , info )
call mpi_alltoallv ( iasnd , sdsz , bsdindx , mpi_integer , &
& acoo % ia , rvsz , brvindx , mpi_integer , icomm , info )
call mpi_alltoallv ( jasnd , sdsz , bsdindx , mpi_integer , &
& acoo % ja , rvsz , brvindx , mpi_integer , icomm , info )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'mpi_alltoallv'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
!
! Convert into local numbering
!
if ( rowcnv_ ) call psb_glob_to_loc ( acoo % ia ( 1 : iszr ) , desc_a , info , iact = 'I' )
if ( colcnv_ ) call psb_glob_to_loc ( acoo % ja ( 1 : iszr ) , desc_a , info , iact = 'I' )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psbglob_to_loc'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
l1 = 0
call acoo % set_nrows ( 0 )
!
irmin = huge ( irmin )
icmin = huge ( icmin )
irmax = 0
icmax = 0
Do i = 1 , iszr
r = ( acoo % ia ( i ) )
k = ( acoo % ja ( i ) )
! Just in case some of the conversions were out - of - range
If ( ( r > 0 ) . and . ( k > 0 ) ) Then
l1 = l1 + 1
acoo % val ( l1 ) = acoo % val ( i )
acoo % ia ( l1 ) = r
acoo % ja ( l1 ) = k
irmin = min ( irmin , r )
irmax = max ( irmax , r )
icmin = min ( icmin , k )
icmax = max ( icmax , k )
End If
call mpi_alltoall ( sdsz , 1 , mpi_integer , rvsz , 1 , mpi_integer , icomm , info )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'mpi_alltoall'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
idxs = 0
idxr = 0
counter = 1
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
bsdindx ( proc + 1 ) = idxs
idxs = idxs + sdsz ( proc + 1 )
brvindx ( proc + 1 ) = idxr
idxr = idxr + rvsz ( proc + 1 )
counter = counter + n_el_send + 3
Enddo
iszr = sum ( rvsz )
call acoo % reallocate ( max ( iszr , 1 ) )
if ( debug_level > = psb_debug_outer_ ) &
& write ( debug_unit , * ) me , ' ' , trim ( name ) , ': Sizes:' , acoo % get_size ( ) , &
& ' Send:' , sdsz ( : ) , ' Receive:' , rvsz ( : )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_sp_reall'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
mat_recv = iszr
iszs = sum ( sdsz )
call psb_ensure_size ( max ( iszs , 1 ) , iasnd , info )
if ( info == psb_success_ ) call psb_ensure_size ( max ( iszs , 1 ) , jasnd , info )
if ( info == psb_success_ ) call psb_ensure_size ( max ( iszs , 1 ) , valsnd , info )
l1 = 0
ipx = 1
counter = 1
idx = 0
tot_elem = 0
Do
proc = idxv ( counter )
if ( proc == - 1 ) exit
n_el_recv = idxv ( counter + psb_n_elem_recv_ )
counter = counter + n_el_recv
n_el_send = idxv ( counter + psb_n_elem_send_ )
Do j = 0 , n_el_send - 1
idx = idxv ( counter + psb_elem_send_ + j )
n_elem = a % get_nz_row ( idx )
call a % csget ( idx , idx , ngtz , iasnd , jasnd , valsnd , info , &
& append = . true . , nzin = tot_elem )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_sp_getrow'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
tot_elem = tot_elem + n_elem
Enddo
if ( rowscale_ ) then
call acoo % set_nrows ( max ( irmax - irmin + 1 , 0 ) )
acoo % ia ( 1 : l1 ) = acoo % ia ( 1 : l1 ) - irmin + 1
else
call acoo % set_nrows ( irmax )
end if
if ( colscale_ ) then
call acoo % set_ncols ( max ( icmax - icmin + 1 , 0 ) )
acoo % ja ( 1 : l1 ) = acoo % ja ( 1 : l1 ) - icmin + 1
else
call acoo % set_ncols ( icmax )
end if
call acoo % set_nzeros ( l1 )
class default
! This is impossible
info = psb_err_internal_error_
call psb_Errpush ( info , name )
ipx = ipx + 1
counter = counter + n_el_send + 3
Enddo
nz = tot_elem
if ( rowcnv_ ) call psb_loc_to_glob ( iasnd ( 1 : nz ) , desc_a , info , iact = 'I' )
if ( colcnv_ ) call psb_loc_to_glob ( jasnd ( 1 : nz ) , desc_a , info , iact = 'I' )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psb_loc_to_glob'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end select
end if
call mpi_alltoallv ( valsnd , sdsz , bsdindx , mpi_double_precision , &
& acoo % val , rvsz , brvindx , mpi_double_precision , icomm , info )
call mpi_alltoallv ( iasnd , sdsz , bsdindx , mpi_integer , &
& acoo % ia , rvsz , brvindx , mpi_integer , icomm , info )
call mpi_alltoallv ( jasnd , sdsz , bsdindx , mpi_integer , &
& acoo % ja , rvsz , brvindx , mpi_integer , icomm , info )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'mpi_alltoallv'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
!
! Convert into local numbering
!
if ( rowcnv_ ) call psb_glob_to_loc ( acoo % ia ( 1 : iszr ) , desc_a , info , iact = 'I' )
if ( colcnv_ ) call psb_glob_to_loc ( acoo % ja ( 1 : iszr ) , desc_a , info , iact = 'I' )
if ( info / = psb_success_ ) then
info = psb_err_from_subroutine_
ch_err = 'psbglob_to_loc'
call psb_errpush ( info , name , a_err = ch_err )
go to 9999
end if
l1 = 0
call acoo % set_nrows ( 0 )
!
irmin = huge ( irmin )
icmin = huge ( icmin )
irmax = 0
icmax = 0
Do i = 1 , iszr
r = ( acoo % ia ( i ) )
k = ( acoo % ja ( i ) )
! Just in case some of the conversions were out - of - range
If ( ( r > 0 ) . and . ( k > 0 ) ) Then
l1 = l1 + 1
acoo % val ( l1 ) = acoo % val ( i )
acoo % ia ( l1 ) = r
acoo % ja ( l1 ) = k
irmin = min ( irmin , r )
irmax = max ( irmax , r )
icmin = min ( icmin , k )
icmax = max ( icmax , k )
End If
Enddo
if ( rowscale_ ) then
call acoo % set_nrows ( max ( irmax - irmin + 1 , 0 ) )
acoo % ia ( 1 : l1 ) = acoo % ia ( 1 : l1 ) - irmin + 1
else
call acoo % set_nrows ( irmax )
end if
if ( colscale_ ) then
call acoo % set_ncols ( max ( icmax - icmin + 1 , 0 ) )
acoo % ja ( 1 : l1 ) = acoo % ja ( 1 : l1 ) - icmin + 1
else
call acoo % set_ncols ( icmax )
end if
call acoo % set_nzeros ( l1 )
if ( debug_level > = psb_debug_outer_ ) &
& write ( debug_unit , * ) me , ' ' , trim ( name ) , &