Fixed various small mistakes in srcht.c. Added a tree argument to the

search tree, so as to enable multiple trees at the same time.
psblas3-type-indexed
Salvatore Filippone 19 years ago
parent b35b455e72
commit ae8964a56a

@ -85,7 +85,7 @@
TOPFILE = userguide.tex
SECFILE = title.tex intro.tex commrout.tex datastruct.tex psbrout.tex toolsrout.tex\
methods.tex precs.tex error.tex
methods.tex precs.tex penv.tex error.tex
FIGDIR = figures
XPDFFLAGS =

@ -1,3 +1,5 @@
\section{Error handling}
The PSBLAS library error handling policy has been completely rewritten

@ -77,7 +77,7 @@ to improve efficiency or to implement communication patterns for which
the BLACS package doesn't provide any method.
In any case we provide wrappers around the BLACS routines so that the
user does not need to delve into their details (see Sec.~\ref{sec:toolsrout}).
user does not need to delve into their details (see Sec.~\ref{sec:parenv}).
%% We assume that the user program has initialized a BLACS process grid
%% with one column and as many rows as there are processes; the PSBLAS
%% initialization routines will take the communication context for this

@ -1,149 +1,6 @@
\section{Data management, environment handling and auxiliary
communication routines}
\section{Data management routines}
\label{sec:toolsrout}
\subroutine{psb\_init}{Initializes PSBLAS parallel environment}
\syntax{call psb\_init}{ictxt, np}
This subroutine initializes the PSBLAS parallel environment, defining
a virtual parallel machine.
\begin{description}
\item[\bf On Entry ]
\item[np] Number of processes in the PSBLAS virtual parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf optional}.\\
Specified as: an integer value. \
Default: use all available processes provided by the underlying
parallel environment.
\end{description}
\begin{description}
\item[\bf On Return]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\end{description}
\section*{Notes}
\begin{enumerate}
\item A call to this routine must precede any other PSBLAS call.
\item It is an error to specify a value for $np$ greater than the
number of processes available in the underlying parallel execution
environment.
\end{enumerate}
\subroutine{psb\_info}{Return information about PSBLAS parallel environment}
\syntax{call psb\_info}{ictxt, iam, np}
This subroutine returns informantion about the PSBLAS parallel environment, defining
a virtual parallel machine.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\end{description}
\begin{description}
\item[\bf On Return]
\item[iam] Identifier of current process in the PSBLAS virtual parallel machine.\\
Scope:{\bf local}.\\
Type:{\bf required}.\\
Specified as: an integer value. $-1 \le iam \le np-1$\
\item[np] Number of processes in the PSBLAS virtual parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable. \
\end{description}
\section*{Notes}
\begin{enumerate}
\item For processes in the virtual parallel machine the identifier
will satisfy $0 \le iam \le np-1$;
\item If the user has requested on \verb|psb_init| a number of
processes less than the total available in the parallel execution
environment, the remaining processes will have on return $iam=-1$;
any such process may only place a call to \verb|psb_exit|, and is
required to do so.
\end{enumerate}
\subroutine{psb\_exit}{Exit from PSBLAS parallel environment}
\syntax{call psb\_exit}{ictxt}
This subroutine exits from the PSBLAS parallel virtual machine.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\end{description}
\section*{Notes}
\begin{enumerate}
\item This routine may be called even if a previous call to
\verb|psb_info| has returned with $iam=-1$; indeed, it it is the ONLY
routine that may be called in this situation, and it is required to
do so.
\end{enumerate}
\subroutine{psb\_wtime}{Wall clock timing}
\syntax{time = psb\_wtime}{}
This function returns a wall clock timer. The resolution of the timer
is dependent on the underlying parallel environment implementation.
\begin{description}
\item[\bf On Exit ]
\item[Function value] the elapsed time in seconds.\\
Returned as: a \verb|real(kind(1.d0))| integer variable.
\end{description}
\subroutine{psb\_barrier}{Sinchronization point parallel environment}
\syntax{call psb\_barrier}{ictxt}
This subroutine acts as a synchronization point for the PSBLAS
parallel virtual machine. As such, it must be called by all
participating processes.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\end{description}
\subroutine{psb\_abort}{Abort a computation}
\syntax{call psb\_abort}{ictxt}
This subroutine aborts computation on the parallel virtual machine.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\end{description}
%
%% psb_cdall %%
@ -1004,230 +861,6 @@ Specified as: an integer variable.
\end{description}
\subroutine{psb\_bcast}{Broadcast data}
\syntax{call psb\_bcast}{ictxt, dat, root}
This subroutine implements a broadcast operation based on the
underlying communication library.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[dat] On the root process, the data to be broadcast.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\item[root] Root process holding data to be broadcast.\\
Scope:{\bf global}.\\
Type:{\bf optional}.\\
Specified as: an integer value $0<= root <= np-1$, default 0 \
\end{description}
\begin{description}
\item[\bf On Return]
\item[dat] On processes other than root, the data to be broadcast.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\end{description}
\subroutine{psb\_sum}{Global sum}
\syntax{call psb\_sum}{ictxt, dat, root}
This subroutine implements a sum reduction operation based on the
underlying communication library.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[dat] The local contribution to the global sum.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\item[root] Process to hold the final sum, or $-1$ to make it available
on all processes.\\
Scope:{\bf global}.\\
Type:{\bf optional}.\\
Specified as: an integer value $-1<= root <= np-1$, default -1. \
\end{description}
\begin{description}
\item[\bf On Return]
\item[dat] On destination process(es), the result of the sum operation.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \\
Type, rank and size must agree on all processes.
\end{description}
\subroutine{psb\_amx}{Global maximum absolute value}
\syntax{call psb\_amx}{ictxt, dat, root}
This subroutine implements a maximum absolute value reduction
operation based on the underlying communication library.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[dat] The local contribution to the global maximum.\\
Scope:{\bf local}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\item[root] Process to hold the final sum, or $-1$ to make it available
on all processes.\\
Scope:{\bf global}.\\
Type:{\bf optional}.\\
Specified as: an integer value $-1<= root <= np-1$, default -1. \\
\end{description}
\begin{description}
\item[\bf On Return]
\item[dat] On destination process(es), the result of the maximum operation.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\end{description}
\subroutine{psb\_amn}{Global minimum absolute value}
\syntax{call psb\_amn}{ictxt, dat, root}
This subroutine implements a minimum absolute value reduction
operation based on the underlying communication library.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[dat] The local contribution to the global minimum.\\
Scope:{\bf local}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type, rank and size must agree on all processes.
\item[root] Process to hold the final sum, or $-1$ to make it available
on all processes.\\
Scope:{\bf global}.\\
Type:{\bf optional}.\\
Specified as: an integer value $-1<= root <= np-1$, default -1. \\
\end{description}
\begin{description}
\item[\bf On Return]
\item[dat] On destination process(es), the result of the minimum operation.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \\
Type, rank and size must agree on all processes.
\end{description}
\subroutine{psb\_snd}{Send data}
\syntax{call psb\_snd}{ictxt, dat, dst, m}
This subroutine sends a packet of data to a destination.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[dat] The data to be sent.\\
Scope:{\bf local}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type and rank must agree on sender and receiver process; if $m$ is
not specified, size must agree as well.
\item[dst] Destination process.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer value $0<= dst <= np-1$. \\
\item[m] Number of rows.\\
Scope:{\bf global}.\\
Type:{\bf Optional}.\\
Specified as: an integer value $0<= m <= size(dat,1)$. \\
When $dat$ is a rank 2 array, specifies the number of rows to be sent
independently of the leading dimension $size(dat,1)$; must have the
same value on sending and receiving processes.
\end{description}
\begin{description}
\item[\bf On Return]
\end{description}
\subroutine{psb\_rcv}{Receive data}
\syntax{call psb\_rcv}{ictxt, dat, src, m}
This subroutine receives a packet of data to a destination.
\begin{description}
\item[\bf On Entry ]
\item[icontxt] the communication context identifying the virtual
parallel machine.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer variable.
\item[src] Source process.\\
Scope:{\bf global}.\\
Type:{\bf required}.\\
Specified as: an integer value $0<= src <= np-1$. \\
\item[m] Number of rows.\\
Scope:{\bf global}.\\
Type:{\bf Optional}.\\
Specified as: an integer value $0<= m <= size(dat,1)$. \\
When $dat$ is a rank 2 array, specifies the number of rows to be sent
independently of the leading dimension $size(dat,1)$; must have the
same value on sending and receiving processes.
\end{description}
\begin{description}
\item[\bf On Return]
\item[dat] The data to be received.\\
Scope:{\bf local}.\\
Type:{\bf required}.\\
Specified as: an integer, real or complex variable, which may be a
scalar, or a rank 1 or 2 array. \
Type and rank must agree on sender and receiver process; if $m$ is
not specified, size must agree as well.
\end{description}

@ -90,6 +90,7 @@
\include{toolsrout}
\include{methods}
\include{precs}
\include{penv}
\include{error}

File diff suppressed because one or more lines are too long

@ -99,8 +99,8 @@
/* */
/* int AVLTreeInsert(AVLTreePtr Tree, void *key, */
/* int (*comp)(void*,void*), */
/* void (*update)(void*,void*)) */
/* */
/* void (*update)(void*,void*,void*), */
/* void *data) */
/* Purpose: Insert an item into an existing (possibly */
/* empty) tree. */
/* */
@ -308,9 +308,14 @@ AVLNodePtr AVLTreeSearch(AVLTreePtr Tree, void *key,
int icmp;
if (Tree==NULL) return(NULL);
current = Tree->root;
#ifdef PROFILE
Tree->nsteps=0;
#endif
while (current != NULL) {
icmp = (*comp)(key,current->key);
#ifdef PROFILE
Tree->nsteps +=1;
#endif
if (icmp<0) {
current = current->llink;
} else if (icmp==0){
@ -517,7 +522,7 @@ AVLNodePtr GetAVLNode(AVLTreePtr Tree)
}
int AVLTreeInsert(AVLTreePtr Tree, void *key,int (*comp)(void *, void *),
void (*update)(void *, void *))
void (*update)(void *, void *, void *), void *data)
{
AVLNodePtr root, t, s, p, q, r;
int search, bal, icmp;
@ -558,7 +563,7 @@ int AVLTreeInsert(AVLTreePtr Tree, void *key,int (*comp)(void *, void *),
}
}
} else if (icmp == 0) {
(*update)(key,p->key);
(*update)(key,p->key,data);
return(1);
} else {
if ((q=p->rlink)==NULL) {

@ -46,6 +46,9 @@ typedef struct avltree {
AVLTVectPtr first, current;
AVLNodePtr root;
int nnodes;
#ifdef PROFILE
int nsteps;
#endif
} AVLTree;
@ -55,7 +58,7 @@ int AVLTreeInit(AVLTreePtr);
int AVLTreeReInit(AVLTreePtr);
AVLTreePtr GetAVLTree();
int AVLTreeInsert(AVLTreePtr, void *, int (*)(void *, void *),
void (*)(void *, void *));
void (*)(void *, void *, void *), void *);
AVLNodePtr AVLTreeUserInsert(AVLTreePtr, void *, int (*)(void *, void *));
void AVLTreeInorderTraverse(AVLTreePtr, void (*)(void *, void *), void *);
void AVLTreePreorderTraverse(AVLTreePtr, void (*)(void *, void *), void *);

@ -39,6 +39,7 @@ subroutine psi_crea_ovr_elem(desc_overlap,ovr_elem)
! ...local scalars...
integer :: i,pnt_new_elem,ret,j, info
integer :: dim_ovr_elem
integer :: pairtree(2)
! ...external function...
integer :: psi_exist_ovr_elem,dim
@ -49,7 +50,7 @@ subroutine psi_crea_ovr_elem(desc_overlap,ovr_elem)
dim_ovr_elem=size(ovr_elem)
i=1
pnt_new_elem=1
if (usetree) call initpairsearchtree(info)
if (usetree) call initpairsearchtree(pairtree,info)
do while (desc_overlap(i).ne.-1)
! ...loop over all procs of desc_overlap list....
@ -57,7 +58,7 @@ subroutine psi_crea_ovr_elem(desc_overlap,ovr_elem)
do j=1,desc_overlap(i)
! ....loop over all overlap indices referred to act proc.....
if (usetree) then
call searchinskeyval(desc_overlap(i+j),pnt_new_elem,&
call searchinskeyval(pairtree,desc_overlap(i+j),pnt_new_elem,&
& ret,info)
if (ret == pnt_new_elem) ret=-1
else
@ -89,6 +90,6 @@ subroutine psi_crea_ovr_elem(desc_overlap,ovr_elem)
! ...add -1 at the end of output list......
ovr_elem(pnt_new_elem)=-1
if (usetree) call freepairsearchtree()
if (usetree) call freepairsearchtree(pairtree)
end subroutine psi_crea_ovr_elem

@ -82,21 +82,27 @@
#include "avltree.h"
#define POOLSIZE 4096
#define CACHESIZE 16
#ifdef Add_
#define InitPairSearchTree initpairsearchtree_
#define FreePairSearchTree freepairsearchtree_
#define SearchInsKeyVal searchinskeyval_
#define SearchKeyVal searchkeyval_
#define NPairs npairs_
#endif
#ifdef AddDouble_
#define InitPairSearchTree initpairsearchtree_
#define FreePairSearchTree freepairsearchtree_
#define SearchInsKeyVal searchinskeyval_
#define SearchKeyVal searchkeyval_
#define NPairs npairs_
#endif
#ifdef NoChange
#define InitPairSearchTree initpairsearchtree
#define FreePairSearchTree freepairsearchtree
#define SearchInsKeyVal searchinskeyval
#define SearchKeyVal searchkeyval
#define NPairs npairs
#endif
@ -115,9 +121,27 @@ typedef struct pairvect {
} PairVect;
static int retval;
static PairVectPtr PairPoolRoot=NULL,PairPoolCrt=NULL;
static AVLTreePtr tree=NULL;
typedef struct pairtree *PairTreePtr;
typedef struct pairtree {
int retval;
int cache[2][CACHESIZE], cpnt;
PairVectPtr PairPoolRoot,PairPoolCrt;
AVLTreePtr tree;
} PairTree;
#ifdef LargeFptr
typedef long long fptr; /* 32-bit by default */
#else
typedef int fptr; /* 32-bit by default */
#endif
/* fptr *f_factors, */
/* *f_factors = (fptr) LUfactors; */
/* static int retval; */
/* static PairVectPtr PairPoolRoot=NULL,PairPoolCrt=NULL; */
/* static AVLTreePtr tree=NULL; */
int CompareKeys(void *key1, void *key2)
{
@ -130,45 +154,69 @@ int CompareKeys(void *key1, void *key2)
}
}
void InitPairSearchTree(int *iret)
void InitPairSearchTree(fptr *ftree, int *iret)
{
int i;
*iret = 0;
PairTreePtr PTree;
if ((tree = GetAVLTree())==NULL) {
if ((PTree = malloc(sizeof(PairTree)))==NULL) {
*iret=-1; return;
}
PTree->retval=0;
for (i=0; i<CACHESIZE; i++) {
PTree->cache[0][i]=PTree->cache[1][i] = -1;
}
PTree->cpnt=0;
if ((PTree->tree = GetAVLTree())==NULL) {
*iret=-1; return;
}
if ((PairPoolRoot=(PairVectPtr)malloc(sizeof(PairVect)))==NULL) {
*iret=-3;
if ((PTree->PairPoolRoot=(PairVectPtr)malloc(sizeof(PairVect)))==NULL) {
*iret=-3; return;
} else {
PairPoolRoot->avail=0;
PairPoolRoot->previous=PairPoolRoot->next=NULL;
PairPoolCrt=PairPoolRoot;
PTree->PairPoolRoot->avail=0;
PTree->PairPoolRoot->previous=PTree->PairPoolRoot->next=NULL;
PTree->PairPoolCrt=PTree->PairPoolRoot;
}
*ftree = (fptr) PTree;
return;
}
void KeyUpdate( void *key1, void *key2)
int NPairs(fptr *ftree)
{
retval=((KeyPairPtr) key2)->val;
PairTreePtr PTree;
PTree = (PairTreePtr) *ftree;
return(HowManyItems(PTree->tree));
}
void KeyUpdate( void *key1, void *key2, void *data)
{
*((int *) data)=((KeyPairPtr) key2)->val;
}
void FreePairSearchTree()
void FreePairSearchTree(fptr *ftree)
{
PairTreePtr PTree;
PairVectPtr current,next;
AVLTreeFree(tree,NULL);
PTree = (PairTreePtr) *ftree;
AVLTreeFree(PTree->tree,NULL);
current=PairPoolRoot;
current=PTree->PairPoolRoot;
while (current != NULL) {
next=current->next;
free(current);
current=next;
}
free(tree);
tree = NULL;
free(PTree->tree);
free(PTree);
*ftree = (fptr) NULL;
return;
}
@ -228,21 +276,81 @@ KeyPairPtr GetKeyPair(PairVectPtr *current)
/* -3 Memory allocation failure */
/* */
void SearchInsKeyVal(int *key, int *val, int *res, int *iret)
void SearchInsKeyVal(fptr *ftree, int *key, int *val, int *res, int *iret)
{
KeyPairPtr node; int info;
PairTreePtr PTree;
KeyPairPtr node;
int info,i;
node = GetKeyPair(&PairPoolCrt);
PTree = (PairTreePtr) *ftree;
node = GetKeyPair(&(PTree->PairPoolCrt));
node->key = *key;
node->val = *val;
if ((i=PTree->cpnt)<CACHESIZE) {
PTree->cache[0][i] = *key;
PTree->cache[1][i] = *val;
PTree->cpnt=i+1;
}
info = AVLTreeInsert(tree,node,CompareKeys,KeyUpdate);
info = AVLTreeInsert(PTree->tree,node,CompareKeys,KeyUpdate,&(PTree->retval));
*iret = info;
if (info==0) {
*res = node->val;
AdvanceKeyPair(PairPoolCrt);
AdvanceKeyPair(PTree->PairPoolCrt);
} else if (info == 1) {
*res = retval;
*res = PTree->retval;
}
return;
}
void SearchKeyVal(fptr *ftree, int *key, int *res, int *iret)
{
PairTreePtr PTree;
KeyPair node;
AVLNodePtr noderes;
KeyPairPtr result;
int i,sv[2];
int info;
*iret = 0;
PTree = (PairTreePtr) *ftree;
#if 0
for (i=0; i<CACHESIZE; i++) {
if (PTree->cache[0][i] == *key) {
*res=PTree->cache[1][i];
sv[0]=PTree->cache[0][i];
sv[1]=PTree->cache[1][i];
PTree->cache[0][i]=PTree->cache[0][0];
PTree->cache[1][i]=PTree->cache[1][0];
PTree->cache[0][0]=sv[0];
PTree->cache[1][0]=sv[1];
return;
}
}
#endif
node.key=*key;
if ((noderes = AVLTreeSearch(PTree->tree,&node,CompareKeys))==NULL) {
*res = -1;
*iret = -1;
} else {
result = (KeyPairPtr) noderes->key;
*res = result->val;
#if 0
for (i=CACHESIZE-1; i>0; i--) {
PTree->cache[0][i]=PTree->cache[0][i-1];
PTree->cache[0][i]=PTree->cache[1][i-1];
}
PTree->cache[0][0]=*key;
PTree->cache[1][0]=result->val;
#endif
}
#ifdef PROFILE
*iret = 1;
*iret = PTree->tree->nsteps;
#endif
return;
}

@ -2,13 +2,16 @@ include ../../Make.inc
MODULES = psb_realloc_mod.o psb_string_mod.o psb_spmat_type.o \
psb_desc_type.o psb_spsb_mod.o \
psb_penv_mod.o psb_serial_mod.o psb_tools_mod.o \
psb_serial_mod.o psb_tools_mod.o \
psb_prec_type.o psb_error_mod.o psb_prec_mod.o \
psb_methd_mod.o psb_const_mod.o \
psb_comm_mod.o psb_psblas_mod.o psi_mod.o \
psb_sparse_mod.o psb_check_mod.o psb_all_mod.o $(EXTRA_BLACS_ENV_OBJS)
psb_check_mod.o $(EXTRA_BLACS_ENV_OBJS)
OBJS = error.o
MPFOBJS = psb_penv_mod.o
OBJS = error.o psb_sparse_mod.o psb_all_mod.o
INCDIRS = -I ../../lib
LIBDIR = ../../lib
@ -19,12 +22,15 @@ psb_error_mod.o: psb_const_mod.o
psb_const_mod.f90: psb_const.fh
psb_penv_mod.o : psb_const_mod.o psb_error_mod.o
lib: $(MODULES) $(OBJS)
$(AR) $(LIBDIR)/$(LIBNAME) $(MODULES) $(OBJS)
lib: $(MODULES) mpfobjs $(OBJS)
$(AR) $(LIBDIR)/$(LIBNAME) $(MODULES) $(OBJS) $(MPFOBJS)
$(RANLIB) $(LIBDIR)/$(LIBNAME)
cp *$(.mod) ./psb_const.fh ./parts.fh ../../lib
mpfobjs:
(make $(MPFOBJS) F90="$(MPF90)" FC="$(MPF90)" FCOPT="$(F90COPT)")
clean:
/bin/rm -f $(MODULES) $(OBJS) *$(.mod)

@ -58,20 +58,23 @@ module psb_penv_mod
interface psb_bcast
module procedure psb_ibcasts, psb_ibcastv, psb_ibcastm,&
& psb_dbcasts, psb_dbcastv, psb_dbcastm,&
& psb_zbcasts, psb_zbcastv, psb_zbcastm
& psb_zbcasts, psb_zbcastv, psb_zbcastm,&
& psb_hbcasts, psb_lbcasts
end interface
interface psb_snd
module procedure psb_isnds, psb_isndv, psb_isndm,&
& psb_dsnds, psb_dsndv, psb_dsndm,&
& psb_zsnds, psb_zsndv, psb_zsndm
& psb_zsnds, psb_zsndv, psb_zsndm,&
& psb_hsnds, psb_lsnds
end interface
interface psb_rcv
module procedure psb_ircvs, psb_ircvv, psb_ircvm,&
& psb_drcvs, psb_drcvv, psb_drcvm,&
& psb_zrcvs, psb_zrcvv, psb_zrcvm
& psb_zrcvs, psb_zrcvv, psb_zrcvm,&
& psb_hrcvs, psb_lrcvs
end interface
@ -184,7 +187,6 @@ contains
if ((myprow >=0).and.(mypcol>=0)) then
call blacs_gridexit(ictxt)
end if
call blacs_exit(0)
end subroutine psb_exit
@ -428,6 +430,53 @@ contains
end subroutine psb_zbcastm
subroutine psb_hbcasts(ictxt,dat,root,length)
use mpi
integer, intent(in) :: ictxt
character(len=*), intent(inout) :: dat
integer, intent(in), optional :: root,length
integer :: iam, np, root_,icomm,length_,info
if (present(root)) then
root_ = root
else
root_ = 0
endif
if (present(length)) then
length_ = length
else
length_ = len(dat)
endif
call psb_info(ictxt,iam,np)
call psb_get_mpicomm(ictxt,icomm)
call mpi_bcast(dat,length_,MPI_CHARACTER,root_,icomm,info)
end subroutine psb_hbcasts
subroutine psb_lbcasts(ictxt,dat,root)
use mpi
integer, intent(in) :: ictxt
logical, intent(inout) :: dat
integer, intent(in), optional :: root
integer :: iam, np, root_,icomm,info
if (present(root)) then
root_ = root
else
root_ = 0
endif
call psb_info(ictxt,iam,np)
call psb_get_mpicomm(ictxt,icomm)
call mpi_bcast(dat,1,MPI_LOGICAL,root_,icomm,info)
end subroutine psb_lbcasts
subroutine psb_iamxs(ictxt,dat,root,ia)
integer, intent(in) :: ictxt
integer, intent(inout) :: dat
@ -978,6 +1027,77 @@ contains
subroutine psb_hsnds(ictxt,dat,dst,length)
integer, intent(in) :: ictxt
character(len=*), intent(in) :: dat
integer, intent(in) :: dst
integer, intent(in), optional :: length
integer, allocatable :: buffer(:)
integer :: length_, i
if (present(length)) then
length_ = length
else
length_ = len(dat)
endif
allocate(buffer(length_))
do i=1,length_
buffer(i) = iachar(dat(i:i))
end do
call gesd2d(ictxt,buffer,dst,0)
end subroutine psb_hsnds
subroutine psb_hrcvs(ictxt,dat,src,length)
integer, intent(in) :: ictxt
character(len=*), intent(out) :: dat
integer, intent(in) :: src
integer, intent(in), optional :: length
integer, allocatable :: buffer(:)
integer :: length_, i
if (present(length)) then
length_ = length
else
length_ = len(dat)
endif
allocate(buffer(length_))
call gerv2d(ictxt,buffer,src,0)
do i=1,length_
dat(i:i) = achar(buffer(i))
end do
end subroutine psb_hrcvs
subroutine psb_lsnds(ictxt,dat,dst,length)
integer, intent(in) :: ictxt
logical, intent(in) :: dat
integer, intent(in) :: dst
integer :: i
if (dat) then
i = 1
else
i = 0
endif
call gesd2d(ictxt,i,dst,0)
end subroutine psb_lsnds
subroutine psb_lrcvs(ictxt,dat,src,length)
integer, intent(in) :: ictxt
logical, intent(out) :: dat
integer, intent(in) :: src
integer :: i
call gerv2d(ictxt,i,src,0)
dat = (i == 1)
end subroutine psb_lrcvs
subroutine psb_isnds(ictxt,dat,dst)
integer, intent(in) :: ictxt
@ -1159,14 +1279,6 @@ contains
!
!
!
!
!
!
!
subroutine igebs2ds(ictxt,scope,dat,top)
integer, intent(in) :: ictxt,dat

@ -81,7 +81,7 @@ Subroutine psb_dcdovrbld(n_ovr,desc_p,desc_a,a,&
Integer,Pointer :: halo(:),length_dl(:),works(:),workr(:),t_halo_in(:),&
& t_halo_out(:),work(:),dep_list(:),temp(:)
Integer,Pointer :: brvindx(:),rvsz(:), bsdindx(:),sdsz(:)
integer :: pairtree(2)
Logical,Parameter :: debug=.false.
real(kind(1.d0)) :: t1,t2,t3,t4,t5,t6,t7, tl, tch
@ -152,7 +152,7 @@ Subroutine psb_dcdovrbld(n_ovr,desc_p,desc_a,a,&
counter_o = 1
! See comment in main loop below.
call InitPairSearchTree(info)
call InitPairSearchTree(pairtree,info)
if (info /= 0) then
info=4010
ch_err='InitPairSearhTree'
@ -292,7 +292,7 @@ Subroutine psb_dcdovrbld(n_ovr,desc_p,desc_a,a,&
counter_h=counter_h+3
call SearchInsKeyVal(gidx,counter_e,glx,info)
call SearchInsKeyVal(pairtree,gidx,counter_e,glx,info)
!!$ if (debug) write(0,*) 'From searchInsKey ',gidx,glx,counter_e,info
if (info>=0) then
If (glx < counter_e) Then
@ -350,7 +350,7 @@ Subroutine psb_dcdovrbld(n_ovr,desc_p,desc_a,a,&
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
call SearchInsKeyVal(gidx,counter_e,glx,info)
call SearchInsKeyVal(pairtree,gidx,counter_e,glx,info)
!!$ if (debug) write(0,*) 'From searchInsKey ',gidx,glx,counter_e,info
if (info>=0) then
If (glx < counter_e) Then
@ -611,7 +611,7 @@ Subroutine psb_dcdovrbld(n_ovr,desc_p,desc_a,a,&
tch = tch +(t3-t2)
End Do
t1 = mpi_wtime()
call FreePairSearchTree()
call FreePairSearchTree(pairtree)
desc_p%matrix_data(psb_m_)=desc_a%matrix_data(psb_m_)
desc_p%matrix_data(psb_n_)=desc_a%matrix_data(psb_n_)

@ -75,13 +75,13 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
Integer :: counter,counter_h, counter_o, counter_e,j,idx,gidx,proc,n_elem_recv,&
& n_elem_send,tot_recv,tot_elem,n_col,m,ictxt,np,me,dl_lda,lwork,&
& counter_t,n_elem,i_ovr,jj,n,i,proc_id,isz, mglob, glx,n_row, &
& counter_t,n_elem,i_ovr,jj,i,proc_id,isz, mglob, glx,n_row, &
& idxr, idxs, lx, iszr, err_act, icomm
Integer,Pointer :: halo(:),length_dl(:),works(:),workr(:),t_halo_in(:),&
& t_halo_out(:),work(:),dep_list(:),temp(:)
Integer,Pointer :: brvindx(:),rvsz(:), bsdindx(:),sdsz(:)
integer :: pairtree(2)
Logical,Parameter :: debug=.false.
real(kind(1.d0)) :: t1,t2,t3,t4,t5,t6,t7, tl, tch
@ -152,7 +152,7 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
counter_o = 1
! See comment in main loop below.
call InitPairSearchTree(info)
call InitPairSearchTree(pairtree,info)
if (info /= 0) then
info=4010
ch_err='InitPairSearhTree'
@ -292,7 +292,7 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
counter_h=counter_h+3
call SearchInsKeyVal(gidx,counter_e,glx,info)
call SearchInsKeyVal(pairtree,gidx,counter_e,glx,info)
!!$ if (debug) write(0,*) 'From searchInsKey ',gidx,glx,counter_e,info
if (info>=0) then
If (glx < counter_e) Then
@ -350,7 +350,7 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
tmp_ovr_idx(counter_o+3)=-1
counter_o=counter_o+3
call SearchInsKeyVal(gidx,counter_e,glx,info)
call SearchInsKeyVal(pairtree,gidx,counter_e,glx,info)
!!$ if (debug) write(0,*) 'From searchInsKey ',gidx,glx,counter_e,info
if (info>=0) then
If (glx < counter_e) Then
@ -594,7 +594,7 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
call psi_crea_index(desc_p,t_halo_in,t_halo_out,.false.,info)
if (debug) then
write(0,*) me,'Done Crea_index'
write(0,*) me,'Done Crea_Index'
call psb_barrier(ictxt)
end if
if (debug) write(0,*) me,'Checktmp_o_i 2',tmp_ovr_idx(1:10)
@ -611,13 +611,15 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
tch = tch +(t3-t2)
End Do
t1 = mpi_wtime()
call FreePairSearchTree()
call FreePairSearchTree(pairtree)
desc_p%matrix_data(psb_m_)=desc_a%matrix_data(psb_m_)
desc_p%matrix_data(psb_n_)=desc_a%matrix_data(psb_n_)
tmp_halo(counter_h)=-1
tmp_ovr_idx(counter_o)=-1
!
! At this point we have gathered all the indices in the halo at
! N levels of overlap. Just call convert_comm. This is
@ -676,6 +678,7 @@ Subroutine psb_zcdovrbld(n_ovr,desc_p,desc_a,a,&
end if
if (debug) write(0,*) me,'Done ConvertComm'
Deallocate(works,workr,t_halo_in,t_halo_out,work,&
& length_dl,dep_list,tmp_ovr_idx,tmp_halo,&
& brvindx,rvsz,sdsz,bsdindx,temp,stat=info)

Loading…
Cancel
Save