diff --git a/util/psb_c_mat_dist_impl.f90 b/util/psb_c_mat_dist_impl.f90 index 9eeafdbea..a0a1d6eb0 100644 --- a/util/psb_c_mat_dist_impl.f90 +++ b/util/psb_c_mat_dist_impl.f90 @@ -338,3 +338,319 @@ subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& return end subroutine psb_cmatdist + + + +subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_cspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_cspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + use psb_base_mod + use psb_mat_mod + implicit none + + ! parameters + type(psb_lcspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_cspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_c_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + + ! local variables + logical :: use_parts, use_v + integer(psb_mpk_) :: np, iam, np_sharing, root, iproc + integer(psb_ipk_) :: err_act, il, inz + integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& + & i, ll, nz, isize, nnr, err + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) + integer(psb_lpk_), allocatable :: irow(:),icol(:) + complex(psb_spk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 + real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 + character(len=20) :: name, ch_err + + info = psb_success_ + err = 0 + name = 'psb_c_mat_dist' + call psb_erractionsave(err_act) + + ! executable statements + if (present(inroot)) then + root = inroot + else + root = psb_root_ + end if + call psb_info(ictxt, iam, np) + if (iam == root) then + nrow = a_glob%get_nrows() + ncol = a_glob%get_ncols() + if (nrow /= ncol) then + write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol + info=-1 + call psb_errpush(info,name) + goto 9999 + endif + nnzero = a_glob%get_nzeros() + nrhs = 1 + endif + + use_parts = present(parts) + use_v = present(v) + if (count((/ use_parts, use_v /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" v, parts") + goto 9999 + endif + + ! broadcast informations to other processors + call psb_bcast(ictxt,nrow, root) + call psb_bcast(ictxt,ncol, root) + call psb_bcast(ictxt,nnzero, root) + call psb_bcast(ictxt,nrhs, root) + liwork = max(np, nrow + ncol) + allocate(iwork(liwork), iwrk2(np),stat = info) + if (info /= psb_success_) then + info=psb_err_alloc_request_ + call psb_errpush(info,name,l_err=(/liwork/),a_err='integer') + goto 9999 + endif + if (iam == root) then + write (*, fmt = *) 'start matdist',root, size(iwork),& + &nrow, ncol, nnzero,nrhs + endif + if (use_parts) then + call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) + else if (use_v) then + call psb_cdall(ictxt,desc_a,info,vg=v) + else + info = -1 + end if + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_cdall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + inz = ((nnzero+np-1)/np) + call psb_spall(a,desc_a,info,nnz=inz) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + isize = 3*nb*max(((nnzero+nrow)/nrow),nb) + allocate(val(isize),irow(isize),icol(isize),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + i_count = 1 + + do while (i_count <= nrow) + + if (use_parts) then + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then + iproc = iwork(1) + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + call parts(j_count,nrow,np,iwrk2, np_sharing) + if (np_sharing /= 1 ) exit + if (iwrk2(1) /= iproc ) exit + end do + end if + else + np_sharing = 1 + j_count = i_count + iproc = v(i_count) + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (v(j_count) /= iproc ) exit + end do + end if + + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' + end if + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + + do k_count = 1, np_sharing + iproc = iwork(k_count) + + if (iproc == iam) then + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + else + call psb_snd(ictxt,nnr,iproc) + call psb_snd(ictxt,ll,iproc) + call psb_snd(ictxt,irow(1:ll),iproc) + call psb_snd(ictxt,icol(1:ll),iproc) + call psb_snd(ictxt,val(1:ll),iproc) + call psb_rcv(ictxt,ll,iproc) + endif + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then + call psb_rcv(ictxt,nnr,root) + call psb_rcv(ictxt,ll,root) + if (ll > size(irow)) then + write(psb_err_unit,*) iam,'need to reallocate ',ll + deallocate(val,irow,icol) + allocate(val(ll),irow(ll),icol(ll),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + endif + call psb_rcv(ictxt,irow(1:ll),root) + call psb_rcv(ictxt,icol(1:ll),root) + call psb_rcv(ictxt,val(1:ll),root) + call psb_snd(ictxt,ll,root) + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psspins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + endif + end do + endif + i_count = j_count + end do + + call psb_barrier(ictxt) + t0 = psb_wtime() + call psb_cdasb(desc_a,info) + t1 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_cdasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + call psb_barrier(ictxt) + t2 = psb_wtime() + call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) + t3 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_spasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + if (iam == root) then + write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 + write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 + end if + + + + deallocate(val,irow,icol,iwork,iwrk2,stat=info) + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='deallocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if (iam == root) write (*, fmt = *) 'end matdist' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +end subroutine psb_lcmatdist diff --git a/util/psb_c_mat_dist_mod.f90 b/util/psb_c_mat_dist_mod.f90 index 7ce6abd93..422f4e963 100644 --- a/util/psb_c_mat_dist_mod.f90 +++ b/util/psb_c_mat_dist_mod.f90 @@ -31,7 +31,8 @@ ! module psb_c_mat_dist_mod use psb_base_mod, only : psb_ipk_, psb_spk_, psb_desc_type, psb_parts, & - & psb_cspmat_type, psb_c_base_sparse_mat, psb_c_vect_type + & psb_cspmat_type, psb_c_base_sparse_mat, psb_c_vect_type, & + & psb_lcspmat_type interface psb_matdist subroutine psb_cmatdist(a_glob, a, ictxt, desc_a,& @@ -91,6 +92,64 @@ module psb_c_mat_dist_mod procedure(psb_parts), optional :: parts integer(psb_ipk_), optional :: v(:) end subroutine psb_cmatdist + subroutine psb_lcmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_lcspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_cspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + import :: psb_ipk_, psb_cspmat_type, psb_spk_, psb_desc_type,& + & psb_c_base_sparse_mat, psb_c_vect_type, psb_parts, & + & psb_lcspmat_type + implicit none + + ! parameters + type(psb_lcspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_cspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_c_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + end subroutine psb_lcmatdist end interface end module psb_c_mat_dist_mod diff --git a/util/psb_c_mmio_impl.f90 b/util/psb_c_mmio_impl.f90 index e2f0ac60f..a07c8e752 100644 --- a/util/psb_c_mmio_impl.f90 +++ b/util/psb_c_mmio_impl.f90 @@ -461,3 +461,179 @@ subroutine cmm_mat_write(a,mtitle,info,iunit,filename) write(psb_err_unit,*) 'Error while opening ',filename return end subroutine cmm_mat_write + +subroutine lcmm_mat_read(a, info, iunit, filename) + use psb_base_mod + implicit none + type(psb_lcspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + character :: mmheader*15, fmt*15, object*10, type*10, sym*15 + character(1024) :: line + integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr + integer(psb_ipk_) :: ircode,infile + type(psb_lc_coo_sparse_mat), allocatable :: acoo + real(psb_spk_) :: are, aim + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + infile=5 + else + if (present(iunit)) then + infile=iunit + else + infile=99 + endif + open(infile,file=filename, status='OLD', err=901, action='READ') + endif + else + if (present(iunit)) then + infile=iunit + else + infile=5 + endif + endif + + read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym + + if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then + write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported' + info=909 + return + end if + + do + read(infile,fmt='(a)') line + if (line(1:1) /= '%') exit + end do + read(line,fmt=*) nrow,ncol,nnzero + + allocate(acoo, stat=ircode) + if (ircode /= 0) goto 993 + if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'general')) then + call acoo%allocate(nrow,ncol,nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_spk_) + end do + call acoo%set_nzeros(nnzero) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'symmetric')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_spk_) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = acoo%val(i) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'hermitian')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_spk_) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = conjg(acoo%val(i)) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else + write(psb_err_unit,*) 'read_matrix: matrix type not yet supported' + info=904 + end if + if (infile /= 5) close(infile) + return + + ! open failed +901 info=901 + write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input' + return +902 info=902 + write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file ' + return +905 info=905 + write(psb_err_unit,*) 'READ_MATRIX: Error at line',i + return +993 info=993 + write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure' + return +end subroutine lcmm_mat_read + + +subroutine lcmm_mat_write(a,mtitle,info,iunit,filename) + use psb_base_mod + implicit none + type(psb_lcspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + integer(psb_ipk_) :: iout + + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + iout=6 + else + if (present(iunit)) then + iout = iunit + else + iout=99 + endif + open(iout,file=filename, err=901, action='WRITE') + endif + else + if (present(iunit)) then + iout = iunit + else + iout=6 + endif + endif + + call a%print(iout,head=mtitle) + + if (iout /= 6) close(iout) + + + return + +901 continue + info=901 + write(psb_err_unit,*) 'Error while opening ',filename + return +end subroutine lcmm_mat_write diff --git a/util/psb_d_mat_dist_impl.f90 b/util/psb_d_mat_dist_impl.f90 index 73434f9ad..9047026b2 100644 --- a/util/psb_d_mat_dist_impl.f90 +++ b/util/psb_d_mat_dist_impl.f90 @@ -338,3 +338,319 @@ subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& return end subroutine psb_dmatdist + + + +subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_dspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_dspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + use psb_base_mod + use psb_mat_mod + implicit none + + ! parameters + type(psb_ldspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_dspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_d_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + + ! local variables + logical :: use_parts, use_v + integer(psb_mpk_) :: np, iam, np_sharing, root, iproc + integer(psb_ipk_) :: err_act, il, inz + integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& + & i, ll, nz, isize, nnr, err + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) + integer(psb_lpk_), allocatable :: irow(:),icol(:) + real(psb_dpk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 + real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 + character(len=20) :: name, ch_err + + info = psb_success_ + err = 0 + name = 'psb_d_mat_dist' + call psb_erractionsave(err_act) + + ! executable statements + if (present(inroot)) then + root = inroot + else + root = psb_root_ + end if + call psb_info(ictxt, iam, np) + if (iam == root) then + nrow = a_glob%get_nrows() + ncol = a_glob%get_ncols() + if (nrow /= ncol) then + write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol + info=-1 + call psb_errpush(info,name) + goto 9999 + endif + nnzero = a_glob%get_nzeros() + nrhs = 1 + endif + + use_parts = present(parts) + use_v = present(v) + if (count((/ use_parts, use_v /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" v, parts") + goto 9999 + endif + + ! broadcast informations to other processors + call psb_bcast(ictxt,nrow, root) + call psb_bcast(ictxt,ncol, root) + call psb_bcast(ictxt,nnzero, root) + call psb_bcast(ictxt,nrhs, root) + liwork = max(np, nrow + ncol) + allocate(iwork(liwork), iwrk2(np),stat = info) + if (info /= psb_success_) then + info=psb_err_alloc_request_ + call psb_errpush(info,name,l_err=(/liwork/),a_err='integer') + goto 9999 + endif + if (iam == root) then + write (*, fmt = *) 'start matdist',root, size(iwork),& + &nrow, ncol, nnzero,nrhs + endif + if (use_parts) then + call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) + else if (use_v) then + call psb_cdall(ictxt,desc_a,info,vg=v) + else + info = -1 + end if + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_cdall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + inz = ((nnzero+np-1)/np) + call psb_spall(a,desc_a,info,nnz=inz) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + isize = 3*nb*max(((nnzero+nrow)/nrow),nb) + allocate(val(isize),irow(isize),icol(isize),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + i_count = 1 + + do while (i_count <= nrow) + + if (use_parts) then + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then + iproc = iwork(1) + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + call parts(j_count,nrow,np,iwrk2, np_sharing) + if (np_sharing /= 1 ) exit + if (iwrk2(1) /= iproc ) exit + end do + end if + else + np_sharing = 1 + j_count = i_count + iproc = v(i_count) + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (v(j_count) /= iproc ) exit + end do + end if + + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' + end if + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + + do k_count = 1, np_sharing + iproc = iwork(k_count) + + if (iproc == iam) then + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + else + call psb_snd(ictxt,nnr,iproc) + call psb_snd(ictxt,ll,iproc) + call psb_snd(ictxt,irow(1:ll),iproc) + call psb_snd(ictxt,icol(1:ll),iproc) + call psb_snd(ictxt,val(1:ll),iproc) + call psb_rcv(ictxt,ll,iproc) + endif + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then + call psb_rcv(ictxt,nnr,root) + call psb_rcv(ictxt,ll,root) + if (ll > size(irow)) then + write(psb_err_unit,*) iam,'need to reallocate ',ll + deallocate(val,irow,icol) + allocate(val(ll),irow(ll),icol(ll),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + endif + call psb_rcv(ictxt,irow(1:ll),root) + call psb_rcv(ictxt,icol(1:ll),root) + call psb_rcv(ictxt,val(1:ll),root) + call psb_snd(ictxt,ll,root) + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psspins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + endif + end do + endif + i_count = j_count + end do + + call psb_barrier(ictxt) + t0 = psb_wtime() + call psb_cdasb(desc_a,info) + t1 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_cdasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + call psb_barrier(ictxt) + t2 = psb_wtime() + call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) + t3 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_spasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + if (iam == root) then + write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 + write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 + end if + + + + deallocate(val,irow,icol,iwork,iwrk2,stat=info) + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='deallocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if (iam == root) write (*, fmt = *) 'end matdist' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +end subroutine psb_ldmatdist diff --git a/util/psb_d_mat_dist_mod.f90 b/util/psb_d_mat_dist_mod.f90 index 6632600c2..beb7f113d 100644 --- a/util/psb_d_mat_dist_mod.f90 +++ b/util/psb_d_mat_dist_mod.f90 @@ -31,7 +31,8 @@ ! module psb_d_mat_dist_mod use psb_base_mod, only : psb_ipk_, psb_dpk_, psb_desc_type, psb_parts, & - & psb_dspmat_type, psb_d_base_sparse_mat, psb_d_vect_type + & psb_dspmat_type, psb_d_base_sparse_mat, psb_d_vect_type, & + & psb_ldspmat_type interface psb_matdist subroutine psb_dmatdist(a_glob, a, ictxt, desc_a,& @@ -91,6 +92,64 @@ module psb_d_mat_dist_mod procedure(psb_parts), optional :: parts integer(psb_ipk_), optional :: v(:) end subroutine psb_dmatdist + subroutine psb_ldmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_ldspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_dspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + import :: psb_ipk_, psb_dspmat_type, psb_dpk_, psb_desc_type,& + & psb_d_base_sparse_mat, psb_d_vect_type, psb_parts, & + & psb_ldspmat_type + implicit none + + ! parameters + type(psb_ldspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_dspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_d_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + end subroutine psb_ldmatdist end interface end module psb_d_mat_dist_mod diff --git a/util/psb_d_mmio_impl.f90 b/util/psb_d_mmio_impl.f90 index 7026f68d0..506ed1e9f 100644 --- a/util/psb_d_mmio_impl.f90 +++ b/util/psb_d_mmio_impl.f90 @@ -452,3 +452,180 @@ subroutine dmm_mat_write(a,mtitle,info,iunit,filename) write(psb_err_unit,*) 'Error while opening ',filename return end subroutine dmm_mat_write + + +subroutine ldmm_mat_read(a, info, iunit, filename) + use psb_base_mod + implicit none + type(psb_ldspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + character :: mmheader*15, fmt*15, object*10, type*10, sym*15 + character(1024) :: line + integer(psb_lpk_) :: nrow, ncol, nnzero, i, nzr + integer(psb_ipk_) :: ircode, infile + type(psb_ld_coo_sparse_mat), allocatable :: acoo + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + infile=5 + else + if (present(iunit)) then + infile=iunit + else + infile=99 + endif + open(infile,file=filename, status='OLD', err=901, action='READ') + endif + else + if (present(iunit)) then + infile=iunit + else + infile=5 + endif + endif + + read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym + + if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then + write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported' + info=909 + return + end if + + do + read(infile,fmt='(a)') line + if (line(1:1) /= '%') exit + end do + read(line,fmt=*) nrow,ncol,nnzero + + allocate(acoo, stat=ircode) + if (ircode /= 0) goto 993 + if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'general')) then + call acoo%allocate(nrow,ncol,nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i) + end do + call acoo%set_nzeros(nnzero) + + else if ((psb_tolower(type) == 'pattern').and.(psb_tolower(sym) == 'general')) then + call acoo%allocate(nrow,ncol,nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i) + end do + acoo%val(:) = done + call acoo%set_nzeros(nnzero) + + else if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'symmetric')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = acoo%val(i) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + + else if ((psb_tolower(type) == 'pattern').and.(psb_tolower(sym) == 'symmetric')) then + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i) + end do + acoo%val(:) = done + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + + else + write(psb_err_unit,*) 'read_matrix: matrix type not yet supported' + info=904 + end if + + if (info == 0) then + call acoo%fix(info) + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + end if + + if (infile /= 5) close(infile) + return + + ! open failed +901 info=901 + write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input' + return +902 info=902 + write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file ' + return +905 info=905 + write(psb_err_unit,*) 'READ_MATRIX: Error at line',i + return +993 info=993 + write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure' + return +end subroutine ldmm_mat_read + + +subroutine ldmm_mat_write(a,mtitle,info,iunit,filename) + use psb_base_mod + implicit none + type(psb_ldspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + integer(psb_ipk_) :: iout + + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + iout=6 + else + if (present(iunit)) then + iout = iunit + else + iout=99 + endif + open(iout,file=filename, err=901, action='WRITE') + endif + else + if (present(iunit)) then + iout = iunit + else + iout=6 + endif + endif + + call a%print(iout,head=mtitle) + + if (iout /= 6) close(iout) + + + return + +901 continue + info=901 + write(psb_err_unit,*) 'Error while opening ',filename + return +end subroutine ldmm_mat_write + + diff --git a/util/psb_mmio_mod.F90 b/util/psb_mmio_mod.F90 index fe7f0f9ee..8a294b98d 100644 --- a/util/psb_mmio_mod.F90 +++ b/util/psb_mmio_mod.F90 @@ -34,7 +34,9 @@ module psb_mmio_mod use psb_base_mod, only : psb_ipk_, psb_spk_, psb_dpk_,& & psb_sspmat_type, psb_cspmat_type, & - & psb_dspmat_type, psb_zspmat_type + & psb_dspmat_type, psb_zspmat_type, & + & psb_lsspmat_type, psb_lcspmat_type, & + & psb_ldspmat_type, psb_lzspmat_type public mm_mat_read, mm_mat_write, mm_array_read, mm_array_write @@ -270,6 +272,38 @@ module psb_mmio_mod integer(psb_ipk_), optional, intent(in) :: iunit character(len=*), optional, intent(in) :: filename end subroutine zmm_mat_read + subroutine lsmm_mat_read(a, info, iunit, filename) + import :: psb_lsspmat_type, psb_ipk_ + implicit none + type(psb_lsspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lsmm_mat_read + subroutine ldmm_mat_read(a, info, iunit, filename) + import :: psb_ldspmat_type, psb_ipk_ + implicit none + type(psb_ldspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine ldmm_mat_read + subroutine lcmm_mat_read(a, info, iunit, filename) + import :: psb_lcspmat_type, psb_ipk_ + implicit none + type(psb_lcspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lcmm_mat_read + subroutine lzmm_mat_read(a, info, iunit, filename) + import :: psb_lzspmat_type, psb_ipk_ + implicit none + type(psb_lzspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lzmm_mat_read end interface interface mm_mat_write @@ -309,6 +343,42 @@ module psb_mmio_mod integer(psb_ipk_), optional, intent(in) :: iunit character(len=*), optional, intent(in) :: filename end subroutine zmm_mat_write + subroutine lsmm_mat_write(a,mtitle,info,iunit,filename) + import :: psb_lsspmat_type, psb_ipk_ + implicit none + type(psb_lsspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lsmm_mat_write + subroutine ldmm_mat_write(a,mtitle,info,iunit,filename) + import :: psb_ldspmat_type, psb_ipk_ + implicit none + type(psb_ldspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine ldmm_mat_write + subroutine lcmm_mat_write(a,mtitle,info,iunit,filename) + import :: psb_lcspmat_type, psb_ipk_ + implicit none + type(psb_lcspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lcmm_mat_write + subroutine lzmm_mat_write(a,mtitle,info,iunit,filename) + import :: psb_lzspmat_type, psb_ipk_ + implicit none + type(psb_lzspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + end subroutine lzmm_mat_write end interface end module psb_mmio_mod diff --git a/util/psb_s_mat_dist_impl.f90 b/util/psb_s_mat_dist_impl.f90 index c1567270d..32593e44a 100644 --- a/util/psb_s_mat_dist_impl.f90 +++ b/util/psb_s_mat_dist_impl.f90 @@ -338,3 +338,319 @@ subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& return end subroutine psb_smatdist + + + +subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_sspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_sspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + use psb_base_mod + use psb_mat_mod + implicit none + + ! parameters + type(psb_lsspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_sspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_s_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + + ! local variables + logical :: use_parts, use_v + integer(psb_mpk_) :: np, iam, np_sharing, root, iproc + integer(psb_ipk_) :: err_act, il, inz + integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& + & i, ll, nz, isize, nnr, err + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) + integer(psb_lpk_), allocatable :: irow(:),icol(:) + real(psb_spk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 + real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 + character(len=20) :: name, ch_err + + info = psb_success_ + err = 0 + name = 'psb_s_mat_dist' + call psb_erractionsave(err_act) + + ! executable statements + if (present(inroot)) then + root = inroot + else + root = psb_root_ + end if + call psb_info(ictxt, iam, np) + if (iam == root) then + nrow = a_glob%get_nrows() + ncol = a_glob%get_ncols() + if (nrow /= ncol) then + write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol + info=-1 + call psb_errpush(info,name) + goto 9999 + endif + nnzero = a_glob%get_nzeros() + nrhs = 1 + endif + + use_parts = present(parts) + use_v = present(v) + if (count((/ use_parts, use_v /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" v, parts") + goto 9999 + endif + + ! broadcast informations to other processors + call psb_bcast(ictxt,nrow, root) + call psb_bcast(ictxt,ncol, root) + call psb_bcast(ictxt,nnzero, root) + call psb_bcast(ictxt,nrhs, root) + liwork = max(np, nrow + ncol) + allocate(iwork(liwork), iwrk2(np),stat = info) + if (info /= psb_success_) then + info=psb_err_alloc_request_ + call psb_errpush(info,name,l_err=(/liwork/),a_err='integer') + goto 9999 + endif + if (iam == root) then + write (*, fmt = *) 'start matdist',root, size(iwork),& + &nrow, ncol, nnzero,nrhs + endif + if (use_parts) then + call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) + else if (use_v) then + call psb_cdall(ictxt,desc_a,info,vg=v) + else + info = -1 + end if + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_cdall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + inz = ((nnzero+np-1)/np) + call psb_spall(a,desc_a,info,nnz=inz) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + isize = 3*nb*max(((nnzero+nrow)/nrow),nb) + allocate(val(isize),irow(isize),icol(isize),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + i_count = 1 + + do while (i_count <= nrow) + + if (use_parts) then + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then + iproc = iwork(1) + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + call parts(j_count,nrow,np,iwrk2, np_sharing) + if (np_sharing /= 1 ) exit + if (iwrk2(1) /= iproc ) exit + end do + end if + else + np_sharing = 1 + j_count = i_count + iproc = v(i_count) + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (v(j_count) /= iproc ) exit + end do + end if + + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' + end if + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + + do k_count = 1, np_sharing + iproc = iwork(k_count) + + if (iproc == iam) then + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + else + call psb_snd(ictxt,nnr,iproc) + call psb_snd(ictxt,ll,iproc) + call psb_snd(ictxt,irow(1:ll),iproc) + call psb_snd(ictxt,icol(1:ll),iproc) + call psb_snd(ictxt,val(1:ll),iproc) + call psb_rcv(ictxt,ll,iproc) + endif + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then + call psb_rcv(ictxt,nnr,root) + call psb_rcv(ictxt,ll,root) + if (ll > size(irow)) then + write(psb_err_unit,*) iam,'need to reallocate ',ll + deallocate(val,irow,icol) + allocate(val(ll),irow(ll),icol(ll),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + endif + call psb_rcv(ictxt,irow(1:ll),root) + call psb_rcv(ictxt,icol(1:ll),root) + call psb_rcv(ictxt,val(1:ll),root) + call psb_snd(ictxt,ll,root) + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psspins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + endif + end do + endif + i_count = j_count + end do + + call psb_barrier(ictxt) + t0 = psb_wtime() + call psb_cdasb(desc_a,info) + t1 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_cdasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + call psb_barrier(ictxt) + t2 = psb_wtime() + call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) + t3 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_spasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + if (iam == root) then + write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 + write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 + end if + + + + deallocate(val,irow,icol,iwork,iwrk2,stat=info) + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='deallocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if (iam == root) write (*, fmt = *) 'end matdist' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +end subroutine psb_lsmatdist diff --git a/util/psb_s_mat_dist_mod.f90 b/util/psb_s_mat_dist_mod.f90 index e1f0675b6..3b73e5f8c 100644 --- a/util/psb_s_mat_dist_mod.f90 +++ b/util/psb_s_mat_dist_mod.f90 @@ -31,7 +31,8 @@ ! module psb_s_mat_dist_mod use psb_base_mod, only : psb_ipk_, psb_spk_, psb_desc_type, psb_parts, & - & psb_sspmat_type, psb_s_base_sparse_mat, psb_s_vect_type + & psb_sspmat_type, psb_s_base_sparse_mat, psb_s_vect_type, & + & psb_lsspmat_type interface psb_matdist subroutine psb_smatdist(a_glob, a, ictxt, desc_a,& @@ -91,6 +92,64 @@ module psb_s_mat_dist_mod procedure(psb_parts), optional :: parts integer(psb_ipk_), optional :: v(:) end subroutine psb_smatdist + subroutine psb_lsmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_lsspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_sspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + import :: psb_ipk_, psb_sspmat_type, psb_spk_, psb_desc_type,& + & psb_s_base_sparse_mat, psb_s_vect_type, psb_parts, & + & psb_lsspmat_type + implicit none + + ! parameters + type(psb_lsspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_sspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_s_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + end subroutine psb_lsmatdist end interface end module psb_s_mat_dist_mod diff --git a/util/psb_s_mmio_impl.f90 b/util/psb_s_mmio_impl.f90 index 7f049530b..6fffbd1f1 100644 --- a/util/psb_s_mmio_impl.f90 +++ b/util/psb_s_mmio_impl.f90 @@ -434,3 +434,156 @@ subroutine smm_mat_write(a,mtitle,info,iunit,filename) write(psb_err_unit,*) 'Error while opening ',filename return end subroutine smm_mat_write + +subroutine lsmm_mat_read(a, info, iunit, filename) + use psb_base_mod + implicit none + type(psb_lsspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + character :: mmheader*15, fmt*15, object*10, type*10, sym*15 + character(1024) :: line + integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr + integer(psb_ipk_) :: ircode,infile + type(psb_ls_coo_sparse_mat), allocatable :: acoo + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + infile=5 + else + if (present(iunit)) then + infile=iunit + else + infile=99 + endif + open(infile,file=filename, status='OLD', err=901, action='READ') + endif + else + if (present(iunit)) then + infile=iunit + else + infile=5 + endif + endif + + read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym + + if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then + write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported' + info=909 + return + end if + + do + read(infile,fmt='(a)') line + if (line(1:1) /= '%') exit + end do + read(line,fmt=*) nrow,ncol,nnzero + + allocate(acoo, stat=ircode) + if (ircode /= 0) goto 993 + if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'general')) then + call acoo%allocate(nrow,ncol,nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i) + end do + call acoo%set_nzeros(nnzero) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else if ((psb_tolower(type) == 'real').and.(psb_tolower(sym) == 'symmetric')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),acoo%val(i) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = acoo%val(i) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else + write(psb_err_unit,*) 'read_matrix: matrix type not yet supported' + info=904 + end if + + + if (infile /= 5) close(infile) + return + + ! open failed +901 info=901 + write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input' + return +902 info=902 + write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file ' + return +905 info=905 + write(psb_err_unit,*) 'READ_MATRIX: Error at line',i + return +993 info=993 + write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure' + return +end subroutine lsmm_mat_read + + +subroutine lsmm_mat_write(a,mtitle,info,iunit,filename) + use psb_base_mod + implicit none + type(psb_lsspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + integer(psb_ipk_) :: iout + + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + iout=6 + else + if (present(iunit)) then + iout = iunit + else + iout=99 + endif + open(iout,file=filename, err=901, action='WRITE') + endif + else + if (present(iunit)) then + iout = iunit + else + iout=6 + endif + endif + + call a%print(iout,head=mtitle) + + if (iout /= 6) close(iout) + + + return + +901 continue + info=901 + write(psb_err_unit,*) 'Error while opening ',filename + return +end subroutine lsmm_mat_write diff --git a/util/psb_z_mat_dist_impl.f90 b/util/psb_z_mat_dist_impl.f90 index f894e0620..b2c8be544 100644 --- a/util/psb_z_mat_dist_impl.f90 +++ b/util/psb_z_mat_dist_impl.f90 @@ -338,3 +338,319 @@ subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& return end subroutine psb_zmatdist + + + +subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_zspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_zspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + use psb_base_mod + use psb_mat_mod + implicit none + + ! parameters + type(psb_lzspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_zspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_z_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + + ! local variables + logical :: use_parts, use_v + integer(psb_mpk_) :: np, iam, np_sharing, root, iproc + integer(psb_ipk_) :: err_act, il, inz + integer(psb_lpk_) :: k_count, liwork, nnzero, nrhs,& + & i, ll, nz, isize, nnr, err + integer(psb_lpk_) :: i_count, j_count, nrow, ncol, ig + integer(psb_ipk_), allocatable :: iwork(:), iwrk2(:) + integer(psb_lpk_), allocatable :: irow(:),icol(:) + complex(psb_dpk_), allocatable :: val(:) + integer(psb_ipk_), parameter :: nb=30 + real(psb_dpk_) :: t0, t1, t2, t3, t4, t5 + character(len=20) :: name, ch_err + + info = psb_success_ + err = 0 + name = 'psb_z_mat_dist' + call psb_erractionsave(err_act) + + ! executable statements + if (present(inroot)) then + root = inroot + else + root = psb_root_ + end if + call psb_info(ictxt, iam, np) + if (iam == root) then + nrow = a_glob%get_nrows() + ncol = a_glob%get_ncols() + if (nrow /= ncol) then + write(psb_err_unit,*) 'a rectangular matrix ? ',nrow,ncol + info=-1 + call psb_errpush(info,name) + goto 9999 + endif + nnzero = a_glob%get_nzeros() + nrhs = 1 + endif + + use_parts = present(parts) + use_v = present(v) + if (count((/ use_parts, use_v /)) /= 1) then + info=psb_err_no_optional_arg_ + call psb_errpush(info,name,a_err=" v, parts") + goto 9999 + endif + + ! broadcast informations to other processors + call psb_bcast(ictxt,nrow, root) + call psb_bcast(ictxt,ncol, root) + call psb_bcast(ictxt,nnzero, root) + call psb_bcast(ictxt,nrhs, root) + liwork = max(np, nrow + ncol) + allocate(iwork(liwork), iwrk2(np),stat = info) + if (info /= psb_success_) then + info=psb_err_alloc_request_ + call psb_errpush(info,name,l_err=(/liwork/),a_err='integer') + goto 9999 + endif + if (iam == root) then + write (*, fmt = *) 'start matdist',root, size(iwork),& + &nrow, ncol, nnzero,nrhs + endif + if (use_parts) then + call psb_cdall(ictxt,desc_a,info,mg=nrow,parts=parts) + else if (use_v) then + call psb_cdall(ictxt,desc_a,info,vg=v) + else + info = -1 + end if + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_cdall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + inz = ((nnzero+np-1)/np) + call psb_spall(a,desc_a,info,nnz=inz) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spall' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + isize = 3*nb*max(((nnzero+nrow)/nrow),nb) + allocate(val(isize),irow(isize),icol(isize),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + i_count = 1 + + do while (i_count <= nrow) + + if (use_parts) then + call parts(i_count,nrow,np,iwork, np_sharing) + ! + ! np_sharing allows for overlap in the data distribution. + ! If an index is overlapped, then we have to send its row + ! to multiple processes. NOTE: we are assuming the output + ! from PARTS is coherent, otherwise a deadlock is the most + ! likely outcome. + ! + j_count = i_count + if (np_sharing == 1) then + iproc = iwork(1) + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + call parts(j_count,nrow,np,iwrk2, np_sharing) + if (np_sharing /= 1 ) exit + if (iwrk2(1) /= iproc ) exit + end do + end if + else + np_sharing = 1 + j_count = i_count + iproc = v(i_count) + iwork(1:np_sharing) = iproc + do + j_count = j_count + 1 + if (j_count-i_count >= nb) exit + if (j_count > nrow) exit + if (v(j_count) /= iproc ) exit + end do + end if + + ! now we should insert rows i_count..j_count-1 + nnr = j_count - i_count + + if (iam == root) then + + ll = 0 + do i= i_count, j_count-1 + call a_glob%csget(i,i,nz,& + & irow,icol,val,info,nzin=ll,append=.true.) + if (info /= psb_success_) then + if (nz >min(size(irow(ll+1:)),size(icol(ll+1:)),size(val(ll+1:)))) then + write(psb_err_unit,*) 'Allocation failure? This should not happen!' + end if + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + ll = ll + nz + end do + + do k_count = 1, np_sharing + iproc = iwork(k_count) + + if (iproc == iam) then + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psb_spins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + else + call psb_snd(ictxt,nnr,iproc) + call psb_snd(ictxt,ll,iproc) + call psb_snd(ictxt,irow(1:ll),iproc) + call psb_snd(ictxt,icol(1:ll),iproc) + call psb_snd(ictxt,val(1:ll),iproc) + call psb_rcv(ictxt,ll,iproc) + endif + end do + else if (iam /= root) then + + do k_count = 1, np_sharing + iproc = iwork(k_count) + if (iproc == iam) then + call psb_rcv(ictxt,nnr,root) + call psb_rcv(ictxt,ll,root) + if (ll > size(irow)) then + write(psb_err_unit,*) iam,'need to reallocate ',ll + deallocate(val,irow,icol) + allocate(val(ll),irow(ll),icol(ll),stat=info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='Allocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + endif + call psb_rcv(ictxt,irow(1:ll),root) + call psb_rcv(ictxt,icol(1:ll),root) + call psb_rcv(ictxt,val(1:ll),root) + call psb_snd(ictxt,ll,root) + il = ll + call psb_spins(il,irow,icol,val,a,desc_a,info) + if(info /= psb_success_) then + info=psb_err_from_subroutine_ + ch_err='psspins' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + endif + end do + endif + i_count = j_count + end do + + call psb_barrier(ictxt) + t0 = psb_wtime() + call psb_cdasb(desc_a,info) + t1 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_cdasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + call psb_barrier(ictxt) + t2 = psb_wtime() + call psb_spasb(a,desc_a,info,dupl=psb_dupl_err_,afmt=fmt,mold=mold) + t3 = psb_wtime() + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='psb_spasb' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + + if (iam == root) then + write(psb_out_unit,*) 'descriptor assembly: ',t1-t0 + write(psb_out_unit,*) 'sparse matrix assembly: ',t3-t2 + end if + + + + deallocate(val,irow,icol,iwork,iwrk2,stat=info) + if(info /= psb_success_)then + info=psb_err_from_subroutine_ + ch_err='deallocate' + call psb_errpush(info,name,a_err=ch_err) + goto 9999 + end if + + if (iam == root) write (*, fmt = *) 'end matdist' + + call psb_erractionrestore(err_act) + return + +9999 call psb_error_handler(ictxt,err_act) + + return + +end subroutine psb_lzmatdist diff --git a/util/psb_z_mat_dist_mod.f90 b/util/psb_z_mat_dist_mod.f90 index 978e22b29..7d7691013 100644 --- a/util/psb_z_mat_dist_mod.f90 +++ b/util/psb_z_mat_dist_mod.f90 @@ -31,7 +31,8 @@ ! module psb_z_mat_dist_mod use psb_base_mod, only : psb_ipk_, psb_dpk_, psb_desc_type, psb_parts, & - & psb_zspmat_type, psb_z_base_sparse_mat, psb_z_vect_type + & psb_zspmat_type, psb_z_base_sparse_mat, psb_z_vect_type, & + & psb_lzspmat_type interface psb_matdist subroutine psb_zmatdist(a_glob, a, ictxt, desc_a,& @@ -91,6 +92,64 @@ module psb_z_mat_dist_mod procedure(psb_parts), optional :: parts integer(psb_ipk_), optional :: v(:) end subroutine psb_zmatdist + subroutine psb_lzmatdist(a_glob, a, ictxt, desc_a,& + & info, parts, v, inroot,fmt,mold) + ! + ! an utility subroutine to distribute a matrix among processors + ! according to a user defined data distribution, using + ! sparse matrix subroutines. + ! + ! type(psb_lzspmat) :: a_glob + ! on entry: this contains the global sparse matrix as follows: + ! + ! type(psb_zspmat_type) :: a + ! on exit : this will contain the local sparse matrix. + ! + ! interface parts + ! ! .....user passed subroutine..... + ! subroutine parts(global_indx,n,np,pv,nv) + ! implicit none + ! integer(psb_ipk_), intent(in) :: global_indx, n, np + ! integer(psb_ipk_), intent(out) :: nv + ! integer(psb_ipk_), intent(out) :: pv(*) + ! + ! end subroutine parts + ! end interface + ! on entry: subroutine providing user defined data distribution. + ! for each global_indx the subroutine should return + ! the list pv of all processes owning the row with + ! that index; the list will contain nv entries. + ! usually nv=1; if nv >1 then we have an overlap in the data + ! distribution. + ! + ! integer(psb_ipk_) :: ictxt + ! on entry: the PSBLAS parallel environment context. + ! + ! type (desc_type) :: desc_a + ! on exit : the updated array descriptor + ! + ! + ! integer(psb_ipk_), optional :: inroot + ! on entry: specifies processor holding a_glob. default: 0 + ! on exit : unchanged. + ! + import :: psb_ipk_, psb_zspmat_type, psb_dpk_, psb_desc_type,& + & psb_z_base_sparse_mat, psb_z_vect_type, psb_parts, & + & psb_lzspmat_type + implicit none + + ! parameters + type(psb_lzspmat_type) :: a_glob + integer(psb_ipk_) :: ictxt + type(psb_zspmat_type) :: a + type(psb_desc_type) :: desc_a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional :: inroot + character(len=*), optional :: fmt + class(psb_z_base_sparse_mat), optional :: mold + procedure(psb_parts), optional :: parts + integer(psb_ipk_), optional :: v(:) + end subroutine psb_lzmatdist end interface end module psb_z_mat_dist_mod diff --git a/util/psb_z_mmio_impl.f90 b/util/psb_z_mmio_impl.f90 index 8475d2847..5b348acca 100644 --- a/util/psb_z_mmio_impl.f90 +++ b/util/psb_z_mmio_impl.f90 @@ -462,3 +462,178 @@ subroutine zmm_mat_write(a,mtitle,info,iunit,filename) end subroutine zmm_mat_write +subroutine lzmm_mat_read(a, info, iunit, filename) + use psb_base_mod + implicit none + type(psb_lzspmat_type), intent(out) :: a + integer(psb_ipk_), intent(out) :: info + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + character :: mmheader*15, fmt*15, object*10, type*10, sym*15 + character(1024) :: line + integer(psb_lpk_) :: nrow, ncol, nnzero, i,nzr + integer(psb_ipk_) :: ircode,infile + type(psb_lz_coo_sparse_mat), allocatable :: acoo + real(psb_dpk_) :: are, aim + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + infile=5 + else + if (present(iunit)) then + infile=iunit + else + infile=99 + endif + open(infile,file=filename, status='OLD', err=901, action='READ') + endif + else + if (present(iunit)) then + infile=iunit + else + infile=5 + endif + endif + + read(infile,fmt=*,end=902) mmheader, object, fmt, type, sym + + if ( (psb_tolower(object) /= 'matrix').or.(psb_tolower(fmt) /= 'coordinate')) then + write(psb_err_unit,*) 'READ_MATRIX: input file type not yet supported' + info=909 + return + end if + + do + read(infile,fmt='(a)') line + if (line(1:1) /= '%') exit + end do + read(line,fmt=*) nrow,ncol,nnzero + + allocate(acoo, stat=ircode) + if (ircode /= 0) goto 993 + if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'general')) then + call acoo%allocate(nrow,ncol,nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_dpk_) + end do + call acoo%set_nzeros(nnzero) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'symmetric')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902,err=905) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_dpk_) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = acoo%val(i) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else if ((psb_tolower(type) == 'complex').and.(psb_tolower(sym) == 'hermitian')) then + ! we are generally working with non-symmetric matrices, so + ! we de-symmetrize what we are about to read + call acoo%allocate(nrow,ncol,2*nnzero) + do i=1,nnzero + read(infile,fmt=*,end=902) acoo%ia(i),acoo%ja(i),are,aim + acoo%val(i) = cmplx(are,aim,kind=psb_dpk_) + end do + nzr = nnzero + do i=1,nnzero + if (acoo%ia(i) /= acoo%ja(i)) then + nzr = nzr + 1 + acoo%val(nzr) = conjg(acoo%val(i)) + acoo%ia(nzr) = acoo%ja(i) + acoo%ja(nzr) = acoo%ia(i) + end if + end do + call acoo%set_nzeros(nzr) + call acoo%fix(info) + + call a%mv_from(acoo) + call a%cscnv(ircode,type='csr') + + else + write(psb_err_unit,*) 'read_matrix: matrix type not yet supported' + info=904 + end if + if (infile /= 5) close(infile) + return + + ! open failed +901 info=901 + write(psb_err_unit,*) 'read_matrix: could not open file ',filename,' for input' + return +902 info=902 + write(psb_err_unit,*) 'READ_MATRIX: Unexpected end of file ' + return +905 info=905 + write(psb_err_unit,*) 'READ_MATRIX: Error at line',i + return +993 info=993 + write(psb_err_unit,*) 'READ_MATRIX: Memory allocation failure' + return +end subroutine lzmm_mat_read + +subroutine lzmm_mat_write(a,mtitle,info,iunit,filename) + use psb_base_mod + implicit none + type(psb_lzspmat_type), intent(in) :: a + integer(psb_ipk_), intent(out) :: info + character(len=*), intent(in) :: mtitle + integer(psb_ipk_), optional, intent(in) :: iunit + character(len=*), optional, intent(in) :: filename + integer(psb_ipk_) :: iout + + + info = psb_success_ + + if (present(filename)) then + if (filename == '-') then + iout=6 + else + if (present(iunit)) then + iout = iunit + else + iout=99 + endif + open(iout,file=filename, err=901, action='WRITE') + endif + else + if (present(iunit)) then + iout = iunit + else + iout=6 + endif + endif + + call a%print(iout,head=mtitle) + + if (iout /= 6) close(iout) + + + return + +901 continue + info=901 + write(psb_err_unit,*) 'Error while opening ',filename + return +end subroutine lzmm_mat_write +