00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
#include <config.h>

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>

#include "bsp.h"
#include "bsp_memreg.h"
#include "bsp_mesgqueue.h"
#include "bsp_delivtable.h"
#include "bsp_reqtable.h"
#include "bsp_private.h"
#include "bsp_alloc.h"
#include "bsp_abort.h"
00352
/* Initial size arguments handed to the table constructors in bsp_begin().
   The *received* request/delivery tables are grown on demand in bsp_sync()
   (requestTable_expand / deliveryTable_expand), so a small start is enough. */
enum
{
  DELIVTAB_SIZE = 1,
  REQTAB_SIZE = 1,
  MEMREG_SIZE = 1
};
00356
00364
00396 void
00397 bsp_init (void (*spmd_part) (void), int argc, char *argv[])
00398 {
00399
00400 MPI_Init(&argc, &argv);
00401 MPI_Comm_size( MPI_COMM_WORLD, &bsp.nprocs);
00402 MPI_Comm_rank( MPI_COMM_WORLD, &bsp.rank);
00403
00404 if (bsp.rank == 0)
00405 {
00406
00407 }
00408 else
00409 {
00410
00411 spmd_part();
00412 exit(0);
00413 }
00414 }
00415
00426 void
00427 bsp_begin (int maxprocs)
00428 {
00429 int flag, i, *ranks;
00430 MPI_Group group, newgroup;
00431
00432 if (MPI_Initialized(&flag), !flag)
00433 {
00434 int argc = 0;
00435 char **argv = NULL;
00436 fprintf(stderr, "Warning! bsp_init() is not called. Initialization of MPI may fail\n");
00437 MPI_Init (&argc, &argv);
00438 MPI_Comm_size (MPI_COMM_WORLD, &bsp.nprocs);
00439 MPI_Comm_rank (MPI_COMM_WORLD, &bsp.rank);
00440 }
00441
00442 MPI_Bcast(&maxprocs, 1, MPI_INT, 0, MPI_COMM_WORLD);
00443
00444
00445
00446
00447 if (maxprocs > 0)
00448 {
00449 bsp.nprocs = MIN(maxprocs, bsp.nprocs);
00450 }
00451
00452 MPI_Comm_group( MPI_COMM_WORLD, &group);
00453 ranks = bsp_malloc( bsp.nprocs, sizeof(int));
00454 for (i = 0; i < bsp.nprocs; i++)
00455 ranks[i] = i;
00456
00457 MPI_Group_incl(group, bsp.nprocs, ranks, &newgroup);
00458 MPI_Comm_create(MPI_COMM_WORLD, newgroup, &bsp.communicator);
00459
00460 bsp_free(ranks);
00461
00462 if (bsp.rank >= bsp.nprocs)
00463 {
00464 MPI_Finalize();
00465 exit(0);
00466 }
00467
00468
00469 memoryRegister_initialize(&bsp.memory_register, bsp.nprocs, MEMREG_SIZE,
00470 bsp.rank);
00471 messageQueue_initialize (&bsp.message_queue);
00472 deliveryTable_initialize(&bsp.delivery_table, bsp.nprocs, DELIVTAB_SIZE);
00473 requestTable_initialize(&bsp.request_table, bsp.nprocs, REQTAB_SIZE);
00474 deliveryTable_initialize(&bsp.delivery_received_table, bsp.nprocs,
00475 DELIVTAB_SIZE);
00476 requestTable_initialize(&bsp.request_received_table, bsp.nprocs,
00477 REQTAB_SIZE);
00478
00479
00480 bsp.begintime = MPI_Wtime ();
00481 }
00482
00483
00488 void
00489 bsp_end ()
00490 {
00491
00492 memoryRegister_destruct (&bsp.memory_register);
00493 deliveryTable_destruct(&bsp.delivery_table);
00494 requestTable_destruct(&bsp.request_table);
00495 deliveryTable_destruct(&bsp.delivery_received_table);
00496 requestTable_destruct(&bsp.request_received_table);
00497
00498
00499 MPI_Finalize ();
00500 }
00506
00510 void
00511 bsp_abort (const char *format, ...)
00512 {
00513 va_list ap;
00514 va_start (ap, format);
00515 vfprintf (stderr, format, ap);
00516 va_end (ap);
00517
00518 bsp_intern_abort (ERR_BSP_ABORT, "bsp_abort()", __FILE__, __LINE__);
00519 }
00524
00532 int
00533 bsp_nprocs ()
00534 {
00535 int flag;
00536 MPI_Initialized(&flag);
00537 return flag?bsp.nprocs:-1;
00538 }
00539
00543 int
00544 bsp_pid ()
00545 {
00546 return bsp.rank;
00547 }
00548
00552 double
00553 bsp_time ()
00554 {
00555 return MPI_Wtime () - bsp.begintime;
00556 }
00561
00563 void
00564 bsp_sync ()
00565 {
00566 unsigned int send_index[ 3 * bsp.nprocs] ;
00567 unsigned int recv_index[ 3 * bsp.nprocs] ;
00568 unsigned int maxreqrows = 0, maxdelrows = 0, p;
00569 unsigned int any_gets = 0;
00570
00571
00572
00573
00574 messageQueue_sync(&bsp.message_queue);
00575 requestTable_reset(&bsp.request_received_table);
00576 deliveryTable_reset(&bsp.delivery_received_table);
00577
00578
00579 for (p = 0; p < bsp.nprocs; p++)
00580 any_gets |= bsp.request_table.used_slot_count[p];
00581
00582 for (p = 0; p < bsp.nprocs; p++)
00583 {
00584 send_index[3*p ] = bsp.request_table.used_slot_count[p];
00585 send_index[3*p + 1] = bsp.delivery_table.used_slot_count[p];
00586 send_index[3*p + 2] = any_gets;
00587 }
00588
00589 MPI_Alltoall( send_index, 3 , MPI_UNSIGNED,
00590 recv_index, 3 , MPI_UNSIGNED, bsp.communicator);
00591
00592
00593 maxreqrows = max(recv_index, 3*bsp.nprocs, 3);
00594 for (p = 0; p < bsp.nprocs; p++)
00595 maxdelrows = MAX( recv_index[1 + 3*p] +
00596 bsp.request_table.info.req.data_sizes[p], maxdelrows);
00597
00598 if ( bsp.request_received_table.rows < maxreqrows )
00599 {
00600 maxreqrows = MAX(bsp.request_received_table.rows, maxreqrows);
00601 requestTable_expand(&bsp.request_received_table, maxreqrows);
00602 }
00603
00604 if (bsp.delivery_received_table.rows < maxdelrows )
00605 {
00606 maxdelrows = MAX(bsp.delivery_received_table.rows, maxdelrows);
00607 deliveryTable_expand(&bsp.delivery_received_table, maxdelrows );
00608 }
00609
00610
00611 for (p = 0; p < bsp.nprocs; p++)
00612 {
00613 bsp.request_received_table.used_slot_count[p] = recv_index[3*p];
00614 bsp.delivery_received_table.used_slot_count[p] =
00615 recv_index[1 + 3*p] + bsp.request_table.info.req.data_sizes[p] ;
00616 }
00617
00618
00619 any_gets = 0;
00620 for (p = 0; p < bsp.nprocs; p++)
00621 any_gets |= recv_index[3*p + 2];
00622
00623
00624 if (any_gets)
00625 {
00626 expandableTable_comm(&bsp.request_table, &bsp.request_received_table,
00627 bsp.communicator);
00628 requestTable_execute(&bsp.request_received_table, &bsp.delivery_table);
00629 }
00630
00631 expandableTable_comm(&bsp.delivery_table, &bsp.delivery_received_table,
00632 bsp.communicator);
00633 deliveryTable_execute(&bsp.delivery_received_table,
00634 &bsp.memory_register, &bsp.message_queue, bsp.rank);
00635
00636 requestTable_reset(&bsp.request_table);
00637 deliveryTable_reset(&bsp.delivery_table);
00638
00639
00640 memoryRegister_pack(&bsp.memory_register);
00641 }
00653 void
00654 bsp_push_reg (const void *ident, int size)
00655 {
00656 int i;
00657 DelivElement element;
00658 element.size = 0;
00659 element.info.push.address = ident;
00660 for (i=0 ; i < bsp.nprocs; i++)
00661 deliveryTable_push(&bsp.delivery_table, i, element, pushreg);
00662 }
00663
00668 void
00669 bsp_pop_reg (const void *ident)
00670 {
00671 DelivElement element;
00672 element.size = 0;
00673 element.info.pop.address = ident;
00674 deliveryTable_push(&bsp.delivery_table, bsp.rank, element, popreg);
00675 }
00676
00688 void
00689 bsp_put (int pid, const void *src, void *dst, int offset, int nbytes)
00690 {
00691
00692 char * restrict pointer;
00693 DelivElement element;
00694 element.size = nbytes;
00695 element.info.put.dst =
00696 memoryRegister_memoized_find(&bsp.memory_register, pid, dst) + offset;
00697 pointer = deliveryTable_push(&bsp.delivery_table, pid, element, put);
00698 memcpy(pointer, src, nbytes);
00699 }
00700
00701
00715 void
00716 bsp_get (int pid, const void *src, int offset, void *dst, int nbytes)
00717 {
00718 ReqElement elem;
00719 elem.size = nbytes;
00720 elem.src =
00721 memoryRegister_memoized_find(&bsp.memory_register, pid, src);
00722 elem.dst = dst;
00723 elem.offset = offset;
00724
00725
00726 requestTable_push(&bsp.request_table, pid, elem);
00727 }
00732
00741 void
00742 bsp_send (int pid, const void *tag, const void *payload, int payload_nbytes)
00743 {
00744 DelivElement element;
00745 char * restrict pointer;
00746 element.size = payload_nbytes + bsp.message_queue.send_tag_size;
00747 element.info.send.payload_size = payload_nbytes;
00748 pointer = deliveryTable_push(&bsp.delivery_table, pid, element, send);
00749 memcpy( pointer, tag, bsp.message_queue.send_tag_size);
00750 memcpy( pointer + bsp.message_queue.send_tag_size, payload, payload_nbytes);
00751 }
00752
00759 void
00760 bsp_qsize (int * restrict nmessages, int * restrict accum_nbytes)
00761 {
00762 *nmessages = bsp.message_queue.n_mesg;
00763 *accum_nbytes = bsp.message_queue.accum_size;
00764 }
00765
00773 void
00774 bsp_get_tag (int * restrict status , void * restrict tag)
00775 {
00776 if (bsp.message_queue.n_mesg == 0)
00777 *status = -1;
00778 else
00779 {
00780 ALIGNED_TYPE * restrict current_tag =
00781 bsp.message_queue.head +
00782 no_slots( sizeof(DelivElement), sizeof(ALIGNED_TYPE));
00783 DelivElement * restrict message = (DelivElement *) bsp.message_queue.head;
00784 *status = message->size;
00785 memcpy(tag, current_tag, bsp.message_queue.recv_tag_size );
00786 }
00787 }
00788
00794 void
00795 bsp_move (void *payload, int reception_nbytes)
00796 {
00797 DelivElement * restrict message = (DelivElement *) bsp.message_queue.head;
00798 int copy_bytes = MIN(reception_nbytes, message->size);
00799 char * restrict current_payload =
00800 (char *) bsp.message_queue.head +
00801 sizeof(ALIGNED_TYPE) *
00802 no_slots( sizeof(DelivElement), sizeof(ALIGNED_TYPE)) +
00803 bsp.message_queue.recv_tag_size;
00804 memcpy(payload, current_payload, copy_bytes);
00805
00806 bsp.message_queue.head += message->next;
00807 bsp.message_queue.n_mesg --;
00808 bsp.message_queue.accum_size -= message->size;
00809 }
00810
00815 void
00816 bsp_set_tagsize (int *tag_nbytes)
00817 {
00818 DelivElement element;
00819 element.info.settag.tag_size = *tag_nbytes;
00820 element.size = 0;
00821
00822 deliveryTable_push(&bsp.delivery_table, bsp.rank, element, settag);
00823 *tag_nbytes = bsp.message_queue.send_tag_size;
00824 }
00825
00831
/**
 * "High-performance" put.
 *
 * This implementation forwards every argument unchanged to bsp_put(), so
 * the data is buffered exactly like a regular put.
 */
void
bsp_hpput (int pid, const void *src, void *dst, int offset, int nbytes)
{
  bsp_put (pid, src, dst, offset, nbytes);
}
/**
 * "High-performance" get.
 *
 * This implementation forwards every argument unchanged to bsp_get(), so
 * it behaves exactly like a regular get.
 */
void
bsp_hpget (int pid, const void *src, int offset, void *dst, int nbytes)
{
  bsp_get (pid, src, offset, dst, nbytes);
}
00866
00874 int
00875 bsp_hpmove (void **tag_ptr, void **payload_ptr)
00876 {
00877 if (bsp.message_queue.n_mesg == 0)
00878 return -1;
00879 else
00880 {
00881 ALIGNED_TYPE * restrict current_tag =
00882 (ALIGNED_TYPE *) bsp.message_queue.head +
00883 no_slots(sizeof(DelivElement), sizeof(ALIGNED_TYPE));
00884 char * restrict current_payload =
00885 (char *) current_tag + bsp.message_queue.recv_tag_size;
00886 DelivElement * restrict message =
00887 (DelivElement *) bsp.message_queue.head;
00888 int size = message->info.send.payload_size;
00889 *tag_ptr = current_tag;
00890 *payload_ptr = current_payload;
00891 bsp.message_queue.head += message->next;
00892 bsp.message_queue.n_mesg--;
00893 bsp.message_queue.accum_size -= size;
00894 return size;
00895 }
00896 }
00897