bsp.c

/*
    BSPonMPI. This is an implementation of the BSPlib standard on top of MPI
    Copyright (C) 2006  Wijnand J. Suijlen

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

    You may contact me via electronic mail:
      wjsuijle@users.sourceforge.net
    or snail mail:
      W.J. Suijlen
      Kraaiheidelaan 10
      2803 VP Gouda
      The Netherlands
*/

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>   /* memcpy() */
#include <mpi.h>
#include "bsp.h"
#include "bsp_memreg.h"
#include "bsp_mesgqueue.h"
#include "bsp_delivtable.h"
#include "bsp_reqtable.h"
#include "bsp_private.h"
#include "bsp_alloc.h"
#include "bsp_abort.h"
#include <config.h>

#define DELIVTAB_SIZE 1
#define REQTAB_SIZE   1
#define MEMREG_SIZE   1

void
bsp_init (void (*spmd_part) (void), int argc, char *argv[])
{
  /* initialize MPI */
  MPI_Init (&argc, &argv);
  MPI_Comm_size (MPI_COMM_WORLD, &bsp.nprocs);
  MPI_Comm_rank (MPI_COMM_WORLD, &bsp.rank);

  if (bsp.rank == 0)
    {
      /* process 0 returns to main() and enters the SPMD part only when
         main() calls it */
    }
  else
    {
      /* all other processes run the SPMD part immediately and exit when
         it completes */
      spmd_part ();
      exit (0);
    }
}

void
bsp_begin (int maxprocs)
{
  int flag, i, *ranks;
  MPI_Group group, newgroup;

  /* initialize MPI if bsp_init() has not been called */
  MPI_Initialized (&flag);
  if (!flag)
    {
      int argc = 0;
      char **argv = NULL;
      fprintf (stderr, "Warning! bsp_init() has not been called; "
                       "initialization of MPI may fail\n");
      MPI_Init (&argc, &argv);
      MPI_Comm_size (MPI_COMM_WORLD, &bsp.nprocs);
      MPI_Comm_rank (MPI_COMM_WORLD, &bsp.rank);
    }

  /* broadcast maxprocs, as chosen by process 0, to all other processes */
  MPI_Bcast (&maxprocs, 1, MPI_INT, 0, MPI_COMM_WORLD);

  /* allocate at most maxprocs processes:
     form a new group of processes. Processes that are not a member of
     this group are not needed and terminate */
  if (maxprocs > 0)
    {
      bsp.nprocs = MIN (maxprocs, bsp.nprocs);
    } /* else: request the maximum number of processes */

  MPI_Comm_group (MPI_COMM_WORLD, &group);
  ranks = bsp_malloc (bsp.nprocs, sizeof (int));
  for (i = 0; i < bsp.nprocs; i++)
    ranks[i] = i;

  MPI_Group_incl (group, bsp.nprocs, ranks, &newgroup);
  MPI_Comm_create (MPI_COMM_WORLD, newgroup, &bsp.communicator);

  bsp_free (ranks);

  if (bsp.rank >= bsp.nprocs)
    { /* terminate all processes that fell outside the new group */
      MPI_Finalize ();
      exit (0);
    }

  /* initialize data structures */
  memoryRegister_initialize (&bsp.memory_register, bsp.nprocs, MEMREG_SIZE,
                             bsp.rank);
  messageQueue_initialize (&bsp.message_queue);
  deliveryTable_initialize (&bsp.delivery_table, bsp.nprocs, DELIVTAB_SIZE);
  requestTable_initialize (&bsp.request_table, bsp.nprocs, REQTAB_SIZE);
  deliveryTable_initialize (&bsp.delivery_received_table, bsp.nprocs,
                            DELIVTAB_SIZE);
  requestTable_initialize (&bsp.request_received_table, bsp.nprocs,
                           REQTAB_SIZE);

  /* save starting time */
  bsp.begintime = MPI_Wtime ();
}

void
bsp_end (void)
{
  /* clean up data structures */
  memoryRegister_destruct (&bsp.memory_register);
  deliveryTable_destruct (&bsp.delivery_table);
  requestTable_destruct (&bsp.request_table);
  deliveryTable_destruct (&bsp.delivery_received_table);
  requestTable_destruct (&bsp.request_received_table);

  /* and finalize */
  MPI_Finalize ();
}
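
/* Example (illustrative, not part of the original source): a minimal
   BSPlib program using bsp_init(), bsp_begin() and bsp_end() as
   implemented above. The function name spmd_part is hypothetical; the
   block is guarded by #if 0 so it is not compiled into the library. */
#if 0
#include <stdio.h>
#include "bsp.h"

static void spmd_part (void)
{
  bsp_begin (bsp_nprocs ());        /* claim all available processes */
  printf ("Hello from process %d of %d\n", bsp_pid (), bsp_nprocs ());
  bsp_sync ();                      /* end of the first superstep */
  bsp_end ();
}

int main (int argc, char *argv[])
{
  bsp_init (spmd_part, argc, argv); /* processes != 0 never return here */
  spmd_part ();                     /* process 0 enters the SPMD part */
  return 0;
}
#endif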

void
bsp_abort (const char *format, ...)
{
  va_list ap;
  va_start (ap, format);
  vfprintf (stderr, format, ap);
  va_end (ap);

  bsp_intern_abort (ERR_BSP_ABORT, "bsp_abort()", __FILE__, __LINE__);
}
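
/* Example (illustrative): bsp_abort() accepts a printf-style format
   string, so a process can report its identity before aborting:

     if (buffer == NULL)
       bsp_abort ("process %d: out of memory\n", bsp_pid ());
*/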

int
bsp_nprocs (void)
{
  int flag;
  MPI_Initialized (&flag);
  return flag ? bsp.nprocs : -1;
}

int
bsp_pid (void)
{
  return bsp.rank;
}

double
bsp_time (void)
{
  return MPI_Wtime () - bsp.begintime;
}
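
/* Example (illustrative): because bsp_time() returns the number of
   seconds elapsed since bsp_begin(), the cost of a superstep can be
   measured by sampling it on either side of a bsp_sync():

     double start = bsp_time ();
     bsp_sync ();
     double elapsed = bsp_time () - start;
*/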

void
bsp_sync (void)
{
  unsigned int send_index[3 * bsp.nprocs];
  unsigned int recv_index[3 * bsp.nprocs];
  unsigned int maxreqrows = 0, maxdelrows = 0, p;
  unsigned int any_gets = 0; /* boolean: are there any gets to be
                                performed? If there are none, one
                                MPI_Alltoall can be skipped */

  /* reset the message buffer */
  messageQueue_sync (&bsp.message_queue);
  requestTable_reset (&bsp.request_received_table);
  deliveryTable_reset (&bsp.delivery_received_table);

  /* exchange communication information: for each peer p, send the
     number of request rows, the number of delivery rows, and whether
     this process issued any gets at all */
  for (p = 0; p < bsp.nprocs; p++)
    any_gets |= bsp.request_table.used_slot_count[p];

  for (p = 0; p < bsp.nprocs; p++)
    {
      send_index[3 * p    ] = bsp.request_table.used_slot_count[p];
      send_index[3 * p + 1] = bsp.delivery_table.used_slot_count[p];
      send_index[3 * p + 2] = any_gets;
    }

  MPI_Alltoall (send_index, 3, MPI_UNSIGNED,
                recv_index, 3, MPI_UNSIGNED, bsp.communicator);

  /* expand the receive buffers if necessary */
  maxreqrows = max (recv_index, 3 * bsp.nprocs, 3); /* max request count */
  for (p = 0; p < bsp.nprocs; p++)
    maxdelrows = MAX (recv_index[1 + 3 * p] +
                      bsp.request_table.info.req.data_sizes[p], maxdelrows);

  if (bsp.request_received_table.rows < maxreqrows)
    {
      maxreqrows = MAX (bsp.request_received_table.rows, maxreqrows);
      requestTable_expand (&bsp.request_received_table, maxreqrows);
    }

  if (bsp.delivery_received_table.rows < maxdelrows)
    {
      maxdelrows = MAX (bsp.delivery_received_table.rows, maxdelrows);
      deliveryTable_expand (&bsp.delivery_received_table, maxdelrows);
    }

  /* copy the necessary indices to the received tables */
  for (p = 0; p < bsp.nprocs; p++)
    {
      bsp.request_received_table.used_slot_count[p] = recv_index[3 * p];
      bsp.delivery_received_table.used_slot_count[p] =
        recv_index[1 + 3 * p] + bsp.request_table.info.req.data_sizes[p];
    }

  /* now we can conclude something about the global communication
     pattern: a request exchange is needed only if some process issued
     a get */
  any_gets = 0;
  for (p = 0; p < bsp.nprocs; p++)
    any_gets |= recv_index[3 * p + 2];

  /* communicate & execute */
  if (any_gets)
    {
      expandableTable_comm (&bsp.request_table, &bsp.request_received_table,
                            bsp.communicator);
      requestTable_execute (&bsp.request_received_table, &bsp.delivery_table);
    }

  expandableTable_comm (&bsp.delivery_table, &bsp.delivery_received_table,
                        bsp.communicator);
  deliveryTable_execute (&bsp.delivery_received_table,
                         &bsp.memory_register, &bsp.message_queue, bsp.rank);

  /* clear the buffers */
  requestTable_reset (&bsp.request_table);
  deliveryTable_reset (&bsp.delivery_table);

  /* pack the memoryRegister */
  memoryRegister_pack (&bsp.memory_register);
}
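
/* Worked example (illustrative): with nprocs == 2, suppose process 0
   queued 2 gets directed at process 1 and nothing else. Its
   request_table.used_slot_count is { 0, 2 }, so any_gets becomes
   0 | 0 | 2 == 2 (a nonzero flag, the OR of the request counts), and
   process 0 sends

     send_index = { 0, 0, 2,   2, 0, 2 }

   (three words per peer: request rows, delivery rows, any_gets). After
   the MPI_Alltoall every process knows how many rows to expect from
   each peer, and both processes see a nonzero any_gets, so the request
   exchange and the subsequent data delivery for the gets are
   performed. */
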
void
bsp_push_reg (const void *ident, int size)
{
  int i;
  DelivElement element;
  element.size = 0;
  element.info.push.address = ident;
  /* announce the registration to every process; the size parameter is
     not needed by this implementation */
  for (i = 0; i < bsp.nprocs; i++)
    deliveryTable_push (&bsp.delivery_table, i, element, pushreg);
}

void
bsp_pop_reg (const void *ident)
{
  DelivElement element;
  element.size = 0;
  element.info.pop.address = ident;
  deliveryTable_push (&bsp.delivery_table, bsp.rank, element, popreg);
}

void
bsp_put (int pid, const void *src, void *dst, int offset, int nbytes)
{
  /* place a put command in the buffer and copy the data behind it */
  char * restrict pointer;
  DelivElement element;
  element.size = nbytes;
  element.info.put.dst =
    memoryRegister_memoized_find (&bsp.memory_register, pid, dst) + offset;
  pointer = deliveryTable_push (&bsp.delivery_table, pid, element, put);
  memcpy (pointer, src, nbytes);
}


void
bsp_get (int pid, const void *src, int offset, void *dst, int nbytes)
{
  ReqElement elem;
  elem.size = nbytes;
  elem.src =
    memoryRegister_memoized_find (&bsp.memory_register, pid, src);
  elem.dst = dst;
  elem.offset = offset;

  /* place a get command in the buffer */
  requestTable_push (&bsp.request_table, pid, elem);
}
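
/* Example (illustrative, not part of the original source): typical
   DRMA use of the functions above. Every process registers an array,
   writes into the next process's copy, and reads back from it;
   variable names are hypothetical and the block is guarded by #if 0. */
#if 0
void drma_example (void)
{
  int x[4] = { 0, 0, 0, 0 };
  int y;
  int right = (bsp_pid () + 1) % bsp_nprocs ();

  bsp_push_reg (x, sizeof (x));
  bsp_sync ();                  /* the registration takes effect here */

  y = bsp_pid ();
  bsp_put (right, &y, x, 0, sizeof (int));            /* write x[0] on right */
  bsp_get (right, x, sizeof (int), &y, sizeof (int)); /* read x[1] from right */
  bsp_sync ();                  /* both transfers complete here */

  bsp_pop_reg (x);
  bsp_sync ();
}
#endif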

void
bsp_send (int pid, const void *tag, const void *payload, int payload_nbytes)
{
  DelivElement element;
  char * restrict pointer;
  element.size = payload_nbytes + bsp.message_queue.send_tag_size;
  element.info.send.payload_size = payload_nbytes;
  pointer = deliveryTable_push (&bsp.delivery_table, pid, element, send);
  memcpy (pointer, tag, bsp.message_queue.send_tag_size);
  memcpy (pointer + bsp.message_queue.send_tag_size, payload, payload_nbytes);
}

void
bsp_qsize (int * restrict nmessages, int * restrict accum_nbytes)
{
  *nmessages = bsp.message_queue.n_mesg;
  *accum_nbytes = bsp.message_queue.accum_size;
}

void
bsp_get_tag (int * restrict status, void * restrict tag)
{
  if (bsp.message_queue.n_mesg == 0)
    *status = -1;
  else
    {
      ALIGNED_TYPE * restrict current_tag =
        bsp.message_queue.head +
        no_slots (sizeof (DelivElement), sizeof (ALIGNED_TYPE));
      DelivElement * restrict message = (DelivElement *) bsp.message_queue.head;
      *status = message->size;
      memcpy (tag, current_tag, bsp.message_queue.recv_tag_size);
    }
}

void
bsp_move (void *payload, int reception_nbytes)
{
  DelivElement * restrict message = (DelivElement *) bsp.message_queue.head;
  int copy_bytes = MIN (reception_nbytes, message->size);
  char * restrict current_payload =
    (char *) bsp.message_queue.head +
      sizeof (ALIGNED_TYPE) *
        no_slots (sizeof (DelivElement), sizeof (ALIGNED_TYPE)) +
      bsp.message_queue.recv_tag_size;
  memcpy (payload, current_payload, copy_bytes);

  /* advance the queue head to the next message */
  bsp.message_queue.head += message->next;
  bsp.message_queue.n_mesg--;
  bsp.message_queue.accum_size -= message->size;
}

void
bsp_set_tagsize (int *tag_nbytes)
{
  DelivElement element;
  element.info.settag.tag_size = *tag_nbytes;
  element.size = 0;

  /* the new tag size takes effect after the next bsp_sync();
     return the current send tag size */
  deliveryTable_push (&bsp.delivery_table, bsp.rank, element, settag);
  *tag_nbytes = bsp.message_queue.send_tag_size;
}
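
/* Example (illustrative, not part of the original source): typical
   BSMP use of the functions above. Each process sends one tagged
   message to the next process and drains its queue in the following
   superstep; names are hypothetical and the block is guarded by #if 0. */
#if 0
void bsmp_example (void)
{
  int tag_size = sizeof (int);
  int tag, status, nmessages, accum_nbytes;
  double payload = 3.14, received;
  int right = (bsp_pid () + 1) % bsp_nprocs ();

  bsp_set_tagsize (&tag_size);
  bsp_sync ();                  /* the new tag size takes effect here */

  tag = bsp_pid ();
  bsp_send (right, &tag, &payload, sizeof (payload));
  bsp_sync ();                  /* the message is delivered here */

  bsp_qsize (&nmessages, &accum_nbytes);
  while (nmessages-- > 0)
    {
      bsp_get_tag (&status, &tag);
      bsp_move (&received, sizeof (received));
    }
}
#endif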

void
bsp_hpput (int pid, const void *src, void *dst, int offset, int nbytes)
{
  /* this implementation simply falls back on the buffered version */
  bsp_put (pid, src, dst, offset, nbytes);
}

void
bsp_hpget (int pid, const void *src, int offset, void *dst, int nbytes)
{
  /* this implementation simply falls back on the buffered version */
  bsp_get (pid, src, offset, dst, nbytes);
}

int
bsp_hpmove (void **tag_ptr, void **payload_ptr)
{
  if (bsp.message_queue.n_mesg == 0)
    return -1;
  else
    {
      ALIGNED_TYPE * restrict current_tag =
        (ALIGNED_TYPE *) bsp.message_queue.head +
          no_slots (sizeof (DelivElement), sizeof (ALIGNED_TYPE));
      char * restrict current_payload =
        (char *) current_tag + bsp.message_queue.recv_tag_size;
      DelivElement * restrict message =
        (DelivElement *) bsp.message_queue.head;
      int size = message->info.send.payload_size;
      *tag_ptr     = current_tag;
      *payload_ptr = current_payload;
      bsp.message_queue.head += message->next;
      bsp.message_queue.n_mesg--;
      bsp.message_queue.accum_size -= size;
      return size;
    }
}
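
/* Example (illustrative): unlike bsp_move(), bsp_hpmove() avoids the
   copy by handing out pointers into the message queue itself, which
   remain valid only until the next bsp_sync(). process_message is a
   hypothetical consumer:

     void *tag, *payload;
     int payload_size = bsp_hpmove (&tag, &payload);
     if (payload_size != -1)
       process_message (tag, payload, payload_size);
*/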
