rsl_bcast.c

References to this file elsewhere.
1 /* #define LEARN_BCAST */
2 /***********************************************************************
3      
4                               COPYRIGHT
5      
6      The following is a notice of limited availability of the code and 
7      Government license and disclaimer which must be included in the 
8      prologue of the code and in all source listings of the code.
9      
10      Copyright notice
11        © 1977  University of Chicago
12      
13      Permission is hereby granted to use, reproduce, prepare 
14      derivative works, and to redistribute to others at no charge.  If 
15      you distribute a copy or copies of the Software, or you modify a 
16      copy or copies of the Software or any portion of it, thus forming 
17      a work based on the Software and make and/or distribute copies of 
18      such work, you must meet the following conditions:
19      
20           a) If you make a copy of the Software (modified or verbatim) 
21              it must include the copyright notice and Government       
22              license and disclaimer.
23      
24           b) You must cause the modified Software to carry prominent   
25              notices stating that you changed specified portions of    
26              the Software.
27      
28      This software was authored by:
29      
30      Argonne National Laboratory
31      J. Michalakes: (630) 252-6646; email: michalak@mcs.anl.gov
32      Mathematics and Computer Science Division
33      Argonne National Laboratory, Argonne, IL  60439
34      
35      ARGONNE NATIONAL LABORATORY (ANL), WITH FACILITIES IN THE STATES 
36      OF ILLINOIS AND IDAHO, IS OWNED BY THE UNITED STATES GOVERNMENT, 
37      AND OPERATED BY THE UNIVERSITY OF CHICAGO UNDER PROVISION OF A 
38      CONTRACT WITH THE DEPARTMENT OF ENERGY.
39      
40                       GOVERNMENT LICENSE AND DISCLAIMER
41      
42      This computer code material was prepared, in part, as an account 
43      of work sponsored by an agency of the United States Government.
44      The Government is granted for itself and others acting on its 
45      behalf a paid-up, nonexclusive, irrevocable worldwide license in 
46      this data to reproduce, prepare derivative works, distribute 
47      copies to the public, perform publicly and display publicly, and 
48      to permit others to do so.  NEITHER THE UNITED STATES GOVERNMENT 
49      NOR ANY AGENCY THEREOF, NOR THE UNIVERSITY OF CHICAGO, NOR ANY OF 
50      THEIR EMPLOYEES, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR 
51      ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE ACCURACY, 
52      COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, APPARATUS, 
53      PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD 
54      NOT INFRINGE PRIVATELY OWNED RIGHTS.
55 
56 ***************************************************************************/
57 
58 #define MOD_9707
59 
60 #ifndef MS_SUA
61 # include <stdio.h>
62 #endif
63 #include <stdlib.h>
64 #ifndef STUBMPI
65 #  include "mpi.h"
66 #endif
67 #include "rsl_lite.h"
68 
69 typedef struct bcast_point_desc {
70   int ig ;
71   int jg ;
72 } bcast_point_desc_t ;
73 
74 
75 static destroy_par_info ( p )
76   char * p ;
77 {
78   if ( p != NULL ) RSL_FREE( p ) ;
79 }
80 
81 static rsl_list_t *Xlist, *Xp, *Xprev ;
82 static rsl_list_t *stage ;
83 static int stage_len = 0 ;              /* 96/3/15 */
84 
85 static int  Sendbufsize ;
86 static int  Sendbufcurs ;
87 static char *Sendbuf ;
88 static int  Sdisplacements[RSL_MAXPROC] ;
89 static int  Ssizes[RSL_MAXPROC] ;
90 
91 static int  Recsizeindex ;
92 
93 static int  Rbufsize ;
94 static int  Rbufcurs ;
95 static int  Rpointcurs ;
96 static char *Recvbuf ;
97 static int  Rdisplacements[RSL_MAXPROC+1] ;
98 static int  Rsizes[RSL_MAXPROC] ;
99 static int  Rreclen ;
100 
101 static int s_d ;
102 static int s_nst ;
103 static int s_msize ;
104 static int s_idim ;
105 static int s_jdim ;
106 static int s_idim_nst ;
107 static int s_jdim_nst ;
108 static int s_irax_n ;
109 static int s_irax_m ;
110 static int s_ntasks_x ;
111 static int s_ntasks_y ;
112 static rsl_list_t **Plist ;
113 static int Psize[RSL_MAXPROC] ;
114 static char *s_parent_msgs ;
115 static int s_parent_msgs_curs ;
116 static int s_remaining ;  /* number of bytes left in a parent message before
117                            the next point descriptor */
118 
119 /* add a field to a message outgoing for the specified child domain cell */
120 /* relies on rsl_ready_bcast having been called already */
121 /* sends are specified in terms of coarse domain */
122 
123 static int s_i, s_j, s_ig, s_jg, s_cm, s_cn,
124            s_nig, s_njg ;
125 
126 static int Pcurs ;
127 static rsl_list_t *Pptr ; 
128 
129 #ifdef LEARN_BCAST
130 static int s_putmsg = 0 ;
131 #endif
132 
133 /* parent->nest */
134 RSL_LITE_TO_CHILD_INFO ( Fcomm, msize_p,
135                          cips_p, cipe_p, cjps_p, cjpe_p, /* patch dims of SOURCE DOMAIN */
136                          iids_p, iide_p, ijds_p, ijde_p, /* domain dims of INTERMEDIATE DOMAIN */
137                          nids_p, nide_p, njds_p, njde_p, /* domain dims of CHILD DOMAIN */
138                          pgr_p,  shw_p ,                 /* nest ratio and stencil half width */
139                          ntasks_x_p , ntasks_y_p ,       /* proc counts in x and y */
140                          icoord_p, jcoord_p,
141                          idim_cd_p, jdim_cd_p,
142                          ig_p, jg_p,
143                          retval_p )
144   
145   int_p
146      Fcomm                            /* Fortran version of MPI communicator */
147     ,cips_p, cipe_p, cjps_p, cjpe_p   /* (i) c.d. patch dims */
148     ,iids_p, iide_p, ijds_p, ijde_p   /* (i) n.n. global dims */
149     ,nids_p, nide_p, njds_p, njde_p   /* (i) n.n. global dims */
150     ,pgr_p                            /* nesting ratio */
151     ,ntasks_x_p , ntasks_y_p          /* proc counts in x and y */
152     ,icoord_p       /* i coordinate of nest in cd */
153     ,jcoord_p       /* j coordinate of nest in cd */
154     ,shw_p          /* stencil half width */
155     ,idim_cd_p      /* i width of nest in cd */
156     ,jdim_cd_p      /* j width of nest in cd */
157     ,msize_p        /* (I) Message size in bytes. */
158     ,ig_p           /* (O) Global N index of parent domain point. */
159     ,jg_p           /* (O) Global N index of parent domain point. */
160     ,retval_p ;     /* (O) =1 if a valid point returned; =0 (zero) otherwise. */
161 {
162   int P, Px, Py ;
163 
164   rsl_list_t *q ;
165   int *r ;
166   int i, j, ni, nj ;
167   int coords[2] ;
168 #ifndef STUBMPI
169   MPI_Comm *comm, dummy_comm ;
170 
171   comm = &dummy_comm ;
172   *comm = MPI_Comm_f2c( *Fcomm ) ;
173 #endif
174 
175   if ( Plist == NULL ) {
176     s_ntasks_x = *ntasks_x_p ;
177     s_ntasks_y = *ntasks_y_p ;
178     /* construct Plist */
179     Sendbufsize = 0 ;
180     Plist = RSL_MALLOC( rsl_list_t * , s_ntasks_x * s_ntasks_y ) ;  /* big enough for nest points */
181     for ( j = 0 ; j < s_ntasks_x * s_ntasks_y ; j++ ) {
182        Plist[j] = NULL ;
183        Sdisplacements[j] = 0 ;
184        Ssizes[j] = 0 ;
185     }
186     for ( j = *cjps_p ; j <= *cjpe_p ; j++ )
187     {
188       for ( i = *cips_p ; i <= *cipe_p ; i++ )
189       {
190 	if ( ( *jcoord_p <= j && j <= *jcoord_p+*jdim_cd_p-1 ) && ( *icoord_p <= i && i <= *icoord_p+*idim_cd_p-1 ) ) {
191            ni = ( i - (*icoord_p + *shw_p) ) * *pgr_p + 1 + 1 ; /* add 1 to give center point */
192            nj = ( j - (*jcoord_p + *shw_p) ) * *pgr_p + 1 + 1 ;
193 
194 #ifndef STUBMPI
195 	   TASK_FOR_POINT ( &ni, &nj, nids_p, nide_p, njds_p, njde_p, &s_ntasks_x, &s_ntasks_y, &Px, &Py ) ;
196            coords[1] = Px ; coords[0] = Py ;
197            MPI_Cart_rank( *comm, coords, &P ) ;
198 #else
199            P = 0 ;
200 #endif
201 	   q = RSL_MALLOC( rsl_list_t , 1 ) ;
202 	   q->info1 = i ;
203 	   q->info2 = j ;
204 	   q->next = Plist[P] ;
205 	   Plist[P] = q ;
206 	   Sendbufsize += *msize_p + 3 * sizeof( int ) ;  /* point data plus 3 ints for i, j, and size */
207         }
208       }
209     }
210     Sendbuf = RSL_MALLOC( char , Sendbufsize ) ;
211     Sendbufcurs = 0 ;
212     Recsizeindex = -1 ;
213     Pcurs = -1 ;
214     Pptr = NULL ;
215   }
216 
217   if ( Pptr != NULL ) {
218     Pptr = Pptr->next ;
219   } 
220 
221   if ( Recsizeindex >= 0 ) {
222           r = (int *) &(Sendbuf[Recsizeindex]) ;
223           *r = Sendbufcurs - Recsizeindex + 2 * sizeof(int) ;
224           Ssizes[Pcurs] += *r ;
225   }
226 
227   while ( Pptr == NULL ) {
228       Pcurs++ ;
229       while ( Pcurs < s_ntasks_x * s_ntasks_y && Plist[Pcurs] == NULL  ) Pcurs++ ;
230       if ( Pcurs < s_ntasks_x * s_ntasks_y ) {
231         Sdisplacements[Pcurs] = Sendbufcurs ;
232         Ssizes[Pcurs] = 0 ;
233         Pptr = Plist[Pcurs] ;
234       } else {
235         *retval_p = 0 ;
236         return ;  /* done */
237       }
238   }
239 
240   *ig_p = Pptr->info1 ;
241   *jg_p = Pptr->info2 ;
242 
243   r = (int *) &(Sendbuf[Sendbufcurs]) ;
244   *r++ = Pptr->info1 ; Sendbufcurs += sizeof(int) ;  /* ig to buffer */
245   *r++ = Pptr->info2 ; Sendbufcurs += sizeof(int) ;  /* jg to buffer */
246   Recsizeindex = Sendbufcurs ;
247   *r++ =           0 ; Sendbufcurs += sizeof(int) ;  /* store start for size */
248   *retval_p = 1 ;
249 
250   return ;
251 }
252 
253 /********************************************/
254 
255 /* nest->parent */
256 RSL_LITE_TO_PARENT_INFO ( Fcomm, msize_p,
257                           nips_p, nipe_p, njps_p, njpe_p, /* patch dims of SOURCE DOMAIN (CHILD) */
258                           cids_p, cide_p, cjds_p, cjde_p, /* domain dims of TARGET DOMAIN (PARENT) */
259                           ntasks_x_p , ntasks_y_p ,       /* proc counts in x and y */
260                           icoord_p, jcoord_p,
261                           idim_cd_p, jdim_cd_p,
262                           ig_p, jg_p,
263                           retval_p )
264   int_p
265      Fcomm                            /* Fortran version of MPI communicator */
266     ,nips_p, nipe_p, njps_p, njpe_p   /* (i) n.d. patch dims */
267     ,cids_p, cide_p, cjds_p, cjde_p   /* (i) n.n. global dims */
268     ,ntasks_x_p , ntasks_y_p          /* proc counts in x and y */
269     ,icoord_p       /* i coordinate of nest in cd */
270     ,jcoord_p       /* j coordinate of nest in cd */
271     ,idim_cd_p      /* i width of nest in cd */
272     ,jdim_cd_p      /* j width of nest in cd */
273     ,msize_p        /* (I) Message size in bytes. */
274     ,ig_p           /* (O) Global N index of parent domain point. */
275     ,jg_p           /* (O) Global N index of parent domain point. */
276     ,retval_p ;     /* (O) =1 if a valid point returned; =0 (zero) otherwise. */
277 {
278   int P, Px, Py ;
279   rsl_list_t *q ;
280   int *r ;
281   int i, j ;
282   int coords[2] ;
283 #ifndef STUBMPI
284   MPI_Comm *comm, dummy_comm ;
285 
286   comm = &dummy_comm ;
287   *comm = MPI_Comm_f2c( *Fcomm ) ;
288 #endif
289 
290   if ( Plist == NULL ) {
291     s_ntasks_x = *ntasks_x_p ;
292     s_ntasks_y = *ntasks_y_p ;
293     /* construct Plist */
294     Sendbufsize = 0 ;
295     Plist = RSL_MALLOC( rsl_list_t * , s_ntasks_x * s_ntasks_y ) ;
296     for ( j = 0 ; j < s_ntasks_x * s_ntasks_y ; j++ ) {
297        Plist[j] = NULL ;
298        Sdisplacements[j] = 0 ;
299        Ssizes[j] = 0 ;
300     }
301     for ( j = *njps_p ; j <= *njpe_p ; j++ )
302     {
303       for ( i = *nips_p ; i <= *nipe_p ; i++ )
304       {
305 	if ( ( *jcoord_p <= j && j <= *jcoord_p+*jdim_cd_p-1 ) && ( *icoord_p <= i && i <= *icoord_p+*idim_cd_p-1 ) ) {
306 #ifndef STUBMPI
307 	  TASK_FOR_POINT ( &i, &j, cids_p, cide_p, cjds_p, cjde_p, &s_ntasks_x, &s_ntasks_y, &Px, &Py ) ;
308           coords[1] = Px ; coords[0] = Py ;
309           MPI_Cart_rank( *comm, coords, &P ) ;
310 #else
311           P = 0 ;
312 #endif
313 	  q = RSL_MALLOC( rsl_list_t , 1 ) ;
314 	  q->info1 = i ;
315 	  q->info2 = j ;
316 	  q->next = Plist[P] ;
317 	  Plist[P] = q ;
318 	  Sendbufsize += *msize_p + 3 * sizeof( int ) ;  /* point data plus 3 ints for i, j, and size */
319         }
320       }
321     }
322     Sendbuf = RSL_MALLOC( char , Sendbufsize ) ;
323     Sendbufcurs = 0 ;
324     Recsizeindex = -1 ;
325     Pcurs = -1 ;
326     Pptr = NULL ;
327   }
328   if ( Pptr != NULL ) {
329     Pptr = Pptr->next ;
330   } 
331 
332   if ( Recsizeindex >= 0 ) {
333           r = (int *) &(Sendbuf[Recsizeindex]) ;
334           *r = Sendbufcurs - Recsizeindex + 2 * sizeof(int) ;
335           Ssizes[Pcurs] += *r ;
336   }
337 
338   while ( Pptr == NULL ) {
339       Pcurs++ ;
340       while ( Pcurs < s_ntasks_x * s_ntasks_y && Plist[Pcurs] == NULL ) Pcurs++ ;
341       if ( Pcurs < s_ntasks_x * s_ntasks_y ) {
342         Sdisplacements[Pcurs] = Sendbufcurs ;
343         Ssizes[Pcurs] = 0 ;
344         Pptr = Plist[Pcurs] ;
345       } else {
346         *retval_p = 0 ;
347         return ;  /* done */
348       }
349   }
350 
351   *ig_p = Pptr->info1 ;
352   *jg_p = Pptr->info2 ;
353 
354   r = (int *) &(Sendbuf[Sendbufcurs]) ;
355   *r++ = Pptr->info1 ; Sendbufcurs += sizeof(int) ;  /* ig to buffer */
356   *r++ = Pptr->info2 ; Sendbufcurs += sizeof(int) ;  /* jg to buffer */
357   Recsizeindex = Sendbufcurs ;
358   *r++ =           0 ; Sendbufcurs += sizeof(int) ;  /* store start for size */
359   *retval_p = 1 ;
360 
361   return ;
362 }
363 
364 
365 /********************************************/
366 
367 /*@
368   RSL_TO_CHILD_MSG -- Pack force data into a message for a nest point.
369 
370 @*/
371 
372 /* parent->nest */
373 RSL_LITE_TO_CHILD_MSG ( nbuf_p, buf )
374   int_p
375     nbuf_p ;     /* (I) Number of bytes to be packed. */
376   char *
377     buf ;        /* (I) Buffer containing the data to be packed. */
378 {
379    rsl_lite_to_peerpoint_msg ( nbuf_p, buf ) ;
380 }
381 
382 /* nest->parent */
383 RSL_LITE_TO_PARENT_MSG ( nbuf_p, buf )
384   int_p
385     nbuf_p ;     /* (I) Number of bytes to be packed. */
386   char *
387     buf ;        /* (I) Buffer containing the data to be packed. */
388 {
389    rsl_lite_to_peerpoint_msg ( nbuf_p, buf ) ;
390 }
391 
392 /* common code */
393 rsl_lite_to_peerpoint_msg ( nbuf_p, buf )
394   int_p
395     nbuf_p ;     /* (I) Number of bytes to be packed. */
396   char *
397     buf ;        /* (I) Buffer containing the data to be packed. */
398 {
399   int nbuf ;
400   int *p, *q ;
401   char *c, *d ;
402   int i ;
403   char mess[4096] ;
404 
405   RSL_TEST_ERR(buf==NULL,"2nd argument is NULL.  Field allocated?") ;
406 
407   nbuf = *nbuf_p ;
408 
409   if ( Sendbufcurs + nbuf >= Sendbufsize ) {
410     sprintf(mess,"RSL_LITE_TO_CHILD_MSG: Sendbufcurs + nbuf (%d) would exceed Sendbufsize (%d)\n",
411            Sendbufcurs + nbuf , Sendbufsize ) ;
412     RSL_TEST_ERR(1,mess) ;
413   }
414 
415   if ( nbuf % sizeof(int) == 0 ) {
416     for ( p = (int *)buf, q = (int *) &(Sendbuf[Sendbufcurs]), i = 0 ; i < nbuf ; i += sizeof(int) )
417     {
418       *q++ = *p++ ;
419     }
420   }
421   else
422   {
423     for ( c = buf, d = &(Sendbuf[Sendbufcurs]), i = 0 ; i < nbuf ; i++ )
424     {
425       *d++ = *c++ ;
426     }
427   }
428 
429   Sendbufcurs += nbuf ;
430 
431 }
432 
433 /********************************************/
434 
435 /* parent->nest */
436 RSL_LITE_BCAST_MSGS ( mytask_p, ntasks_p, Fcomm )
437   int_p mytask_p, ntasks_p, Fcomm ;
438 {
439 #ifndef STUBMPI
440   MPI_Comm comm ;
441 
442   comm = MPI_Comm_f2c( *Fcomm ) ;
443 #else
444   int comm ;
445 #endif
446   rsl_lite_allgather_msgs ( mytask_p, ntasks_p, comm ) ;
447 }
448 
449 /* nest->parent */
450 RSL_LITE_MERGE_MSGS ( mytask_p, ntasks_p, Fcomm )
451   int_p mytask_p, ntasks_p, Fcomm ;
452 {
453 #ifndef STUBMPI
454   MPI_Comm comm ;
455 
456   comm = MPI_Comm_f2c( *Fcomm ) ;
457 #else
458   int comm ;
459 #endif
460   rsl_lite_allgather_msgs ( mytask_p, ntasks_p, comm ) ;
461 }
462 
463 /* common code */
464 rsl_lite_allgather_msgs ( mytask_p, ntasks_p, comm )
465   int_p mytask_p, ntasks_p ;
466   MPI_Comm comm ;
467 {
468   int P ;
469   char *work ;
470   int * r ;
471   bcast_point_desc_t pdesc ;
472   int curs ;
473   int msglen, mdest, mtag ;
474   int ntasks, mytask ;
475   int ii, i, j ;
476   int ig, jg ;
477   int *Psize_all ;
478   int *sp, *bp ;
479   int rc ;
480 
481 #ifndef STUBMPI
482   ntasks = *ntasks_p ;
483   mytask = *mytask_p ;
484 #else
485   ntasks = 1 ;
486   mytask = 0 ;
487 #endif
488 
489   RSL_TEST_ERR( Plist == NULL,
490     "RSL_BCAST_MSGS: rsl_to_child_info not called first" ) ;
491 
492   RSL_TEST_ERR( ntasks == RSL_MAXPROC ,
493     "RSL_BCAST_MSGS: raise the compile time value of MAXPROC" ) ;
494   
495   Psize_all = RSL_MALLOC( int, ntasks * ntasks ) ;
496 
497 #ifndef STUBMPI
498   MPI_Allgather( Ssizes, ntasks, MPI_INT , Psize_all, ntasks, MPI_INT, comm ) ;
499 #else
500   Psize_all[0] = Ssizes[0] ;
501 #endif
502 
503   for ( j = 0 ; j < ntasks ; j++ ) 
504     Rsizes[j] = 0 ;
505 
506   for ( j = 0 ; j < ntasks ; j++ ) 
507   {
508     Rsizes[j] += Psize_all[ INDEX_2( j , mytask , ntasks ) ] ;
509   }
510 
511   for ( Rbufsize = 0, P = 0, Rdisplacements[0] = 0 ; P < ntasks ; P++ )
512   {
513     Rdisplacements[P+1] = Rsizes[P] + Rdisplacements[P] ;
514     Rbufsize += Rsizes[P] ;
515   }
516 
517   /* this will be freed later */
518 
519   Recvbuf = RSL_MALLOC( char , Rbufsize + 3 * sizeof(int) ) ; /* for sentinal record */
520   Rbufcurs = 0 ;
521   Rreclen = 0 ;
522 
523 #ifndef STUBMPI
524   rc = MPI_Alltoallv ( Sendbuf, Ssizes, Sdisplacements, MPI_BYTE , 
525                        Recvbuf, Rsizes, Rdisplacements, MPI_BYTE ,  comm ) ;
526 #else
527   work = Sendbuf ;
528   Sendbuf = Recvbuf ;
529   Recvbuf = work ;
530 #endif
531 
532 /* add sentinel to the end of Recvbuf */
533 
534   r = (int *)&(Recvbuf[Rbufsize + 2 * sizeof(int)]) ;
535   *r = RSL_INVALID ;
536 
537   RSL_FREE( Sendbuf ) ;
538   RSL_FREE( Psize_all ) ;
539 
540   for ( j = 0 ; j < *ntasks_p ; j++ )  {
541     destroy_list ( &(Plist[j]), NULL ) ;
542   }
543   RSL_FREE( Plist ) ;
544   Plist = NULL ;
545 
546 }
547 
548 /********************************************/
549 
550 /* parent->nest */
551 RSL_LITE_FROM_PARENT_INFO ( ig_p, jg_p, retval_p )
552   int_p
553     ig_p        /* (O) Global index in M dimension of nest. */
554    ,jg_p        /* (O) Global index in N dimension of nest. */
555    ,retval_p ;  /* (O) Return value; =1 valid point, =0 done. */
556 {
557   rsl_lite_from_peerpoint_info ( ig_p, jg_p, retval_p ) ;
558 }
559 
560 /* nest->parent */
561 RSL_LITE_FROM_CHILD_INFO ( ig_p, jg_p, retval_p )
562   int_p
563     ig_p        /* (O) Global index in M dimension of nest. */
564    ,jg_p        /* (O) Global index in N dimension of nest. */
565    ,retval_p ;  /* (O) Return value; =1 valid point, =0 done. */
566 {
567   rsl_lite_from_peerpoint_info ( ig_p, jg_p, retval_p ) ;
568 }
569 
570 /* common code */
571 rsl_lite_from_peerpoint_info ( ig_p, jg_p, retval_p )
572   int_p
573     ig_p        /* (O) Global index in M dimension of nest. */
574    ,jg_p        /* (O) Global index in N dimension of nest. */
575    ,retval_p ;  /* (O) Return value; =1 valid point, =0 done. */
576 {
577   int ii ;
578 
579   Rbufcurs = Rbufcurs + Rreclen ;
580   Rpointcurs = 0 ;
581   *ig_p    = *(int *)&( Recvbuf[Rbufcurs + Rpointcurs ] ) ; Rpointcurs += sizeof(int) ;
582   *jg_p    = *(int *)&( Recvbuf[Rbufcurs + Rpointcurs ] ) ; Rpointcurs += sizeof(int) ;
583 /* read sentinel */
584   Rreclen  = *(int *)&( Recvbuf[Rbufcurs + Rpointcurs ] ) ; Rpointcurs += sizeof(int) ;
585   *retval_p = 1 ;
586   if ( Rreclen == RSL_INVALID ) {
587     *retval_p = 0 ;
588     RSL_FREE( Recvbuf ) ;
589   }
590      
591   return ;
592 }
593 
594 /********************************************/
595 
596 /* parent->nest */
597 RSL_LITE_FROM_PARENT_MSG ( len_p, buf )
598   int_p
599     len_p ;          /* (I) Number of bytes to unpack. */
600   int *
601     buf ;            /* (O) Destination buffer. */
602 {
603   rsl_lite_from_peerpoint_msg ( len_p, buf ) ;
604 }
605 
606 /* nest->parent */
607 RSL_LITE_FROM_CHILD_MSG ( len_p, buf )
608   int_p
609     len_p ;          /* (I) Number of bytes to unpack. */
610   int *
611     buf ;            /* (O) Destination buffer. */
612 {
613   rsl_lite_from_peerpoint_msg ( len_p, buf ) ;
614 }
615 
616 /* common code */
617 rsl_lite_from_peerpoint_msg ( len_p, buf )
618   int_p
619     len_p ;          /* (I) Number of bytes to unpack. */
620   int *
621     buf ;            /* (O) Destination buffer. */
622 {
623   int *p, *q ;
624   char *c, *d ;
625   int i ;
626 
627   if ( *len_p % sizeof(int) == 0 ) {
628     for ( p = (int *)&(Recvbuf[Rbufcurs+Rpointcurs]), q = buf , i = 0 ; i < *len_p ; i += sizeof(int) ) 
629     {
630       *q++ = *p++ ;
631     }
632   } else {
633     for ( c = &(Recvbuf[Rbufcurs+Rpointcurs]), d = (char *) buf , i = 0 ; i < *len_p ; i++ )
634     {
635       *d++ = *c++ ;
636     }
637   }
638 
639   Rpointcurs += *len_p ;
640 }
641 
642 /********************************************/
643 
644 destroy_list( list, dfcn )
645   rsl_list_t ** list ;          /* pointer to pointer to list */
646   int (*dfcn)() ;               /* pointer to function for destroying
647                                    the data field of the list */
648 {
649   rsl_list_t *p, *trash ;
650   if ( list == NULL ) return(0) ;
651   if ( *list == NULL ) return(0) ;
652   for ( p = *list ; p != NULL ; )
653   {
654     if ( dfcn != NULL ) (*dfcn)( p->data ) ;
655     trash = p ;
656     p = p->next ;
657     RSL_FREE( trash ) ;
658   }
659   *list = NULL ;
660   return(0) ;
661 }
662 
663 /********************************************/