/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * (C) 2007 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ /* Test case from John Bent (ROMIO req #835) * Aggregation code was not handling certain access patterns when collective * buffering forced */ #define _XOPEN_SOURCE 500 /* strdup not in string.h otherwsie */ #include #include #include #include #include #define NUM_OBJS 4 #define OBJ_SIZE 1048576 extern char *optarg; extern int optind, opterr, optopt; char *prog = NULL; int debug = 0; static void Usage( int line ) { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); if ( rank == 0 ) { fprintf( stderr, "Usage (line %d): %s [-d] [-h] -f filename\n" "\t-d for debugging\n" "\t-h to turn on the hints to force collective aggregation\n", line, prog ); } exit( 0 ); } static void fatal_error( int mpi_ret, MPI_Status *mpi_stat, const char *msg ) { fprintf( stderr, "Fatal error %s: %d\n", msg, mpi_ret ); MPI_Abort( MPI_COMM_WORLD, -1 ); } static void print_hints( int rank, MPI_File *mfh ) { MPI_Info info; int nkeys; int i, dummy_int; char key[1024]; char value[1024]; MPI_Barrier( MPI_COMM_WORLD ); if ( rank == 0 ) { MPI_File_get_info( *mfh, &info ); MPI_Info_get_nkeys( info, &nkeys ); printf( "HINTS:\n" ); for( i = 0; i < nkeys; i++ ) { MPI_Info_get_nthkey( info, i, key ); printf( "%35s -> ", key ); MPI_Info_get( info, key, 1024, value, &dummy_int ); printf( "%s\n", value ); } MPI_Info_free(&info); } MPI_Barrier( MPI_COMM_WORLD ); } static void fill_buffer( char *buffer, int bufsize, int rank, MPI_Offset offset ) { memset( (void*)buffer, 0, bufsize ); snprintf( buffer, bufsize, "Hello from %d at %lld\n", rank, offset ); } static MPI_Offset get_offset( int rank, int num_objs, int obj_size, int which_obj ) { MPI_Offset offset; offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size; return offset; } static void write_file( char *target, int rank, MPI_Info *info ) { MPI_File wfh; MPI_Status mpi_stat; int mpi_ret; int i; char *buffer; buffer = malloc(OBJ_SIZE); if ( debug ) printf( "%d writing file %s\n", rank, target ); if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, MPI_MODE_WRONLY | MPI_MODE_CREATE, *info, &wfh ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, NULL, "open for write" ); } for( i = 0; i < NUM_OBJS; i++ ) { MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i ); fill_buffer( buffer, OBJ_SIZE, rank, offset ); if ( debug ) printf( "%s", buffer ); if ( (mpi_ret = MPI_File_write_at_all( wfh, offset, buffer, OBJ_SIZE, MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, &mpi_stat, "write" ); } } if ( debug ) print_hints( rank, &wfh ); if( (mpi_ret = MPI_File_close( &wfh ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, NULL, "close for write" ); } if ( debug ) printf( "%d wrote file %s\n", rank, target ); free(buffer); } static int reduce_corruptions( int corrupt_blocks ) { int mpi_ret; int sum; if ( ( mpi_ret = MPI_Reduce( &corrupt_blocks, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, NULL, "MPI_Reduce" ); } return sum; } static void read_file( char *target, int rank, MPI_Info *info, int *corrupt_blocks ) { MPI_File rfh; MPI_Status mpi_stat; int mpi_ret; int i; char *buffer; char *verify_buf = NULL; buffer = malloc(OBJ_SIZE); verify_buf = (char *)malloc(OBJ_SIZE); if ( debug ) printf( "%d reading file %s\n", rank, target ); if( (mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, MPI_MODE_RDONLY, *info, &rfh ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, NULL, "open for read" ); } for( i = 0; i < NUM_OBJS; i++ ) { MPI_Offset offset = get_offset( rank, NUM_OBJS, OBJ_SIZE, i ); fill_buffer( verify_buf, OBJ_SIZE, rank, offset ); if ( debug ) printf( "Expecting %s", buffer ); if ( (mpi_ret = MPI_File_read_at_all( rfh, offset, buffer, OBJ_SIZE, MPI_CHAR, &mpi_stat ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, &mpi_stat, "read" ); } if ( memcmp( verify_buf, buffer, OBJ_SIZE ) != 0 ) { (*corrupt_blocks)++; printf( "Corruption at %lld\n", offset ); if ( debug ) { printf( "\tExpecting %s\n" "\tRecieved %s\n", verify_buf, buffer ); } } } if( (mpi_ret = MPI_File_close( &rfh ) ) != MPI_SUCCESS ) { fatal_error( mpi_ret, NULL, "close for read" ); } free (buffer); free(verify_buf); } static void set_hints( MPI_Info *info ) { MPI_Info_set( *info, "romio_cb_write", "enable" ); MPI_Info_set( *info, "romio_no_indep_rw", "1" ); MPI_Info_set( *info, "cb_nodes", "1" ); MPI_Info_set( *info, "cb_buffer_size", "4194304" ); } /* void set_hints( MPI_Info *info, char *hints ) { char *delimiter = " "; char *hints_cp = strdup( hints ); char *key = strtok( hints_cp, delimiter ); char *val; while( key ) { val = strtok( NULL, delimiter ); if ( debug ) printf( "HINT: %s = %s\n", key, val ); if ( ! val ) { Usage( __LINE__ ); } MPI_Info_set( *info, key, val ); key = strtok( NULL, delimiter ); } free( hints_cp ); } */ int main( int argc, char *argv[] ) { int nproc = 1, rank = 0; char *target = NULL; int c; MPI_Info info; int mpi_ret; int corrupt_blocks = 0; MPI_Init( &argc, &argv ); MPI_Comm_size(MPI_COMM_WORLD, &nproc); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if( (mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) { if(rank == 0) fatal_error( mpi_ret, NULL, "MPI_info_create.\n"); } prog = strdup( argv[0] ); while( ( c = getopt( argc, argv, "df:h" ) ) != EOF ) { switch( c ) { case 'd': debug = 1; break; case 'f': target = strdup( optarg ); break; case 'h': set_hints( &info ); break; default: Usage( __LINE__ ); } } if ( ! target ) { Usage( __LINE__ ); } write_file( target, rank, &info ); read_file( target, rank, &info, &corrupt_blocks ); corrupt_blocks = reduce_corruptions( corrupt_blocks ); if ( rank == 0 ) { if (corrupt_blocks == 0) { fprintf(stdout, " No Errors\n"); } else { fprintf(stdout, "%d/%d blocks corrupt\n", corrupt_blocks, nproc * NUM_OBJS ); } } MPI_Info_free(&info); MPI_Finalize(); free(prog); exit( 0 ); }