Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
# Conflicts:
#	cinch
  • Loading branch information
hkaiser committed Oct 17, 2020
1 parent 0260db2 commit e2ca73c
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 121 deletions.
2 changes: 1 addition & 1 deletion cinch
2 changes: 2 additions & 0 deletions flecsi/data/hpx/dense.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ struct storage_class_u<storage_label_type_t::dense> {

auto & context = execution::context_t::instance();

using client_type = typename DATA_CLIENT_TYPE::type_identifier_t;

// get field_info for this data handle
auto & field_info = context.get_field_info_from_name(
typeid(typename DATA_CLIENT_TYPE::type_identifier_t).hash_code(),
Expand Down
2 changes: 1 addition & 1 deletion flecsi/execution/execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ clog_register_tag(execution);
#define flecsi_execute_mpi_task(task, nspace, ...) \
/* MACRO IMPLEMENTATION */ \
\
flecsi_execute_task(task, nspace, index, ##__VA_ARGS__)
flecsi_execute_task(task, nspace, index, ##__VA_ARGS__).get()

//----------------------------------------------------------------------------//
// Reduction Interface
Expand Down
22 changes: 2 additions & 20 deletions flecsi/execution/hpx/context_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ hpx_context_policy_t::hpx_main(void (*driver)(int, char *[]),
int argc,
char * argv[]) {

// initialize executors (possible only after runtime is active)
exec_ = std::make_unique<flecsi::execution::pool_executor>("default");
mpi_exec_ = std::make_unique<flecsi::execution::pool_executor>("mpi");

// execute user code (driver)
(*driver)(argc, argv);

Expand All @@ -84,29 +80,15 @@ hpx_context_policy_t::start_hpx(void (*driver)(int, char *[]),
// disable HPX' short options
"hpx.commandline.aliasing!=0"};

auto init_rp = [](hpx::resource::partitioner & rp) {
// Create a thread pool encapsulating the default scheduler
rp.create_thread_pool("default", hpx::resource::local_priority_fifo);

// Create a thread pool for executing MPI tasks
rp.create_thread_pool("mpi", hpx::resource::static_);

// Add first core to mpi pool
rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "mpi");
};

// Now, initialize and run the HPX runtime, will return when done.
#if HPX_VERSION_FULL < 0x010500
hpx::resource::partitioner rp{
return hpx::init(
hpx::util::bind_front(&hpx_context_policy_t::hpx_main, this, driver), argc,
argv, cfg};
init_rp(rp);
return hpx::init();
argv, cfg);
#else
// Newer versions of HPX do not allow to explicitly initialize the
// resource partitioner anymore
hpx::init_params params;
params.rp_callback = init_rp;
params.cfg = std::move(cfg);

return hpx::init(
Expand Down
63 changes: 4 additions & 59 deletions flecsi/execution/hpx/context_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
/@@ /@@/@@//// //@@ @@ /@@/@@
/@@ @@@//@@@@@@ //@@@@@@ @@@@@@@@ /@@
// /// ////// ////// //////// //
*/
Copyright (c) 2016, Los Alamos National Security, LLC
All rights reserved.
*/
#pragma once

#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/include/parallel_execution.hpp>
#include <hpx/runtime_fwd.hpp>

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -50,49 +51,6 @@

#include <flecsi/utils/const_string.h>

#if HPX_VERSION_FULL < 0x010500
namespace flecsi {
namespace execution {

using pool_executor = hpx::threads::executors::pool_executor;
} // namespace execution
} // namespace flecsi
#else
namespace flecsi {
namespace execution {

// newer versions of HPX do not support pool_executor anymore
class pool_executor : public hpx::parallel::execution::thread_pool_executor
{
public:
explicit pool_executor(std::string const & pool_name = "default")
: hpx::parallel::execution::thread_pool_executor(
&hpx::threads::get_thread_manager().get_pool(pool_name),
hpx::threads::thread_priority_default,
hpx::threads::thread_stacksize_default) {}
};
} // namespace execution
} // namespace flecsi

namespace hpx {
namespace parallel {
namespace execution {
template<>
struct is_one_way_executor<flecsi::execution::pool_executor> : std::true_type {
};

template<>
struct is_two_way_executor<flecsi::execution::pool_executor> : std::true_type {
};

template<>
struct is_bulk_two_way_executor<flecsi::execution::pool_executor>
: std::true_type {};
} // namespace execution
} // namespace parallel
} // namespace hpx
#endif

namespace flecsi {
namespace execution {

Expand Down Expand Up @@ -855,14 +813,6 @@ struct hpx_context_policy_t {
start_hpx(void (*driver)(int, char *[]), int argc, char * argv[]);

public:
flecsi::execution::pool_executor & get_default_executor() {
return *exec_;
}

flecsi::execution::pool_executor & get_mpi_executor() {
return *mpi_exec_;
}

int rank;

// private:
Expand All @@ -886,11 +836,6 @@ struct hpx_context_policy_t {

// Map to store task registration callback methods.
std::map<size_t, task_info_t> task_registry_;

private:
std::unique_ptr<flecsi::execution::pool_executor> exec_;
std::unique_ptr<flecsi::execution::pool_executor> mpi_exec_;

}; // struct hpx_context_policy_t

} // namespace execution
Expand Down
38 changes: 8 additions & 30 deletions flecsi/execution/hpx/execution_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/include/parallel_execution.hpp>
#include <hpx/include/parallel_executors.hpp>
#include <hpx/include/thread_executors.hpp>

#include <cinchlog.h>
#include <functional>
Expand Down Expand Up @@ -52,11 +49,10 @@ struct executor_u {
/*!
FIXME documentation
*/
template<typename Exec, typename T, typename A>
static decltype(auto) execute(Exec && exec, T function, A && targs) {
auto user_fun = reinterpret_cast<RETURN (*)(std::decay_t<A>)>(function);
return hpx::async(
std::forward<Exec>(exec), user_fun, std::forward<A>(targs));
template<typename T, typename A>
static decltype(auto) execute(T function, A && targs) {
auto user_fun = reinterpret_cast<RETURN (*)(ARG_TUPLE)>(function);
return hpx::async(user_fun, utils::forward_tuple(std::forward<A>(targs)));
} // execute_task
}; // struct executor_u

Expand Down Expand Up @@ -164,27 +160,8 @@ struct FLECSI_EXPORT hpx_execution_policy_t {

annotation::begin<annotation::execute_task_user>(tname);

// hpx_future_u<RETURN> future;
// auto processor_type = context_.processor_type<TASK>();
// if(processor_type == processor_type_t::mpi) {
//
// {
// clog_tag_guard(execution);
// clog(info) << "Executing MPI task: " << TASK << std::endl;
// }
//
// future = executor_u<RETURN, ARG_TUPLE>::execute(
// context_t::instance().get_mpi_executor(), std::move(function),
// task_args);
// }
// else {
// future = executor_u<RETURN, ARG_TUPLE>::execute(
// context_t::instance().get_default_executor(),
// std::move(function), task_args);
// }
hpx_future_u<RETURN> future = executor_u<RETURN, ARG_TUPLE>::execute(
context_t::instance().get_default_executor(), std::move(function),
task_args);
hpx_future_u<RETURN> future =
executor_u<RETURN, ARG_TUPLE>::execute(function, task_args);

annotation::end<annotation::execute_task_user>();

Expand All @@ -204,7 +181,8 @@ struct FLECSI_EXPORT hpx_execution_policy_t {
if constexpr(REDUCTION != ZERO) {

return future
.then([&](hpx_future_u<RETURN> && future) {
.then([](hpx_future_u<RETURN> && future) {
context_t & context_ = context_t::instance();
MPI_Datatype datatype =
flecsi::utils::mpi_typetraits_u<RETURN>::type();

Expand Down
29 changes: 19 additions & 10 deletions flecsi/topology/hpx/set_storage_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,31 @@ struct hpx_set_topology_storage_policy_u {
}

void init_entities(size_t index_space,
size_t active_migrate_index_space,
set_entity_t * entities,
size_t size,
size_t num_entities,
set_entity_t * active_entities,
size_t num_active_entities,
set_entity_t * migrate_entities,
size_t num_migrate_entities,
size_t size,
bool read) {

auto itr = index_space_map.find(index_space);
clog_assert(itr != index_space_map.end(), "invalid index space");
auto & is = index_spaces[itr->second];
auto s = is.storage();

s->set_buffer(entities, num_entities, read);
*s = {{entities, num_entities}, read ? num_entities : 0};

itr = index_space_map.find(active_migrate_index_space);
clog_assert(
itr != index_space_map.end(), "invalid active migrate index space");
auto & amis = index_spaces[itr->second];
auto s2 = amis.storage();

// how to handle migration buffer?
s2->set_buffer(active_entities, num_entities, read);

if(!read) {
return;
Expand Down Expand Up @@ -101,15 +116,9 @@ struct hpx_set_topology_storage_policy_u {
constexpr std::size_t index_space =
find_set_index_space_u<num_index_spaces, entity_types_t, T>::find();

auto & is = index_spaces[index_space].template cast<T *>();
std::size_t entity = is.size();

auto placement_ptr = static_cast<T *>(is.storage()->buffer()) + entity;
auto & is = index_spaces[index_space];
auto placement_ptr = static_cast<T *>(is.data.buffer()) + is.ids.size();
auto ent = new(placement_ptr) T(std::forward<ARG_TYPES>(args)...);
auto storage = is.storage();
storage->pushed();
is.pushed();

return ent;
}
};
Expand Down

0 comments on commit e2ca73c

Please sign in to comment.