diff --git a/parsec/remote_dep.h b/parsec/remote_dep.h
index b42699a1d..5ec8284a6 100644
--- a/parsec/remote_dep.h
+++ b/parsec/remote_dep.h
@@ -53,6 +53,7 @@ typedef struct remote_dep_wire_activate_s {
     uint32_t taskpool_id;
     uint32_t task_class_id;
     uint32_t length;
+    uintptr_t callback_fn; /**< Only used for GET protocol: source-side function the receiver hands back to parsec_ce.get() */
     parsec_assignment_t locals[MAX_LOCAL_COUNT];
 } remote_dep_wire_activate_t;
 
@@ -155,6 +156,8 @@ struct parsec_remote_deps_s {
     int32_t priority;
     uint32_t *remote_dep_fw_mask; /**< list of peers already notified about
                                    * the control sequence (only used for control messages) */
+    void *memhandles; /**< used for the GET protocol to store the provided memory handles (malloc'ed on activation, freed on GET completion) */
+    uintptr_t remote_cb_data; /**< used for the GET protocol to store the provided cb data (opaque cookie echoed back to the source) */
     struct data_repo_entry_s *repo_entry;
     struct remote_dep_output_param_s output[1];
 };
diff --git a/parsec/remote_dep_mpi.c b/parsec/remote_dep_mpi.c
index 8d8759cb1..0a031ec2b 100644
--- a/parsec/remote_dep_mpi.c
+++ b/parsec/remote_dep_mpi.c
@@ -28,6 +28,8 @@ int parsec_comm_gets = 0;
 int parsec_comm_puts_max = DEP_NB_CONCURENT * MAX_PARAM_COUNT;
 int parsec_comm_puts = 0;
 
+static int parsec_remote_dep_use_get = 0; /* 0: PUT protocol, 1: GET protocol (MCA param "runtime", "comm_get") */
+
 /**
  * Number of data movements to be extracted at each step. Bigger the number
  * larger the amount spent in ordering the tasks, but greater the potential
@@ -144,6 +146,24 @@ remote_dep_mpi_get_end_cb(parsec_comm_engine_t *ce,
                           int src,
                           void *cb_data);
 
+static int
+remote_dep_mpi_local_get_end_cb(parsec_comm_engine_t *ce,
+                                parsec_ce_mem_reg_handle_t lreg,
+                                ptrdiff_t ldispl,
+                                parsec_ce_mem_reg_handle_t rreg,
+                                ptrdiff_t rdispl,
+                                size_t size,
+                                int src,
+                                void *cb_data);
+
+static int
+remote_dep_mpi_remote_get_end_cb(parsec_comm_engine_t *ce,
+                                 parsec_ce_tag_t tag,
+                                 void *msg,
+                                 size_t msg_size,
+                                 int src,
+                                 void *data);
+
 static int
 remote_dep_mpi_put_end_cb(parsec_comm_engine_t *ce,
                           parsec_ce_mem_reg_handle_t lreg,
@@ -250,6 +270,9 @@ static void remote_dep_mpi_params(parsec_context_t* context) {
 #endif
     parsec_mca_param_reg_int_name("runtime", "comm_aggregate", "Aggregate multiple dependencies in the same short message (1=true,0=false).",
                                   false, false, parsec_param_enable_aggregate, &parsec_param_enable_aggregate);
+    parsec_mca_param_reg_int_name("runtime", "comm_get", "Use the PUT protocol (0) or the GET protocol (1) for transfering payload data.",
+                                  false, false, parsec_remote_dep_use_get, &parsec_remote_dep_use_get);
+
 }
 
 int
@@ -1354,8 +1377,22 @@ static int remote_dep_mpi_pack_dep(int peer,
         PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Can't pack at %d/%d. Bail out!", *position, length);
         return 1;
     }
+    int num_memhandles = 0;
+    int memhandle_size = parsec_ce.get_mem_handle_size();
+    /* GET protocol: one (memory handle, callback cookie) pair per dep sent to this peer must also fit */
+    if (parsec_remote_dep_use_get) {
+        for(k = 0; deps->outgoing_mask >> k; k++) {
+            if( !((1U << k) & deps->outgoing_mask )) continue;
+            if( !(deps->output[k].rank_bits[peer_bank] & peer_mask) ) continue;
+            ++num_memhandles;
+        }
+        if ((num_memhandles*((int)sizeof(uintptr_t) + memhandle_size) + dsize) > (length - (*position))) {
+            PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Can't pack memory handles for GET protocol at %d/%d. Bail out!", *position, length);
+            return 1;
+        }
+    }
+
     /* Don't pack yet, we need to update the length field before packing */
-    *position += dsize;
     assert((0 != msg->output_mask) && /* this should be preset */
            (msg->output_mask & deps->outgoing_mask) == deps->outgoing_mask);
     msg->length = deps->taskpool->tdm.module->outgoing_message_piggyback_size;
@@ -1400,8 +1437,58 @@ static int remote_dep_mpi_pack_dep(int peer,
 #endif
+    /* The GET completion callback travels inside msg, so it has to be set before msg is packed below */
+    msg->callback_fn = parsec_remote_dep_use_get ? (uintptr_t)&remote_dep_mpi_remote_get_end_cb : 0;
     /* And now pack the updated message (msg->length and msg->output_mask) itself. */
     parsec_ce.pack(&parsec_ce, msg, dep_count, dep_dtt, packed_buffer, length, &saved_position);
+
+    /* If we're using the GET protocol, also pack the memory registration handles */
+    if (parsec_remote_dep_use_get) {
+        /* register memory and pack the memory handles */
+        for(k = 0; deps->outgoing_mask >> k; k++) {
+            if( !((1U << k) & deps->outgoing_mask )) continue;
+            if( !(deps->output[k].rank_bits[peer_bank] & peer_mask) ) continue;
+
+            void *dataptr = PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data);
+            MPI_Datatype dtt = deps->output[k].data.remote.dst_datatype;
+            int nbdtt = deps->output[k].data.remote.dst_count;
+
+            parsec_ce_mem_reg_handle_t source_memory_handle;
+            size_t source_memory_handle_size;
+
+            if(parsec_ce.capabilites.supports_noncontiguous_datatype) {
+                parsec_ce.mem_register(dataptr, PARSEC_MEM_TYPE_NONCONTIGUOUS,
+                                       nbdtt, dtt,
+                                       -1,
+                                       &source_memory_handle, &source_memory_handle_size);
+            } else {
+                /* TODO: Implement converter to pack and unpack */
+                int dtt_size;
+                parsec_type_size(dtt, &dtt_size);
+                parsec_ce.mem_register(dataptr, PARSEC_MEM_TYPE_CONTIGUOUS,
+                                       -1, NULL,
+                                       dtt_size,
+                                       &source_memory_handle, &source_memory_handle_size);
+            }
+            /* pack the memory handle and a pointer to the callback data that will be returned upon completion of the GET */
+            memcpy(packed_buffer+saved_position, source_memory_handle, source_memory_handle_size);
+            saved_position += source_memory_handle_size;
+            dsize += source_memory_handle_size;
+
+            remote_dep_cb_data_t *cb_data = (remote_dep_cb_data_t *) parsec_thread_mempool_allocate
+                                                (parsec_remote_dep_cb_data_mempool->thread_mempools);
+            cb_data->deps = deps;
+            cb_data->k = k;
+            cb_data->memory_handle = source_memory_handle;
+            memcpy(packed_buffer+saved_position, &cb_data, sizeof(uintptr_t));
+            saved_position += sizeof(uintptr_t);
+            dsize += sizeof(uintptr_t);
+        }
+    }
+
     msg->length = dsize + deps->taskpool->tdm.module->outgoing_message_piggyback_size;
     deps->taskpool->tdm.module->outgoing_message_pack(deps->taskpool, peer, packed_buffer, &saved_position, length);
+    *position += dsize; /* the size check above accepts an exact fit, hence <= below */
+    assert(*position <= length);
+
     return 0;
 }
 
@@ -1789,6 +1876,37 @@ remote_dep_mpi_put_end_cb(parsec_comm_engine_t *ce,
     return 1;
 }
 
+static int
+remote_dep_mpi_remote_get_end_cb(parsec_comm_engine_t *ce,
+                                 parsec_ce_tag_t tag,
+                                 void *msg,
+                                 size_t msg_size,
+                                 int src,
+                                 void *data)
+{
+    /* Source-side completion of a GET: msg carries the pointer-sized cookie packed by
+     * remote_dep_mpi_pack_dep (the address of our callback data), not the callback data itself. */
+    remote_dep_cb_data_t *cb_data;
+    memcpy(&cb_data, msg, sizeof(cb_data));
+    parsec_remote_deps_t* deps = cb_data->deps;
+    parsec_execution_stream_t* es = &parsec_comm_es;
+    (void)tag; (void)msg_size; (void)src; (void)data;
+
+    PARSEC_DEBUG_VERBOSE(6, parsec_debug_output, "MPI:\tTO\tna\tGet END \tunknown \tk=%d\twith deps %p\tparams bla\t(tag=bla) data ptr bla",
+                         cb_data->k, deps);
+
+    TAKE_TIME(es->es_profile, MPI_Data_plds_ek, cb_data->k);
+
+    remote_dep_complete_and_cleanup(&deps, 1);
+
+    /* mem_unregister is given the address of the handle, as in remote_dep_mpi_local_get_end_cb */
+    ce->mem_unregister(&cb_data->memory_handle);
+    parsec_thread_mempool_free(parsec_remote_dep_cb_data_mempool->thread_mempools, cb_data);
+
+    parsec_comm_puts--; /* NOTE(review): no matching parsec_comm_puts++ is visible on the GET path -- confirm */
+    return 1;
+}
+
 
 /**
  * An activation message has been received, and the remote_dep_wire_activate_t
@@ -1880,6 +1998,13 @@ remote_dep_mpi_save_activate_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag,
         deps = remote_deps_allocate(&parsec_remote_dep_context.freelist);
         ce->unpack(ce, msg, length, &position, &deps->msg, dep_count, dep_dtt);
 
+        if (parsec_remote_dep_use_get) { /* NOTE(review): only the first (handle, cookie) pair is unpacked; pack_dep emits one pair per output -- confirm */
+            size_t memhandle_size = ce->get_mem_handle_size();
+            deps->memhandles = malloc(memhandle_size);
+            ce->unpack(ce, msg, length, &position, deps->memhandles, memhandle_size, parsec_datatype_int8_t);
+            ce->unpack(ce, msg, length, &position, &deps->remote_cb_data, sizeof(uintptr_t), parsec_datatype_int8_t);
+        }
+
         deps->from = src;
         deps->eager_msg = msg;
 
@@ -1999,7 +2124,6 @@ static void remote_dep_mpi_get_start(parsec_execution_stream_t* es,
 {
     remote_dep_wire_activate_t* task = &(deps->msg);
     int from = deps->from, k, count, nbdtt;
-    remote_dep_wire_get_t msg;
     MPI_Datatype dtt;
 #if defined(PARSEC_DEBUG_NOISIER)
     char tmp[MAX_TASK_STRLEN], type_name[MPI_MAX_OBJECT_NAME];
@@ -2016,92 +2140,151 @@ static void remote_dep_mpi_get_start(parsec_execution_stream_t* es,
     (void)es;
     DEBUG_MARK_CTL_MSG_ACTIVATE_RECV(from, (void*)task, task);
 
-    msg.source_deps = task->deps; /* the deps copied from activate message from source */
-    msg.callback_fn = (uintptr_t)remote_dep_mpi_get_end_cb; /* We let the source know to call this
-                                                             * function when the PUT is over, in a true
-                                                             * one sided case the (integer) value of this
-                                                             * function pointer will be registered as the
-                                                             * TAG to receive the same notification. */
-
-    for(k = 0; deps->incoming_mask >> k; k++) {
-        if( !((1U<<k) & deps->incoming_mask) ) continue;
-        msg.output_mask = 0; /* Only get what I need */
-        msg.output_mask |= (1U<<k);
+    if (parsec_remote_dep_use_get) {
+        /* GET protocol: fetch every needed output directly, using the handle received with the activation */
+        for(k = 0; deps->incoming_mask >> k; k++) {
+            if( !((1U<<k) & deps->incoming_mask) ) continue;
+            /* register local memory to get data into */
+            parsec_ce_mem_reg_handle_t receiver_memory_handle;
+            size_t receiver_memory_handle_size;
+
+            /* prepare the local receiving data */
+            assert(NULL == deps->output[k].data.data); /* we do not support in-place tiles now, make sure it doesn't happen yet */
+            if(NULL == deps->output[k].data.data) {
+                deps->output[k].data.data = remote_dep_copy_allocate(&deps->output[k].data.remote);
+            }
+            dtt = deps->output[k].data.remote.dst_datatype;
+            nbdtt = deps->output[k].data.remote.dst_count;
+
+            if(parsec_ce.capabilites.supports_noncontiguous_datatype) {
+                parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS,
+                                       nbdtt, dtt,
+                                       -1,
+                                       &receiver_memory_handle, &receiver_memory_handle_size);
+            } else {
+                /* TODO: Implement converter to pack and unpack */
+                int dtt_size;
+                parsec_type_size(dtt, &dtt_size);
+                parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS,
+                                       -1, NULL,
+                                       dtt_size,
+                                       &receiver_memory_handle, &receiver_memory_handle_size);
+            }
 
-        /* We pack the callback data that should be passed to us when the other side
-         * notifies us to invoke the callback_fn we have assigned above
-         */
-        remote_dep_cb_data_t *callback_data = (remote_dep_cb_data_t *) parsec_thread_mempool_allocate
+            /* We pack the callback data that should be passed to us when the other side
+             * notifies us to invoke the callback_fn we have assigned above
+             */
+            remote_dep_cb_data_t *callback_data = (remote_dep_cb_data_t *) parsec_thread_mempool_allocate
                                                (parsec_remote_dep_cb_data_mempool->thread_mempools);
-        callback_data->deps = deps;
-        callback_data->k = k;
-
-        /* prepare the local receiving data */
-        assert(NULL == deps->output[k].data.data); /* we do not support in-place tiles now, make sure it doesn't happen yet */
-        if(NULL == deps->output[k].data.data) {
-            deps->output[k].data.data = remote_dep_copy_allocate(&deps->output[k].data.remote);
+            callback_data->deps = deps;
+            callback_data->k = k;
+            parsec_ce.get(&parsec_ce, receiver_memory_handle, 0,
+                          deps->memhandles, 0, 0, from,
+                          &remote_dep_mpi_local_get_end_cb, callback_data,
+                          deps->msg.callback_fn, &deps->remote_cb_data, sizeof(uintptr_t));
+
+            parsec_comm_gets++;
         }
-        dtt = deps->output[k].data.remote.dst_datatype;
-        nbdtt = deps->output[k].data.remote.dst_count;
-
-        /* We have the remote mem_handle.
-         * Let's allocate our mem_reg_handle
-         * and let the source know.
-         */
-        parsec_ce_mem_reg_handle_t receiver_memory_handle;
-        size_t receiver_memory_handle_size;
 
-        if(parsec_ce.capabilites.supports_noncontiguous_datatype) {
-            parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS,
-                                   nbdtt, dtt,
-                                   -1,
-                                   &receiver_memory_handle, &receiver_memory_handle_size);
-        } else {
-            /* TODO: Implement converter to pack and unpack */
-            int dtt_size;
-            parsec_type_size(dtt, &dtt_size);
-            parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS,
-                                   -1, NULL,
-                                   dtt_size,
-                                   &receiver_memory_handle, &receiver_memory_handle_size);
+    } else {
 
-        }
+        remote_dep_wire_get_t msg;
+
+        msg.source_deps = task->deps; /* the deps copied from activate message from source */
+        msg.callback_fn = (uintptr_t)remote_dep_mpi_get_end_cb; /* We let the source know to call this
+                                                                 * function when the PUT is over, in a true
+                                                                 * one sided case the (integer) value of this
+                                                                 * function pointer will be registered as the
+                                                                 * TAG to receive the same notification. */
+
+
+        /* Send an AM requesting a PUT for each data */
+        for(k = 0; deps->incoming_mask >> k; k++) {
+            if( !((1U<<k) & deps->incoming_mask) ) continue;
+            msg.output_mask = 0; /* Only get what I need */
+            msg.output_mask |= (1U<<k);
+
+            /* We pack the callback data that should be passed to us when the other side
+             * notifies us to invoke the callback_fn we have assigned above
+             */
+            remote_dep_cb_data_t *callback_data = (remote_dep_cb_data_t *) parsec_thread_mempool_allocate
+                                                    (parsec_remote_dep_cb_data_mempool->thread_mempools);
+            callback_data->deps = deps;
+            callback_data->k = k;
+
+            /* prepare the local receiving data */
+            assert(NULL == deps->output[k].data.data); /* we do not support in-place tiles now, make sure it doesn't happen yet */
+            if(NULL == deps->output[k].data.data) {
+                deps->output[k].data.data = remote_dep_copy_allocate(&deps->output[k].data.remote);
+            }
+            dtt = deps->output[k].data.remote.dst_datatype;
+            nbdtt = deps->output[k].data.remote.dst_count;
+
+
+            /* We have the remote mem_handle.
+             * Let's allocate our mem_reg_handle
+             * and let the source know.
+             */
+            parsec_ce_mem_reg_handle_t receiver_memory_handle;
+            size_t receiver_memory_handle_size;
+
+            if(parsec_ce.capabilites.supports_noncontiguous_datatype) {
+                parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_NONCONTIGUOUS,
+                                       nbdtt, dtt,
+                                       -1,
+                                       &receiver_memory_handle, &receiver_memory_handle_size);
+            } else {
+                /* TODO: Implement converter to pack and unpack */
+                int dtt_size;
+                parsec_type_size(dtt, &dtt_size);
+                parsec_ce.mem_register(PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), PARSEC_MEM_TYPE_CONTIGUOUS,
+                                       -1, NULL,
+                                       dtt_size,
+                                       &receiver_memory_handle, &receiver_memory_handle_size);
+            }
 
 
 # if defined(PARSEC_DEBUG_NOISIER)
-        MPI_Type_get_name(dtt, type_name, &len);
-        int _size;
-        MPI_Type_size(dtt, &_size);
-        PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "MPI:\tTO\t%d\tGet START\t% -8s\tk=%d\twith datakey %lx at %p type %s count %d displ %ld \t(k=%d, dst_mem_handle=%p)",
-                from, tmp, k, task->deps, PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), type_name, nbdtt,
-                deps->output[k].data.remote.dst_displ, k, receiver_memory_handle);
+            MPI_Type_get_name(dtt, type_name, &len);
+            int _size;
+            MPI_Type_size(dtt, &_size);
+            PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "MPI:\tTO\t%d\tGet START\t% -8s\tk=%d\twith datakey %lx at %p type %s count %d displ %ld \t(k=%d, dst_mem_handle=%p)",
+                    from, tmp, k, task->deps, PARSEC_DATA_COPY_GET_PTR(deps->output[k].data.data), type_name, nbdtt,
+                    deps->output[k].data.remote.dst_displ, k, receiver_memory_handle);
 # endif
 
-        callback_data->memory_handle = receiver_memory_handle;
-
-        /* We need multiple information to be passed to the callback_fn we have assigned above.
-         * We pack the pointer to this callback_data and pass to the other side so we can complete
-         * cleanup and take necessary action when the data is available on our side */
-        msg.remote_callback_data = (remote_dep_datakey_t)callback_data;
-        /* We pack the static message(remote_dep_wire_get_t) and our memory_handle and send this message
-         * to the source. Source is anticipating this exact configuration.
-         */
-        int buf_size = sizeof(remote_dep_wire_get_t) + receiver_memory_handle_size;
-        void *buf = malloc(buf_size);
-        memcpy( buf,
-                &msg,
-                sizeof(remote_dep_wire_get_t) );
-        memcpy( ((char*)buf) + sizeof(remote_dep_wire_get_t),
-                receiver_memory_handle,
-                receiver_memory_handle_size );
-
-        /* Send AM */
-        parsec_ce.send_am(&parsec_ce, REMOTE_DEP_GET_DATA_TAG, from, buf, buf_size);
-        TAKE_TIME(es->es_profile, MPI_Data_ctl_ek, event_id);
-
-        free(buf);
-
-        parsec_comm_gets++;
+            callback_data->memory_handle = receiver_memory_handle;
+            /* We need multiple information to be passed to the callback_fn we have assigned above.
+             * We pack the pointer to this callback_data and pass to the other side so we can complete
+             * cleanup and take necessary action when the data is available on our side */
+            msg.remote_callback_data = (remote_dep_datakey_t)callback_data;
+
+            /* We pack the static message(remote_dep_wire_get_t) and our memory_handle and send this message
+             * to the source. Source is anticipating this exact configuration.
+             */
+            int buf_size = sizeof(remote_dep_wire_get_t) + receiver_memory_handle_size;
+            void *buf = malloc(buf_size);
+            memcpy( buf,
+                    &msg,
+                    sizeof(remote_dep_wire_get_t) );
+            memcpy( ((char*)buf) + sizeof(remote_dep_wire_get_t),
+                    receiver_memory_handle,
+                    receiver_memory_handle_size );
+
+            // TODO: fix the profiling!
+            //TAKE_TIME_WITH_INFO(MPIctl_prof, MPI_Data_ctl_sk, event_id,
+            //                    from, es->virtual_process->parsec_context->my_rank, (*task));
+            /* Send AM */
+            parsec_ce.send_am(&parsec_ce, REMOTE_DEP_GET_DATA_TAG, from, buf, buf_size);
+            TAKE_TIME(es->es_profile, MPI_Data_ctl_ek, event_id);
+            // TODO: fix the profiling!
+            //TAKE_TIME_WITH_INFO(MPIrcv_prof, MPI_Data_pldr_sk, k, from,
+            //                    es->virtual_process->parsec_context->my_rank, deps->msg);
+
+            free(buf);
+
+            parsec_comm_gets++;
+        }
     }
 }
 
@@ -2151,6 +2334,47 @@ remote_dep_mpi_get_end_cb(parsec_comm_engine_t *ce,
     return 1;
 }
 
+static int
+remote_dep_mpi_local_get_end_cb(parsec_comm_engine_t *ce,
+                                parsec_ce_mem_reg_handle_t lreg,
+                                ptrdiff_t ldispl,
+                                parsec_ce_mem_reg_handle_t rreg,
+                                ptrdiff_t rdispl,
+                                size_t size,
+                                int src,
+                                void *cb_data)
+{
+    (void) ce; (void) rdispl; (void) size; (void) cb_data; (void) src; (void)ldispl; (void)rreg;
+    parsec_execution_stream_t* es = &parsec_comm_es;
+    /* Receiver-side completion of a GET issued by remote_dep_mpi_get_start for output callback_data->k */
+    remote_dep_cb_data_t *callback_data = (remote_dep_cb_data_t *)cb_data;
+    parsec_remote_deps_t *deps = (parsec_remote_deps_t *)callback_data->deps;
+    /* NOTE(review): memhandles is shared by every output of deps but freed on the first completion -- confirm single-output only */
+    if (parsec_remote_dep_use_get) {
+        free(deps->memhandles);
+        deps->memhandles = NULL;
+    }
+
+#if defined(PARSEC_DEBUG_NOISIER)
+    char tmp[MAX_TASK_STRLEN];
+#endif
+
+    PARSEC_DEBUG_VERBOSE(6, parsec_debug_output, "MPI:\tFROM\t%d\tGet END \t% -8s\tk=%d\twith datakey na \tparams %lx\t(tag=%d)",
+            src, remote_dep_cmd_to_string(&deps->msg, tmp, MAX_TASK_STRLEN),
+            callback_data->k, deps->incoming_mask, src);
+
+
+    TAKE_TIME(es->es_profile, MPI_Data_pldr_ek, callback_data->k);
+    remote_dep_mpi_get_end(es, callback_data->k, deps);
+
+    ce->mem_unregister(&lreg);
+    parsec_thread_mempool_free(parsec_remote_dep_cb_data_mempool->thread_mempools, callback_data);
+
+    parsec_comm_gets--;
+
+    return 1;
+}
+
 static int
 remote_dep_ce_init(parsec_context_t* context)
 {