/* ---------------------------------------------------------------- */ /* (C)Copyright IBM Corp. 2007, 2008, 2019 */ /* ---------------------------------------------------------------- */ /** * \file ad_gpfs_aggrs.c * \brief The externally used function from this file is declared in ad_gpfs_aggrs.h */ /* -*- Mode: C; c-basic-offset:4 ; -*- */ /* * Copyright (C) 1997-2001 University of Chicago. * See COPYRIGHT notice in top-level directory. */ #include "adio.h" #include "adio_cb_config_list.h" #include "ad_gpfs.h" #include "ad_gpfs_aggrs.h" #ifdef AGGREGATION_PROFILE #include "mpe.h" #endif #ifdef USE_DBG_LOGGING #define AGG_DEBUG 1 #endif #ifndef TRACE_ERR # define TRACE_ERR(format...) #endif /* Comments copied from common: * This file contains four functions: * * ADIOI_Calc_aggregator() * ADIOI_Calc_file_domains() * ADIOI_Calc_my_req() * ADIOI_Calc_others_req() * * The last three of these were originally in ad_read_coll.c, but they are * also shared with ad_write_coll.c. I felt that they were better kept with * the rest of the shared aggregation code. */ /* Discussion of values available from above: * * ADIO_Offset st_offsets[0..nprocs-1] * ADIO_Offset end_offsets[0..nprocs-1] * These contain a list of start and end offsets for each process in * the communicator. For example, an access at loc 10, size 10 would * have a start offset of 10 and end offset of 19. * int nprocs * number of processors in the collective I/O communicator * ADIO_Offset min_st_offset * ADIO_Offset fd_start[0..nprocs_for_coll-1] * starting location of "file domain"; region that a given process will * perform aggregation for (i.e. actually do I/O) * ADIO_Offset fd_end[0..nprocs_for_coll-1] * start + size - 1 roughly, but it can be less, or 0, in the case of * uneven distributions */ /* Description from common/ad_aggregate.c. (Does it completely apply to bg?) 
* ADIOI_Calc_aggregator() * * The intention here is to implement a function which provides basically * the same functionality as in Rajeev's original version of * ADIOI_Calc_my_req(). He used a ceiling division approach to assign the * file domains, and we use the same approach here when calculating the * location of an offset/len in a specific file domain. Further we assume * this same distribution when calculating the rank_index, which is later * used to map to a specific process rank in charge of the file domain. * * A better (i.e. more general) approach would be to use the list of file * domains only. This would be slower in the case where the * original ceiling division was used, but it would allow for arbitrary * distributions of regions to aggregators. We'd need to know the * nprocs_for_coll in that case though, which we don't have now. * * Note a significant difference between this function and Rajeev's old code: * this code doesn't necessarily return a rank in the range * 0..nprocs_for_coll; instead you get something in 0..nprocs. This is a * result of the rank mapping; any set of ranks in the communicator could be * used now. * * Returns an integer representing a rank in the collective I/O communicator. * * The "len" parameter is also modified to indicate the amount of data * actually available in this file domain. 
*/

/*
 * ADIOI_GPFS_Calc_aggregator()
 *
 * A more general aggregator search function that does not rely on the
 * assumption that every aggregator hosts a file domain of the same size.
 * The file domain containing "off" is located by bisection over the
 * fd_start[] / fd_end[] arrays, which are indexed 0..fd->hints->cb_nodes-1.
 *
 * Parameters:
 *   fd        file handle; fd->hints->cb_nodes and fd->hints->ranklist are read
 *   off       file offset to look up
 *   min_off   lower bound used only by the entry assertion
 *   len       in/out: on entry the length of the contiguous request starting
 *             at "off"; on return it is reduced to the number of bytes
 *             available in the selected file domain if the request extends
 *             past that domain's end
 *   fd_size   only reported in the diagnostic message
 *   fd_start  first offset of each file domain
 *   fd_end    last offset (inclusive) of each file domain
 *
 * Returns fd->hints->ranklist[i], where i is the index of the file domain
 * that contains "off".
 *
 * If "off" is not covered by any file domain the search cannot converge;
 * this is reported on stderr and the job is aborted with MPI_Abort() rather
 * than spinning forever.
 */
int ADIOI_GPFS_Calc_aggregator(ADIO_File fd,
                               ADIO_Offset off,
                               ADIO_Offset min_off,
                               ADIO_Offset *len,
                               ADIO_Offset fd_size,
                               ADIO_Offset *fd_start,
                               ADIO_Offset *fd_end)
{
    int rank_index, rank;
    ADIO_Offset avail_bytes;
    TRACE_ERR("Entering ADIOI_GPFS_Calc_aggregator\n");

    ADIOI_Assert((off <= fd_end[fd->hints->cb_nodes - 1] &&
                  off >= min_off && fd_start[0] >= min_off));

    /* bisection bounds: the candidate index always lies in [lb, ub) */
    int ub = fd->hints->cb_nodes;
    int lb = 0;

    /* The common (non-GPFS) code derives the index arithmetically from
     * fd_size, or scans fd_end[] linearly when a striping unit is set.
     * The GPFS file domains are not uniformly sized, so the arrays are
     * searched instead. */
    rank_index = fd->hints->cb_nodes / 2;
    while (off < fd_start[rank_index] || off > fd_end[rank_index]) {
        int next_index;

        if (off > fd_end[rank_index]) {
            lb = rank_index;
            next_index = (rank_index + ub) / 2;
        } else {
            /* the loop condition guarantees off < fd_start[rank_index] here */
            ub = rank_index;
            next_index = (rank_index + lb) / 2;
        }

        /* If the midpoint no longer moves, every later iteration would take
         * the same branch with the same bounds: "off" is in no file domain
         * and the search would loop forever.  Fail loudly instead. */
        if (next_index == rank_index) {
            FPRINTF(stderr,
                    "Error in ADIOI_GPFS_Calc_aggregator(): off=%lld is not covered by any file domain (stuck at index %d of %d, fd_size=%lld)\n",
                    (long long) off, rank_index, fd->hints->cb_nodes, (long long) fd_size);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        rank_index = next_index;
    }

    /* we index into fd_end with rank_index, and fd_end was allocated to be no
     * bigger than fd->hints->cb_nodes.   If we ever violate that, we're
     * overrunning arrays.  Obviously, we should never ever hit this abort */
    if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
        /* ADIO_Offset is cast explicitly so the arguments match %lld */
        FPRINTF(stderr,
                "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
                rank_index, fd->hints->cb_nodes, (long long) fd_size, (long long) off);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    /* DBG_FPRINTF ("ADIOI_GPFS_Calc_aggregator: rank_index = %d\n", rank_index ); */

    /*
     * remember here that even in Rajeev's original code it was the case that
     * different aggregators could end up with different amounts of data to
     * aggregate.  here we use fd_end[] to make sure that we know how much
     * data this aggregator is working with.
     *
     * the +1 is to take into account the end vs. length issue.
     */
    avail_bytes = fd_end[rank_index] + 1 - off;
    if (avail_bytes < *len && avail_bytes > 0) {
        /* this file domain only has part of the requested contig. region */
        *len = avail_bytes;
    }

    /* map our index to a rank */
    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
    rank = fd->hints->ranklist[rank_index];
    TRACE_ERR("Leaving ADIOI_GPFS_Calc_aggregator\n");

    return rank;
}

/*
 * Compute a dynamic access range based file domain partition among I/O aggregators,
 * which align to the GPFS block size
 * Divide the I/O workload among "nprocs_for_coll" processes. This is
 * done by (logically) dividing the file into file domains (FDs); each
 * process may directly access only its own file domain.
 * Additional effort is to make sure that each I/O aggregator get
 * a file domain that aligns to the GPFS block size.  So, there will
 * not be any false sharing of GPFS file blocks among multiple I/O nodes.
 *
 * The common version of this now accepts a min_fd_size and striping_unit.
 * It doesn't seem necessary here (using GPFS block sizes) but keep it in mind
 * (e.g. we could pass striping unit instead of using fs_ptr->blksize).
*/ void ADIOI_GPFS_Calc_file_domains(ADIO_File fd, ADIO_Offset *st_offsets, ADIO_Offset *end_offsets, int nprocs, int nprocs_for_coll, ADIO_Offset *min_st_offset_ptr, ADIO_Offset **fd_start_ptr, ADIO_Offset **fd_end_ptr, ADIO_Offset *fd_size_ptr, void *fs_ptr) { ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, *fd_size; int i, aggr; TRACE_ERR("Entering ADIOI_GPFS_Calc_file_domains\n"); blksize_t blksize; #ifdef AGGREGATION_PROFILE MPE_Log_event (5004, 0, NULL); #endif # if AGG_DEBUG static char myname[] = "ADIOI_GPFS_Calc_file_domains"; DBG_FPRINTF(stderr, "%s(%d): %d aggregator(s)\n", myname,__LINE__,nprocs_for_coll); # endif if (fd->blksize <= 0) /* default to 1M if blksize unset */ fd->blksize = 1048576; blksize = fd->blksize; # if AGG_DEBUG DBG_FPRINTF(stderr,"%s(%d): Blocksize=%ld\n",myname,__LINE__,blksize); # endif /* find min of start offsets and max of end offsets of all processes */ min_st_offset = st_offsets [0]; max_end_offset = end_offsets[0]; for (i=1; ihints->fs_hints.bg.numbridges*sizeof(int)); /* tmpbridgelistnum: copy of the bridgelistnum whose entries can be * decremented to keep track of bridge assignments during the actual * large block assignments to the agg rank list*/ int *tmpbridgelistnum = (int *) ADIOI_Malloc(fd->hints->fs_hints.bg.numbridges*sizeof(int)); int j; for (j=0;jhints->fs_hints.bg.numbridges;j++) { int k, bridgerankoffset = 0; for (k=0;khints->fs_hints.bg.bridgelistnum[k]; } bridgelistoffset[j] = bridgerankoffset; } for (j=0;jhints->fs_hints.bg.numbridges;j++) tmpbridgelistnum[j] = fd->hints->fs_hints.bg.bridgelistnum[j]; int bridgeiter = 0; /* distribute the large blocks across the aggs going breadth-first * across the bridgelist - this distributes the fd sizes across the * ions, so later in the file domain assignment when it iterates thru * the ranklist the offsets will be contiguous within the bridge and * ion as well */ for (j=0;j 0) { foundbridge = 1; /* printf("bridgeiter is %d tmpbridgelistnum[bridgeiter] 
is %d bridgelistoffset[bridgeiter] is %d\n",bridgeiter,tmpbridgelistnum[bridgeiter],bridgelistoffset[bridgeiter]); printf("naggs is %d bridgeiter is %d bridgelistoffset[bridgeiter] is %d tmpbridgelistnum[bridgeiter] is %d\n",naggs, bridgeiter,bridgelistoffset[bridgeiter],tmpbridgelistnum[bridgeiter]); printf("naggs is %d bridgeiter is %d setting fd_size[%d]\n",naggs, bridgeiter,bridgelistoffset[bridgeiter]+(fd->hints->bridgelistnum[bridgeiter]-tmpbridgelistnum[bridgeiter])); */ int currentbridgelistnum = (fd->hints->fs_hints.bg.bridgelistnum[bridgeiter]- tmpbridgelistnum[bridgeiter]); int currentfdsizeindex = bridgelistoffset[bridgeiter] + currentbridgelistnum; fd_size[currentfdsizeindex] = (nb_cn_small+1) * blksize; tmpbridgelistnum[bridgeiter]--; } if (bridgeiter == (fd->hints->fs_hints.bg.numbridges-1)) { /* guard against infinite loop - should only ever make 1 pass * thru bridgelist */ ADIOI_Assert(numbridgelistpasses == 0); numbridgelistpasses++; bridgeiter = 0; } else bridgeiter++; } } ADIOI_Free(tmpbridgelistnum); ADIOI_Free(bridgelistoffset); } else { /* BG/L- and BG/P-style distribution of file domains: simple allocation of * file domins to each aggregator */ for (i=0; icomm,&myrank); if (myrank == 0) { fprintf(stderr,"naggs_small is %d nb_cn_small is %d\n",naggs_small,nb_cn_small); for (i=0; ihints->ranklist[i]); } } #endif #else // not BGQ platform for (i=0; i 0) { off += fd_len; /* point to first remaining byte */ fd_len = rem_len; /* save remaining size, pass to calc */ proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end); count_my_req_per_proc[proc]++; rem_len -= fd_len; /* reduce remaining length by amount from fd */ } } /* now allocate space for my_req, offset, and len */ *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); my_req = *my_req_ptr; count_my_req_procs = 0; for (i=0; i < nprocs; i++) { if (count_my_req_per_proc[i]) { my_req[i].offsets = (ADIO_Offset *) 
ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset)); my_req[i].lens = ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset)); count_my_req_procs++; } my_req[i].count = 0; /* will be incremented where needed later */ } /* now fill in my_req */ curr_idx = 0; for (i=0; i 0) { off += fd_len; fd_len = rem_len; proc = ADIOI_GPFS_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, fd_start, fd_end); if (buf_idx[proc] == -1) { ADIOI_Assert(curr_idx == (int) curr_idx); buf_idx[proc] = (int) curr_idx; } l = my_req[proc].count; curr_idx += fd_len; rem_len -= fd_len; my_req[proc].offsets[l] = off; my_req[proc].lens[l] = fd_len; my_req[proc].count++; } } #ifdef AGG_DEBUG for (i=0; i 0) { DBG_FPRINTF(stderr, "data needed from %d (count = %d):\n", i, my_req[i].count); for (l=0; l < my_req[i].count; l++) { DBG_FPRINTF(stderr, " off[%d] = %lld, len[%d] = %lld\n", l, my_req[i].offsets[l], l, my_req[i].lens[l]); } } DBG_FPRINTF(stderr, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]); } #endif *count_my_req_procs_ptr = count_my_req_procs; *buf_idx_ptr = buf_idx; #ifdef AGGREGATION_PROFILE MPE_Log_event (5025, 0, NULL); #endif TRACE_ERR("Leaving ADIOI_GPFS_Calc_my_req\n"); } /* * ADIOI_Calc_others_req (copied to bg and switched to all to all for performance) * * param[in] count_my_req_procs Number of processes whose file domain my * request touches. * param[in] count_my_req_per_proc count_my_req_per_proc[i] gives the no. of * contig. requests of this process in * process i's file domain. 
* param[in] my_req A structure defining my request * param[in] nprocs Number of nodes in the block * param[in] myrank Rank of this node * param[out] count_others_req_proc_ptr Number of processes whose requests lie in * my process's file domain (including my * process itself) * param[out] others_req_ptr Array of other process' requests that lie * in my process's file domain */ void ADIOI_GPFS_Calc_others_req(ADIO_File fd, int count_my_req_procs, int *count_my_req_per_proc, ADIOI_Access *my_req, int nprocs, int myrank, int *count_others_req_procs_ptr, ADIOI_Access **others_req_ptr) { TRACE_ERR("Entering ADIOI_GPFS_Calc_others_req\n"); /* determine what requests of other processes lie in this process's file domain */ /* count_others_req_procs = number of processes whose requests lie in this process's file domain (including this process itself) count_others_req_per_proc[i] indicates how many separate contiguous requests of proc. i lie in this process's file domain. */ int *count_others_req_per_proc, count_others_req_procs; int i; ADIOI_Access *others_req; /* Parameters for MPI_Alltoallv */ int *scounts, *sdispls, *rcounts, *rdispls; /* first find out how much to send/recv and from/to whom */ #ifdef AGGREGATION_PROFILE MPE_Log_event (5026, 0, NULL); #endif /* Send 1 int to each process. count_my_req_per_proc[i] is the number of * requests that my process will do to the file domain owned by process[i]. * Receive 1 int from each process. count_others_req_per_proc[i] is the number of * requests that process[i] will do to the file domain owned by my process. */ count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int)); /* cora2a1=timebase(); */ /*for(i=0;icomm); /* total_cora2a+=timebase()-cora2a1; */ /* Allocate storage for an array of other nodes' accesses of our * node's file domain. Also allocate storage for the alltoallv * parameters. 
*/ *others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); others_req = *others_req_ptr; scounts = ADIOI_Malloc(nprocs*sizeof(int)); sdispls = ADIOI_Malloc(nprocs*sizeof(int)); rcounts = ADIOI_Malloc(nprocs*sizeof(int)); rdispls = ADIOI_Malloc(nprocs*sizeof(int)); /* If process[i] has any requests in my file domain, * initialize an ADIOI_Access structure that will describe each request * from process[i]. The offsets, lengths, and buffer pointers still need * to be obtained to complete the setting of this structure. */ count_others_req_procs = 0; for (i=0; icomm); for (i=0; icomm); for (i=0; i