Skip to content

Commit

Permalink
Improve writing large arrays (#18)
Browse files Browse the repository at this point in the history
Merged FesapiHdfProxy::createSubArrayNd() in FesapiHdfProxy::writeSubArrayNd()
  • Loading branch information
Hoosayin authored Jul 31, 2023
1 parent 58161f8 commit 4c5ec7d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 84 deletions.
135 changes: 56 additions & 79 deletions src/etp/fesapi/FesapiHdfProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,68 +115,6 @@ std::vector<uint32_t> FesapiHdfProxy::getElementCountPerDimension(const std::str
return result;
}

template<typename T>
void FesapiHdfProxy::writeSubArrayNd(
const std::string& uri,
const std::string& pathInResource,
std::vector<int64_t>& totalCounts,
std::vector<int64_t> starts,
std::vector<int64_t> counts,
const void* values)
{
// Calculate array size
size_t totalCount{ 1 };

for (const auto& count : counts) {
totalCount *= count;
}

// [Base Condition] Array size is OK to be transmitted.
if ((totalCount * sizeof(T)) <= maxArraySize_) {

// PUT DATA SUBARRAYS
Energistics::Etp::v12::Protocol::DataArray::PutDataSubarrays pdsa{};
pdsa.dataSubarrays["0"].uid.uri = uri;
pdsa.dataSubarrays["0"].uid.pathInResource = pathInResource;
pdsa.dataSubarrays["0"].starts = starts;
pdsa.dataSubarrays["0"].counts = counts;

// Cast values in T values.
const T* typeValues{ static_cast<const T*>(values) };

// Create 1D Array for Sub Values.
T* subValues = new T[totalCount];
size_t valueIndex{ 0 };

// Recursively populate subValues starting from first dimension.
populateSubValuesNd<T>(
0,
totalCounts, starts, counts,
valueIndex, typeValues, subValues);

// Create AVRO Array
Energistics::Etp::v12::Datatypes::AnyArray data;
createAnyArray<T>(data, totalCount, subValues); // Type-specific code is written in explicit specializations for createAnyArray().
pdsa.dataSubarrays["0"].data = data;

std::cout << "Writing subarray..." << std::endl;

// Send putDataSubarrays Message
session_->sendAndBlock(pdsa, 0, 0x02);

// Delete Array
delete[] subValues;
}
// [Divide and Conquer Approach] If sub array is still large, partition it into more sub arrays.
else {
// Recursively divide all dimensions starting from first dimension.
createSubArrayNd<T>(
0,
uri, pathInResource, totalCounts,
starts, counts, values);
}
}

template<typename T>
void FesapiHdfProxy::populateSubValuesNd(
size_t dimensionIndex,
Expand Down Expand Up @@ -245,7 +183,7 @@ int64_t FesapiHdfProxy::getCountsProduct(
}

template<typename T>
void FesapiHdfProxy::createSubArrayNd(
void FesapiHdfProxy::writeSubArrayNd(
size_t dimensionIndex,
const std::string& uri,
const std::string& pathInResource,
Expand All @@ -254,15 +192,56 @@ void FesapiHdfProxy::createSubArrayNd(
std::vector<int64_t> counts,
const void* values)
{
// [Base Condition] If dimensionIndex exceeds the last dimension.
if (dimensionIndex >= starts.size()) {
// Recursively Write Subarray.
// Calculate array size
size_t totalCount{ 1 };

for (const auto& count : counts) {
totalCount *= count;
}

// [Base Condition] If subarray can be transmitted.
if ((totalCount * sizeof(T)) <= maxArraySize_) {
// PUT DATA SUBARRAYS
Energistics::Etp::v12::Protocol::DataArray::PutDataSubarrays pdsa{};
pdsa.dataSubarrays["0"].uid.uri = uri;
pdsa.dataSubarrays["0"].uid.pathInResource = pathInResource;
pdsa.dataSubarrays["0"].starts = starts;
pdsa.dataSubarrays["0"].counts = counts;

// Cast values in T values.
const T* typeValues{ static_cast<const T*>(values) };

// Create 1D Array for Sub Values.
T* subValues = new T[totalCount];
size_t valueIndex{ 0 };

// Recursively populate subValues starting from first dimension.
populateSubValuesNd<T>(
0,
totalCounts, starts, counts,
valueIndex, typeValues, subValues);

// Create AVRO Array
Energistics::Etp::v12::Datatypes::AnyArray data;
createAnyArray<T>(data, totalCount, subValues); // Type-specific code is written in explicit specializations for createAnyArray().
pdsa.dataSubarrays["0"].data = data;

std::cout << "Writing subarray..." << std::endl;

// Send putDataSubarrays Message
session_->sendAndBlock(pdsa, 0, 0x02);

// Delete Array
delete[] subValues;
}
// Again divide all dimensions starting from first dimension.
else if (dimensionIndex >= starts.size()) {
writeSubArrayNd<T>(
0,
uri, pathInResource, totalCounts,
starts,
counts,
values);
starts, counts, values);
}
// Divide the values of current dimension in halves.
else {
int64_t numberOfValues = counts[dimensionIndex];

Expand All @@ -273,7 +252,7 @@ void FesapiHdfProxy::createSubArrayNd(
newCounts[dimensionIndex] = firstHalfValues;

// Recursively divide next dimension.
createSubArrayNd<T>(
writeSubArrayNd<T>(
dimensionIndex + 1,
uri, pathInResource, totalCounts,
starts,
Expand All @@ -285,7 +264,7 @@ void FesapiHdfProxy::createSubArrayNd(
newCounts[dimensionIndex] = secondHalfValues;

// Recursively divide next dimension.
createSubArrayNd<T>(
writeSubArrayNd<T>(
dimensionIndex + 1,
uri, pathInResource, totalCounts,
newStarts,
Expand Down Expand Up @@ -543,43 +522,41 @@ void FesapiHdfProxy::writeArrayNd(const std::string & groupName,

// Recursively Write Subarrays
if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::DOUBLE) {
writeSubArrayNd<double>(uri, pathInResource, counts,
writeSubArrayNd<double>(0, uri, pathInResource, counts,
starts,
counts,
values);
}
else if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::FLOAT) {
writeSubArrayNd<float>(uri, pathInResource, counts,
writeSubArrayNd<float>(0, uri, pathInResource, counts,
starts,
counts,
values);
}
else if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::INT64 ||
datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::UINT64) {
writeSubArrayNd<int64_t>(uri, pathInResource, counts,
writeSubArrayNd<int64_t>(0, uri, pathInResource, counts,
starts,
counts,
values);
}
else if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::INT32 ||
datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::UINT32) {
writeSubArrayNd<int32_t>(uri, pathInResource, counts,
writeSubArrayNd<int32_t>(0, uri, pathInResource, counts,
starts,
counts,
values);
}
else if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::INT16 ||
datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::UINT16) {
writeSubArrayNd<short>(
uri, pathInResource, counts,
writeSubArrayNd<short>(0, uri, pathInResource, counts,
starts,
counts,
values);
}
else if (datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::INT8 ||
datatype == COMMON_NS::AbstractObject::numericalDatatypeEnum::UINT8) {
writeSubArrayNd<char>(
uri, pathInResource, counts,
writeSubArrayNd<char>(0, uri, pathInResource, counts,
starts,
counts,
values);
Expand Down
10 changes: 5 additions & 5 deletions src/etp/fesapi/FesapiHdfProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ namespace ETP_NS
* @param counts The number of values in each dimension of the subarray to be written.
* @param values 1d array of specific datatype ordered firstly by fastest direction.
*/
template<typename T>
/*template<typename T>
void writeSubArrayNd(
const std::string& uri,
const std::string& pathInResource,
std::vector<int64_t>& totalCounts,
std::vector<int64_t> starts,
std::vector<int64_t> counts,
const void* values);
const void* values);*/

/**
* Recursively populate subValues array from original values array.
Expand Down Expand Up @@ -201,7 +201,7 @@ namespace ETP_NS
std::vector<int64_t>& totalCounts);

/**
* Recursively divide each dimension into half and create a new nD subarray.
* Recursively write sub arrays (potentially with 2 dimensions) of a specific datatype into the HDF file by means of a single dataset.
* @param dimensionIndex The index of dimension in nD array.
* @param uri The uri of the original array.
* @param pathInResource The path of the original array.
Expand All @@ -211,7 +211,7 @@ namespace ETP_NS
* @param values 1d array of specific datatype ordered firstly by fastest direction.
*/
template<typename T>
void createSubArrayNd(
void writeSubArrayNd(
size_t dimensionIndex,
const std::string& uri,
const std::string& pathInResource,
Expand Down Expand Up @@ -575,7 +575,7 @@ namespace ETP_NS
AbstractSession* session_;
unsigned int compressionLevel;
std::string xmlNs_;
int maxArraySize_{ 4000000 }; // Bytes
int maxArraySize_{ 12000000 }; // Bytes

Energistics::Etp::v12::Datatypes::DataArrayTypes::DataArrayIdentifier buildDataArrayIdentifier(const std::string & datasetName) const;
Energistics::Etp::v12::Protocol::DataArray::GetDataArrays buildGetDataArraysMessage(const std::string & datasetName) const;
Expand Down

0 comments on commit 4c5ec7d

Please sign in to comment.