LX in mat_dist and MMIO.

ILmat
Salvatore Filippone 8 years ago
parent 07f845cd26
commit 41b2943f71

@ -338,3 +338,319 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine psb_cmatdist
subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_cspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_cspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
use psb_base_mod
use psb_mat_mod
implicit none
! parameters
type(psb_lcspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_cspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_mpk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'psb_c_mat_dist'
call psb_erractionsave(err_act)
! executable statements
if (present(inroot)) then
root = inroot
else
root = psb_root_
end if
call psb_info(ictxt, iam, np)
if (iam == root) then
nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols()
if (nrow /= ncol) then
write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol
info=-1
call psb_errpush(info,name)
goto 9999
endif
nnzero = a_glob%get_nzeros()
nrhs = 1
endif
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root)
call psb_bcast(ictxt,nnzero, root)
call psb_bcast(ictxt,nrhs, root)
liwork = max(np, nrow + ncol)
allocate(iwork(liwork), iwrk2(np),stat = info)
if (info /= psb_success_) then
info=psb_err_alloc_request_
call psb_errpush(info,name,l_err=(/liwork/),a_err='integer')
goto 9999
endif
if (iam == root) then
write (*, fmt = *) 'start matdist',root, size(iwork),&
&nrow, ncol, nnzero,nrhs
endif
if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then
call psb_cdall(ictxt,desc_a,info,vg=v)
else
info = -1
end if
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_cdall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
inz = ((nnzero+np-1)/np)
call psb_spall(a,desc_a,info,nnz=inz)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
i_count = 1
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwrk2, np_sharing)
if (np_sharing /= 1 ) exit
if (iwrk2(1) /= iproc ) exit
end do
end if
else
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit
end do
end if
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
if (ll > size(irow)) then
write(psb_err_unit,*) iam,'need to reallocate ',ll
deallocate(val,irow,icol)
allocate(val(ll),irow(ll),icol(ll),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_snd(ictxt,ll,root)
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
end do
endif
i_count = j_count
end do
call psb_barrier(ictxt)
t0 = psb_wtime()
call psb_cdasb(desc_a,info)
t1 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_cdasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_barrier(ictxt)
t2 = psb_wtime()
call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold)
t3 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_spasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) then
write(psb_out_unit,*) 'descriptor assembly: ',t1-t0
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
deallocate(val,irow,icol,iwork,iwrk2,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='deallocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) write (*, fmt = *) 'end matdist'
call psb_erractionrestore(err_act)
return
9999 call psb_error_handler(ictxt,err_act)
return
end subroutine psb_lcmatdist

@ -31,7 +31,8 @@
!
module psb_c_mat_dist_mod
use psb_base_mod, only : psb_ipk_, psb_spk_, psb_desc_type, psb_parts, &
& psb_cspmat_type, psb_c_base_sparse_mat, psb_c_vect_type
& psb_cspmat_type, psb_c_base_sparse_mat, psb_c_vect_type, &
& psb_lcspmat_type
interface psb_matdist
subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,&
@ -91,6 +92,64 @@ module psb_c_mat_dist_mod
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_cmatdist
subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_lcspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_cspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_cspmat_type, psb_spk_, psb_desc_type,&
& psb_c_base_sparse_mat, psb_c_vect_type, psb_parts, &
& psb_lcspmat_type
implicit none
! parameters
type(psb_lcspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_cspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_c_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_lcmatdist
end interface
end module psb_c_mat_dist_mod

@ -461,3 +461,179 @@ subroutine cmm_mat_write(a,mtitle,info,iunit,filename)
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine cmm_mat_write
subroutine lcmm_mat_read(a, info, iunit, filename)
use psb_base_mod
implicit none
type(psb_lcspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
character :: mmheader*15, fmt*15, object*10, type*10, sym*15
character(1024) :: line
integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr
integer(psb_ipk_) :: ircode,infile
type(psb_lc_coo_sparse_mat), allocatable :: acoo
real(psb_spk_) :: are, aim
info = psb_success_
if (present(filename)) then
if (filename == '-') then
infile=5
else
if (present(iunit)) then
infile=iunit
else
infile=99
endif
open(infile,file=filename, status='OLD', err=901, action='READ')
endif
else
if (present(iunit)) then
infile=iunit
else
infile=5
endif
endif
read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym
if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then
write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported'
info=909
return
end if
do
read(infile,fmt='(a)') line
if (line(1:1) /= '%') exit
end do
read(line,fmt=*) nrow,ncol,nnzero
allocate(acoo, stat=ircode)
if (ircode /= 0) goto 993
if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'general')) then
call acoo%allocate(nrow,ncol,nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_spk_)
end do
call acoo%set_nzeros(nnzero)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'symmetric')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_spk_)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = acoo%val(i)
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'hermitian')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_spk_)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = conjg(acoo%val(i))
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else
write(psb_err_unit,*) 'read_matrix: matrix type not yet supported'
info=904
end if
if (infile /= 5) close(infile)
return
! open failed
901 info=901
write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input'
return
902 info=902
write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file '
return
905 info=905
write(psb_err_unit,*) 'READ_MATRIX: Error at line',i
return
993 info=993
write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure'
return
end subroutine lcmm_mat_read
subroutine lcmm_mat_write(a,mtitle,info,iunit,filename)
use psb_base_mod
implicit none
type(psb_lcspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
integer(psb_ipk_) :: iout
info = psb_success_
if (present(filename)) then
if (filename == '-') then
iout=6
else
if (present(iunit)) then
iout = iunit
else
iout=99
endif
open(iout,file=filename, err=901, action='WRITE')
endif
else
if (present(iunit)) then
iout = iunit
else
iout=6
endif
endif
call a%print(iout,head=mtitle)
if (iout /= 6) close(iout)
return
901 continue
info=901
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine lcmm_mat_write

@ -338,3 +338,319 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine psb_dmatdist
subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_dspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_dspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
use psb_base_mod
use psb_mat_mod
implicit none
! parameters
type(psb_ldspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_dspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_mpk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'psb_d_mat_dist'
call psb_erractionsave(err_act)
! executable statements
if (present(inroot)) then
root = inroot
else
root = psb_root_
end if
call psb_info(ictxt, iam, np)
if (iam == root) then
nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols()
if (nrow /= ncol) then
write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol
info=-1
call psb_errpush(info,name)
goto 9999
endif
nnzero = a_glob%get_nzeros()
nrhs = 1
endif
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root)
call psb_bcast(ictxt,nnzero, root)
call psb_bcast(ictxt,nrhs, root)
liwork = max(np, nrow + ncol)
allocate(iwork(liwork), iwrk2(np),stat = info)
if (info /= psb_success_) then
info=psb_err_alloc_request_
call psb_errpush(info,name,l_err=(/liwork/),a_err='integer')
goto 9999
endif
if (iam == root) then
write (*, fmt = *) 'start matdist',root, size(iwork),&
&nrow, ncol, nnzero,nrhs
endif
if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then
call psb_cdall(ictxt,desc_a,info,vg=v)
else
info = -1
end if
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_cdall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
inz = ((nnzero+np-1)/np)
call psb_spall(a,desc_a,info,nnz=inz)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
i_count = 1
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwrk2, np_sharing)
if (np_sharing /= 1 ) exit
if (iwrk2(1) /= iproc ) exit
end do
end if
else
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit
end do
end if
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
if (ll > size(irow)) then
write(psb_err_unit,*) iam,'need to reallocate ',ll
deallocate(val,irow,icol)
allocate(val(ll),irow(ll),icol(ll),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_snd(ictxt,ll,root)
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
end do
endif
i_count = j_count
end do
call psb_barrier(ictxt)
t0 = psb_wtime()
call psb_cdasb(desc_a,info)
t1 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_cdasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_barrier(ictxt)
t2 = psb_wtime()
call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold)
t3 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_spasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) then
write(psb_out_unit,*) 'descriptor assembly: ',t1-t0
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
deallocate(val,irow,icol,iwork,iwrk2,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='deallocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) write (*, fmt = *) 'end matdist'
call psb_erractionrestore(err_act)
return
9999 call psb_error_handler(ictxt,err_act)
return
end subroutine psb_ldmatdist

@ -31,7 +31,8 @@
!
module psb_d_mat_dist_mod
use psb_base_mod, only : psb_ipk_, psb_dpk_, psb_desc_type, psb_parts, &
& psb_dspmat_type, psb_d_base_sparse_mat, psb_d_vect_type
& psb_dspmat_type, psb_d_base_sparse_mat, psb_d_vect_type, &
& psb_ldspmat_type
interface psb_matdist
subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,&
@ -91,6 +92,64 @@ module psb_d_mat_dist_mod
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_dmatdist
subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_ldspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_dspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_dspmat_type, psb_dpk_, psb_desc_type,&
& psb_d_base_sparse_mat, psb_d_vect_type, psb_parts, &
& psb_ldspmat_type
implicit none
! parameters
type(psb_ldspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_dspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_d_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_ldmatdist
end interface
end module psb_d_mat_dist_mod

@ -452,3 +452,180 @@ subroutine dmm_mat_write(a,mtitle,info,iunit,filename)
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine dmm_mat_write
subroutine ldmm_mat_read(a, info, iunit, filename)
use psb_base_mod
implicit none
type(psb_ldspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
character :: mmheader*15, fmt*15, object*10, type*10, sym*15
character(1024) :: line
integer(psb_lpk_) :: nrow, ncol, nnzero, i, nzr
integer(psb_ipk_) :: ircode, infile
type(psb_ld_coo_sparse_mat), allocatable :: acoo
info = psb_success_
if (present(filename)) then
if (filename == '-') then
infile=5
else
if (present(iunit)) then
infile=iunit
else
infile=99
endif
open(infile,file=filename, status='OLD', err=901, action='READ')
endif
else
if (present(iunit)) then
infile=iunit
else
infile=5
endif
endif
read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym
if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then
write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported'
info=909
return
end if
do
read(infile,fmt='(a)') line
if (line(1:1) /= '%') exit
end do
read(line,fmt=*) nrow,ncol,nnzero
allocate(acoo, stat=ircode)
if (ircode /= 0) goto 993
if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'general')) then
call acoo%allocate(nrow,ncol,nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i)
end do
call acoo%set_nzeros(nnzero)
else if ((psb_tolower(type) == 'pattern').and.(psb_tolower(sym) == 'general')) then
call acoo%allocate(nrow,ncol,nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i)
end do
acoo%val(:) = done
call acoo%set_nzeros(nnzero)
else if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'symmetric')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = acoo%val(i)
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
else if ((psb_tolower(type) == 'pattern').and.(psb_tolower(sym) == 'symmetric')) then
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i)
end do
acoo%val(:) = done
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
else
write(psb_err_unit,*) 'read_matrix: matrix type not yet supported'
info=904
end if
if (info == 0) then
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
end if
if (infile /= 5) close(infile)
return
! open failed
901 info=901
write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input'
return
902 info=902
write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file '
return
905 info=905
write(psb_err_unit,*) 'READ_MATRIX: Error at line',i
return
993 info=993
write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure'
return
end subroutine ldmm_mat_read
subroutine ldmm_mat_write(a,mtitle,info,iunit,filename)
use psb_base_mod
implicit none
type(psb_ldspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
integer(psb_ipk_) :: iout
info = psb_success_
if (present(filename)) then
if (filename == '-') then
iout=6
else
if (present(iunit)) then
iout = iunit
else
iout=99
endif
open(iout,file=filename, err=901, action='WRITE')
endif
else
if (present(iunit)) then
iout = iunit
else
iout=6
endif
endif
call a%print(iout,head=mtitle)
if (iout /= 6) close(iout)
return
901 continue
info=901
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine ldmm_mat_write

@ -34,7 +34,9 @@ module psb_mmio_mod
use psb_base_mod, only : psb_ipk_, psb_spk_, psb_dpk_,&
& psb_sspmat_type, psb_cspmat_type, &
& psb_dspmat_type, psb_zspmat_type
& psb_dspmat_type, psb_zspmat_type, &
& psb_lsspmat_type, psb_lcspmat_type, &
& psb_ldspmat_type, psb_lzspmat_type
public mm_mat_read, mm_mat_write, mm_array_read, mm_array_write
@ -270,6 +272,38 @@ module psb_mmio_mod
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine zmm_mat_read
subroutine lsmm_mat_read(a, info, iunit, filename)
import :: psb_lsspmat_type, psb_ipk_
implicit none
type(psb_lsspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lsmm_mat_read
subroutine ldmm_mat_read(a, info, iunit, filename)
import :: psb_ldspmat_type, psb_ipk_
implicit none
type(psb_ldspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine ldmm_mat_read
subroutine lcmm_mat_read(a, info, iunit, filename)
import :: psb_lcspmat_type, psb_ipk_
implicit none
type(psb_lcspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lcmm_mat_read
subroutine lzmm_mat_read(a, info, iunit, filename)
import :: psb_lzspmat_type, psb_ipk_
implicit none
type(psb_lzspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lzmm_mat_read
end interface
interface mm_mat_write
@ -309,6 +343,42 @@ module psb_mmio_mod
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine zmm_mat_write
subroutine lsmm_mat_write(a,mtitle,info,iunit,filename)
import :: psb_lsspmat_type, psb_ipk_
implicit none
type(psb_lsspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lsmm_mat_write
subroutine ldmm_mat_write(a,mtitle,info,iunit,filename)
import :: psb_ldspmat_type, psb_ipk_
implicit none
type(psb_ldspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine ldmm_mat_write
subroutine lcmm_mat_write(a,mtitle,info,iunit,filename)
import :: psb_lcspmat_type, psb_ipk_
implicit none
type(psb_lcspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lcmm_mat_write
subroutine lzmm_mat_write(a,mtitle,info,iunit,filename)
import :: psb_lzspmat_type, psb_ipk_
implicit none
type(psb_lzspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
end subroutine lzmm_mat_write
end interface
end module psb_mmio_mod

@ -338,3 +338,319 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine psb_smatdist
subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_sspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_sspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
use psb_base_mod
use psb_mat_mod
implicit none
! parameters
type(psb_lsspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_sspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_mpk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:)
real(psb_spk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'psb_s_mat_dist'
call psb_erractionsave(err_act)
! executable statements
if (present(inroot)) then
root = inroot
else
root = psb_root_
end if
call psb_info(ictxt, iam, np)
if (iam == root) then
nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols()
if (nrow /= ncol) then
write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol
info=-1
call psb_errpush(info,name)
goto 9999
endif
nnzero = a_glob%get_nzeros()
nrhs = 1
endif
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root)
call psb_bcast(ictxt,nnzero, root)
call psb_bcast(ictxt,nrhs, root)
liwork = max(np, nrow + ncol)
allocate(iwork(liwork), iwrk2(np),stat = info)
if (info /= psb_success_) then
info=psb_err_alloc_request_
call psb_errpush(info,name,l_err=(/liwork/),a_err='integer')
goto 9999
endif
if (iam == root) then
write (*, fmt = *) 'start matdist',root, size(iwork),&
&nrow, ncol, nnzero,nrhs
endif
if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then
call psb_cdall(ictxt,desc_a,info,vg=v)
else
info = -1
end if
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_cdall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
inz = ((nnzero+np-1)/np)
call psb_spall(a,desc_a,info,nnz=inz)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
i_count = 1
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwrk2, np_sharing)
if (np_sharing /= 1 ) exit
if (iwrk2(1) /= iproc ) exit
end do
end if
else
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit
end do
end if
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
if (ll > size(irow)) then
write(psb_err_unit,*) iam,'need to reallocate ',ll
deallocate(val,irow,icol)
allocate(val(ll),irow(ll),icol(ll),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_snd(ictxt,ll,root)
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
end do
endif
i_count = j_count
end do
call psb_barrier(ictxt)
t0 = psb_wtime()
call psb_cdasb(desc_a,info)
t1 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_cdasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_barrier(ictxt)
t2 = psb_wtime()
call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold)
t3 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_spasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) then
write(psb_out_unit,*) 'descriptor assembly: ',t1-t0
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
deallocate(val,irow,icol,iwork,iwrk2,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='deallocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) write (*, fmt = *) 'end matdist'
call psb_erractionrestore(err_act)
return
9999 call psb_error_handler(ictxt,err_act)
return
end subroutine psb_lsmatdist

@ -31,7 +31,8 @@
!
module psb_s_mat_dist_mod
use psb_base_mod, only : psb_ipk_, psb_spk_, psb_desc_type, psb_parts, &
& psb_sspmat_type, psb_s_base_sparse_mat, psb_s_vect_type
& psb_sspmat_type, psb_s_base_sparse_mat, psb_s_vect_type, &
& psb_lsspmat_type
interface psb_matdist
subroutine psb_smatdist(a_glob, a, ictxt, desc_a,&
@ -91,6 +92,64 @@ module psb_s_mat_dist_mod
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_smatdist
subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_lsspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_sspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_sspmat_type, psb_spk_, psb_desc_type,&
& psb_s_base_sparse_mat, psb_s_vect_type, psb_parts, &
& psb_lsspmat_type
implicit none
! parameters
type(psb_lsspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_sspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_s_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_lsmatdist
end interface
end module psb_s_mat_dist_mod

@ -434,3 +434,156 @@ subroutine smm_mat_write(a,mtitle,info,iunit,filename)
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine smm_mat_write
subroutine lsmm_mat_read(a, info, iunit, filename)
use psb_base_mod
implicit none
type(psb_lsspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
character :: mmheader*15, fmt*15, object*10, type*10, sym*15
character(1024) :: line
integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr
integer(psb_ipk_) :: ircode,infile
type(psb_ls_coo_sparse_mat), allocatable :: acoo
info = psb_success_
if (present(filename)) then
if (filename == '-') then
infile=5
else
if (present(iunit)) then
infile=iunit
else
infile=99
endif
open(infile,file=filename, status='OLD', err=901, action='READ')
endif
else
if (present(iunit)) then
infile=iunit
else
infile=5
endif
endif
read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym
if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then
write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported'
info=909
return
end if
do
read(infile,fmt='(a)') line
if (line(1:1) /= '%') exit
end do
read(line,fmt=*) nrow,ncol,nnzero
allocate(acoo, stat=ircode)
if (ircode /= 0) goto 993
if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'general')) then
call acoo%allocate(nrow,ncol,nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i)
end do
call acoo%set_nzeros(nnzero)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'symmetric')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = acoo%val(i)
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else
write(psb_err_unit,*) 'read_matrix: matrix type not yet supported'
info=904
end if
if (infile /= 5) close(infile)
return
! open failed
901 info=901
write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input'
return
902 info=902
write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file '
return
905 info=905
write(psb_err_unit,*) 'READ_MATRIX: Error at line',i
return
993 info=993
write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure'
return
end subroutine lsmm_mat_read
subroutine lsmm_mat_write(a,mtitle,info,iunit,filename)
use psb_base_mod
implicit none
type(psb_lsspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
integer(psb_ipk_) :: iout
info = psb_success_
if (present(filename)) then
if (filename == '-') then
iout=6
else
if (present(iunit)) then
iout = iunit
else
iout=99
endif
open(iout,file=filename, err=901, action='WRITE')
endif
else
if (present(iunit)) then
iout = iunit
else
iout=6
endif
endif
call a%print(iout,head=mtitle)
if (iout /= 6) close(iout)
return
901 continue
info=901
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine lsmm_mat_write

@ -338,3 +338,319 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
return
end subroutine psb_zmatdist
subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_zspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_zspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
use psb_base_mod
use psb_mat_mod
implicit none
! parameters
type(psb_lzspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_zspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
! local variables
logical :: use_parts, use_v
integer(psb_mpk_) :: np, iam, np_sharing, root, iproc
integer(psb_ipk_) :: err_act, il, inz
integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,&
& i, ll, nz, isize, nnr, err
integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig
integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:)
integer(psb_lpk_), allocatable :: irow(:),icol(:)
complex(psb_dpk_), allocatable :: val(:)
integer(psb_ipk_), parameter :: nb=30
real(psb_dpk_) :: t0, t1, t2, t3, t4, t5
character(len=20) :: name, ch_err
info = psb_success_
err = 0
name = 'psb_z_mat_dist'
call psb_erractionsave(err_act)
! executable statements
if (present(inroot)) then
root = inroot
else
root = psb_root_
end if
call psb_info(ictxt, iam, np)
if (iam == root) then
nrow = a_glob%get_nrows()
ncol = a_glob%get_ncols()
if (nrow /= ncol) then
write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol
info=-1
call psb_errpush(info,name)
goto 9999
endif
nnzero = a_glob%get_nzeros()
nrhs = 1
endif
use_parts = present(parts)
use_v = present(v)
if (count((/ use_parts, use_v /)) /= 1) then
info=psb_err_no_optional_arg_
call psb_errpush(info,name,a_err=" v, parts")
goto 9999
endif
! broadcast informations to other processors
call psb_bcast(ictxt,nrow, root)
call psb_bcast(ictxt,ncol, root)
call psb_bcast(ictxt,nnzero, root)
call psb_bcast(ictxt,nrhs, root)
liwork = max(np, nrow + ncol)
allocate(iwork(liwork), iwrk2(np),stat = info)
if (info /= psb_success_) then
info=psb_err_alloc_request_
call psb_errpush(info,name,l_err=(/liwork/),a_err='integer')
goto 9999
endif
if (iam == root) then
write (*, fmt = *) 'start matdist',root, size(iwork),&
&nrow, ncol, nnzero,nrhs
endif
if (use_parts) then
call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts)
else if (use_v) then
call psb_cdall(ictxt,desc_a,info,vg=v)
else
info = -1
end if
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_cdall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
inz = ((nnzero+np-1)/np)
call psb_spall(a,desc_a,info,nnz=inz)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spall'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
isize = 3*nb*max(((nnzero+nrow)/nrow),nb)
allocate(val(isize),irow(isize),icol(isize),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
i_count = 1
do while (i_count <= nrow)
if (use_parts) then
call parts(i_count,nrow,np,iwork, np_sharing)
!
! np_sharing allows for overlap in the data distribution.
! If an index is overlapped, then we have to send its row
! to multiple processes. NOTE: we are assuming the output
! from PARTS is coherent, otherwise a deadlock is the most
! likely outcome.
!
j_count = i_count
if (np_sharing == 1) then
iproc = iwork(1)
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
call parts(j_count,nrow,np,iwrk2, np_sharing)
if (np_sharing /= 1 ) exit
if (iwrk2(1) /= iproc ) exit
end do
end if
else
np_sharing = 1
j_count = i_count
iproc = v(i_count)
iwork(1:np_sharing) = iproc
do
j_count = j_count + 1
if (j_count-i_count >= nb) exit
if (j_count > nrow) exit
if (v(j_count) /= iproc ) exit
end do
end if
! now we should insert rows i_count..j_count-1
nnr = j_count - i_count
if (iam == root) then
ll = 0
do i= i_count, j_count-1
call a_glob%csget(i,i,nz,&
& irow,icol,val,info,nzin=ll,append=.true.)
if (info /= psb_success_) then
if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then
write(psb_err_unit,*) 'Allocation failure? This should not happen!'
end if
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
ll = ll + nz
end do
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psb_spins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
else
call psb_snd(ictxt,nnr,iproc)
call psb_snd(ictxt,ll,iproc)
call psb_snd(ictxt,irow(1:ll),iproc)
call psb_snd(ictxt,icol(1:ll),iproc)
call psb_snd(ictxt,val(1:ll),iproc)
call psb_rcv(ictxt,ll,iproc)
endif
end do
else if (iam /= root) then
do k_count = 1, np_sharing
iproc = iwork(k_count)
if (iproc == iam) then
call psb_rcv(ictxt,nnr,root)
call psb_rcv(ictxt,ll,root)
if (ll > size(irow)) then
write(psb_err_unit,*) iam,'need to reallocate ',ll
deallocate(val,irow,icol)
allocate(val(ll),irow(ll),icol(ll),stat=info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='Allocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
call psb_rcv(ictxt,irow(1:ll),root)
call psb_rcv(ictxt,icol(1:ll),root)
call psb_rcv(ictxt,val(1:ll),root)
call psb_snd(ictxt,ll,root)
il = ll
call psb_spins(il,irow,icol,val,a,desc_a,info)
if(info /= psb_success_) then
info=psb_err_from_subroutine_
ch_err='psspins'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
endif
end do
endif
i_count = j_count
end do
call psb_barrier(ictxt)
t0 = psb_wtime()
call psb_cdasb(desc_a,info)
t1 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_cdasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
call psb_barrier(ictxt)
t2 = psb_wtime()
call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold)
t3 = psb_wtime()
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='psb_spasb'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) then
write(psb_out_unit,*) 'descriptor assembly: ',t1-t0
write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2
end if
deallocate(val,irow,icol,iwork,iwrk2,stat=info)
if(info /= psb_success_)then
info=psb_err_from_subroutine_
ch_err='deallocate'
call psb_errpush(info,name,a_err=ch_err)
goto 9999
end if
if (iam == root) write (*, fmt = *) 'end matdist'
call psb_erractionrestore(err_act)
return
9999 call psb_error_handler(ictxt,err_act)
return
end subroutine psb_lzmatdist

@ -31,7 +31,8 @@
!
module psb_z_mat_dist_mod
use psb_base_mod, only : psb_ipk_, psb_dpk_, psb_desc_type, psb_parts, &
& psb_zspmat_type, psb_z_base_sparse_mat, psb_z_vect_type
& psb_zspmat_type, psb_z_base_sparse_mat, psb_z_vect_type, &
& psb_lzspmat_type
interface psb_matdist
subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,&
@ -91,6 +92,64 @@ module psb_z_mat_dist_mod
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_zmatdist
subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,&
& info, parts, v, inroot,fmt,mold)
!
! an utility subroutine to distribute a matrix among processors
! according to a user defined data distribution, using
! sparse matrix subroutines.
!
! type(psb_lzspmat) :: a_glob
! on entry: this contains the global sparse matrix as follows:
!
! type(psb_zspmat_type) :: a
! on exit : this will contain the local sparse matrix.
!
! interface parts
! ! .....user passed subroutine.....
! subroutine parts(global_indx,n,np,pv,nv)
! implicit none
! integer(psb_ipk_), intent(in) :: global_indx, n, np
! integer(psb_ipk_), intent(out) :: nv
! integer(psb_ipk_), intent(out) :: pv(*)
!
! end subroutine parts
! end interface
! on entry: subroutine providing user defined data distribution.
! for each global_indx the subroutine should return
! the list pv of all processes owning the row with
! that index; the list will contain nv entries.
! usually nv=1; if nv >1 then we have an overlap in the data
! distribution.
!
! integer(psb_ipk_) :: ictxt
! on entry: the PSBLAS parallel environment context.
!
! type (desc_type) :: desc_a
! on exit : the updated array descriptor
!
!
! integer(psb_ipk_), optional :: inroot
! on entry: specifies processor holding a_glob. default: 0
! on exit : unchanged.
!
import :: psb_ipk_, psb_zspmat_type, psb_dpk_, psb_desc_type,&
& psb_z_base_sparse_mat, psb_z_vect_type, psb_parts, &
& psb_lzspmat_type
implicit none
! parameters
type(psb_lzspmat_type) :: a_glob
integer(psb_ipk_) :: ictxt
type(psb_zspmat_type) :: a
type(psb_desc_type) :: desc_a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional :: inroot
character(len=*), optional :: fmt
class(psb_z_base_sparse_mat), optional :: mold
procedure(psb_parts), optional :: parts
integer(psb_ipk_), optional :: v(:)
end subroutine psb_lzmatdist
end interface
end module psb_z_mat_dist_mod

@ -462,3 +462,178 @@ subroutine zmm_mat_write(a,mtitle,info,iunit,filename)
end subroutine zmm_mat_write
subroutine lzmm_mat_read(a, info, iunit, filename)
use psb_base_mod
implicit none
type(psb_lzspmat_type), intent(out) :: a
integer(psb_ipk_), intent(out) :: info
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
character :: mmheader*15, fmt*15, object*10, type*10, sym*15
character(1024) :: line
integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr
integer(psb_ipk_) :: ircode,infile
type(psb_lz_coo_sparse_mat), allocatable :: acoo
real(psb_dpk_) :: are, aim
info = psb_success_
if (present(filename)) then
if (filename == '-') then
infile=5
else
if (present(iunit)) then
infile=iunit
else
infile=99
endif
open(infile,file=filename, status='OLD', err=901, action='READ')
endif
else
if (present(iunit)) then
infile=iunit
else
infile=5
endif
endif
read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym
if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then
write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported'
info=909
return
end if
do
read(infile,fmt='(a)') line
if (line(1:1) /= '%') exit
end do
read(line,fmt=*) nrow,ncol,nnzero
allocate(acoo, stat=ircode)
if (ircode /= 0) goto 993
if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'general')) then
call acoo%allocate(nrow,ncol,nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_dpk_)
end do
call acoo%set_nzeros(nnzero)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'symmetric')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_dpk_)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = acoo%val(i)
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'hermitian')) then
! we are generally working with non-symmetric matrices, so
! we de-symmetrize what we are about to read
call acoo%allocate(nrow,ncol,2*nnzero)
do i=1,nnzero
read(infile,fmt=*,end=902) acoo%ia(i),acoo%ja(i),are,aim
acoo%val(i) = cmplx(are,aim,kind=psb_dpk_)
end do
nzr = nnzero
do i=1,nnzero
if (acoo%ia(i) /= acoo%ja(i)) then
nzr = nzr + 1
acoo%val(nzr) = conjg(acoo%val(i))
acoo%ia(nzr) = acoo%ja(i)
acoo%ja(nzr) = acoo%ia(i)
end if
end do
call acoo%set_nzeros(nzr)
call acoo%fix(info)
call a%mv_from(acoo)
call a%cscnv(ircode,type='csr')
else
write(psb_err_unit,*) 'read_matrix: matrix type not yet supported'
info=904
end if
if (infile /= 5) close(infile)
return
! open failed
901 info=901
write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input'
return
902 info=902
write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file '
return
905 info=905
write(psb_err_unit,*) 'READ_MATRIX: Error at line',i
return
993 info=993
write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure'
return
end subroutine lzmm_mat_read
subroutine lzmm_mat_write(a,mtitle,info,iunit,filename)
use psb_base_mod
implicit none
type(psb_lzspmat_type), intent(in) :: a
integer(psb_ipk_), intent(out) :: info
character(len=*), intent(in) :: mtitle
integer(psb_ipk_), optional, intent(in) :: iunit
character(len=*), optional, intent(in) :: filename
integer(psb_ipk_) :: iout
info = psb_success_
if (present(filename)) then
if (filename == '-') then
iout=6
else
if (present(iunit)) then
iout = iunit
else
iout=99
endif
open(iout,file=filename, err=901, action='WRITE')
endif
else
if (present(iunit)) then
iout = iunit
else
iout=6
endif
endif
call a%print(iout,head=mtitle)
if (iout /= 6) close(iout)
return
901 continue
info=901
write(psb_err_unit,*) 'Error while opening ',filename
return
end subroutine lzmm_mat_write

Loading…
Cancel
Save