00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00356 #include <stdarg.h>
00357 #include <stdio.h>
00358 #include <stdlib.h>
00359 #include <mpi.h>
00360 #include "bsp.h"
00361 #include "bsp_memreg.h"
00362 #include "bsp_mesgqueue.h"
00363 #include "bsp_delivtable.h"
00364 #include "bsp_reqtable.h"
00365 #include "bsp_private.h"
00366 #include "bsp_alloc.h"
00367 #include "bsp_abort.h"
00368 #include <config.h>
00369
00370 #define DELIVTAB_SIZE 1000
00371 #define REQTAB_SIZE 1000
00372 #define MEMREG_SIZE 100
00373 #define MESGQ_SIZE 1000
00374
00382
00414 void
00415 bsp_init (void (*spmd_part) (void), int argc, char *argv[])
00416 {
00417
00418 MPI_Init(&argc, &argv);
00419 MPI_Comm_size( MPI_COMM_WORLD, &bsp.nprocs);
00420 MPI_Comm_rank( MPI_COMM_WORLD, &bsp.rank);
00421
00422 if (bsp.rank == 0)
00423 {
00424
00425 }
00426 else
00427 {
00428
00429 spmd_part();
00430 exit(0);
00431 }
00432 }
00433
00444 void
00445 bsp_begin (int maxprocs)
00446 {
00447 int flag, i, *ranks;
00448 MPI_Group group, newgroup;
00449
00450 if (MPI_Initialized(&flag), !flag)
00451 {
00452 int argc = 0;
00453 char **argv = NULL;
00454 fprintf(stderr, "Warning! bsp_init() is not called. Initialization of MPI may fail\n");
00455 MPI_Init (&argc, &argv);
00456 MPI_Comm_size (MPI_COMM_WORLD, &bsp.nprocs);
00457 MPI_Comm_rank (MPI_COMM_WORLD, &bsp.rank);
00458 }
00459
00460 MPI_Bcast(&maxprocs, 1, MPI_INT, 0, MPI_COMM_WORLD);
00461
00462
00463
00464
00465 if (maxprocs > 0)
00466 {
00467 bsp.nprocs = MIN(maxprocs, bsp.nprocs);
00468 }
00469
00470 MPI_Comm_group( MPI_COMM_WORLD, &group);
00471 ranks = bsp_malloc( bsp.nprocs, sizeof(int));
00472 for (i = 0; i < bsp.nprocs; i++)
00473 ranks[i] = i;
00474
00475 MPI_Group_incl(group, bsp.nprocs, ranks, &newgroup);
00476 MPI_Comm_create(MPI_COMM_WORLD, newgroup, &bsp.communicator);
00477
00478 bsp_free(ranks);
00479
00480 if (bsp.rank >= bsp.nprocs)
00481 {
00482 MPI_Finalize();
00483 exit(0);
00484 }
00485
00486
00487 memoryRegister_initialize(&bsp.memory_register, bsp.nprocs, MEMREG_SIZE,
00488 bsp.rank);
00489 messageQueue_initialize (&bsp.message_queue, MESGQ_SIZE);
00490 deliveryTable_initialize(&bsp.delivery_table, bsp.nprocs, DELIVTAB_SIZE);
00491 requestTable_initialize(&bsp.request_table, bsp.nprocs, REQTAB_SIZE);
00492 deliveryTable_initialize(&bsp.delivery_received_table, bsp.nprocs,
00493 DELIVTAB_SIZE);
00494 requestTable_initialize(&bsp.request_received_table, bsp.nprocs,
00495 REQTAB_SIZE);
00496
00497
00498 bsp.begintime = MPI_Wtime ();
00499 }
00500
00501
00506 void
00507 bsp_end ()
00508 {
00509
00510 memoryRegister_destruct (&bsp.memory_register);
00511 messageQueue_destruct (&bsp.message_queue);
00512 deliveryTable_destruct(&bsp.delivery_table);
00513 requestTable_destruct(&bsp.request_table);
00514 deliveryTable_destruct(&bsp.delivery_received_table);
00515 requestTable_destruct(&bsp.request_received_table);
00516
00517
00518 MPI_Finalize ();
00519 }
00525
00529 void
00530 bsp_abort (const char *format, ...)
00531 {
00532 va_list ap;
00533 va_start (ap, format);
00534 vfprintf (stderr, format, ap);
00535 va_end (ap);
00536
00537 bsp_intern_abort (ERR_BSP_ABORT, "bsp_abort()", __FILE__, __LINE__);
00538 }
00543
00551 int
00552 bsp_nprocs ()
00553 {
00554 int flag;
00555 MPI_Initialized(&flag);
00556 return flag?bsp.nprocs:-1;
00557 }
00558
00562 int
00563 bsp_pid ()
00564 {
00565 return bsp.rank;
00566 }
00567
00571 double
00572 bsp_time ()
00573 {
00574 return MPI_Wtime () - bsp.begintime;
00575 }
00581 void
00582 bsp_sync ()
00583 {
00584
00585 messageQueue_reset(&bsp.message_queue);
00586
00587
00588 expandableTable_comm(&bsp.request_table, &bsp.request_received_table,
00589 bsp.communicator);
00590 requestTable_execute(&bsp.request_received_table, &bsp.delivery_table);
00591 expandableTable_comm(&bsp.delivery_table, &bsp.delivery_received_table,
00592 bsp.communicator);
00593 deliveryTable_execute(&bsp.delivery_received_table,
00594 &bsp.memory_register, &bsp.message_queue);
00595
00596 expandableTable_reset(&bsp.request_table);
00597 expandableTable_reset(&bsp.request_received_table);
00598 deliveryTable_reset(&bsp.delivery_table);
00599 deliveryTable_reset(&bsp.delivery_received_table);
00600
00601
00602 memoryRegister_pack(&bsp.memory_register);
00603 }
00615 void
00616 bsp_push_reg (const void *ident, int size)
00617 {
00618 int i;
00619 VarSizeElement elem;
00620 elem.size = 0;
00621 elem.type = pushreg;
00622 elem.info.push.address = ident;
00623 elem.data = NULL;
00624 for (i=0 ; i < bsp.nprocs; i++)
00625 deliveryTable_push(&bsp.delivery_table, i, elem);
00626 }
00627
00632 void
00633 bsp_pop_reg (const void *ident)
00634 {
00635 VarSizeElement elem;
00636 elem.size = 0;
00637 elem.type = popreg;
00638 elem.info.pop.address = ident;
00639 elem.data = NULL;
00640 deliveryTable_push(&bsp.delivery_table, bsp.rank, elem);
00641 }
00642
00654 void
00655 bsp_put (int pid, const void *src, void *dst, int offset, int nbytes)
00656 {
00657
00658 deliveryTable_pushPut(&bsp.delivery_table, pid,
00659 memoryRegister_memoized_find(&bsp.memory_register, pid, dst) + offset,
00660 src, nbytes);
00661 }
00662
00663
00677 void
00678 bsp_get (int pid, const void *src, int offset, void *dst, int nbytes)
00679 {
00680 ReqElement elem;
00681 elem.size = nbytes;
00682 elem.src =
00683 memoryRegister_memoized_find(&bsp.memory_register, pid, src);
00684 elem.dst = dst;
00685 elem.offset = offset;
00686
00687
00688 requestTable_push(&bsp.request_table, pid, elem);
00689 }
00694
00703 void
00704 bsp_send (int pid, const void *tag, const void *payload, int payload_nbytes)
00705 {
00706 deliveryTable_pushSend(&bsp.delivery_table, pid, tag,
00707 messageQueue_getTagSize(&bsp.message_queue), payload, payload_nbytes);
00708 }
00709
00716 void
00717 bsp_qsize (int * restrict nmessages, int * restrict accum_nbytes)
00718 {
00719 *nmessages = messageQueue_getNumber (&bsp.message_queue);
00720 *accum_nbytes = messageQueue_getAccumSize (&bsp.message_queue);
00721 }
00722
00730 void
00731 bsp_get_tag (int * restrict status , void * restrict tag)
00732 {
00733 int nmesg = messageQueue_getNumber (&bsp.message_queue);
00734 if (nmesg == 0)
00735 *status = -1;
00736 else
00737 *status = messageQueue_getTag (&bsp.message_queue, tag);
00738 }
00739
00745 void
00746 bsp_move (void *payload, int reception_nbytes)
00747 {
00748 messageQueue_dequeue (&bsp.message_queue, payload, reception_nbytes);
00749 }
00750
00755 void
00756 bsp_set_tagsize (int *tag_nbytes)
00757 {
00758 VarSizeElement elem;
00759 elem.info.settag.tag_size = *tag_nbytes;
00760 elem.size = 0;
00761 elem.type = settag;
00762 elem.data = NULL;
00763
00764 deliveryTable_push(&bsp.delivery_table, bsp.rank, elem);
00765 *tag_nbytes = messageQueue_getTagSize (&bsp.message_queue);
00766 }
00767
00773
00786 void
00787 bsp_hpput (int pid, const void *src, void *dst, int offset, int nbytes)
00788 {
00789 bsp_put (pid, src, dst, offset, nbytes);
00790 }
00803 void
00804 bsp_hpget (int pid, const void *src, int offset, void *dst, int nbytes)
00805 {
00806 bsp_get (pid, src, offset, dst, nbytes);
00807 }
00808
00816 int
00817 bsp_hpmove (const void **tag_ptr, const void **payload_ptr)
00818 {
00819 if (messageQueue_getNumber (&bsp.message_queue) == 0)
00820 return -1;
00821 else
00822 return messageQueue_hpdequeue(&bsp.message_queue, tag_ptr, payload_ptr);
00823 }
00824