Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Dec 25, 2024
1 parent 8ebd3a8 commit f0ac10b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
8 changes: 1 addition & 7 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -954,13 +954,7 @@ void updateParquetWriterOptions(

if (!parquetWriterOptions->parquetWriteTimestampTimeZone) {
parquetWriterOptions->parquetWriteTimestampTimeZone =
getTimestampTimeZone(
*sessionProperties, core::QueryConfig::kSessionTimezone)
.has_value()
? getTimestampTimeZone(
*sessionProperties, core::QueryConfig::kSessionTimezone)
: getTimestampTimeZone(
*hiveConfig->config(), core::QueryConfig::kSessionTimezone);
parquetWriterOptions->sessionTimezoneName;
}

writerOptions = std::move(parquetWriterOptions);
Expand Down
11 changes: 4 additions & 7 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,19 +780,16 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
insertTableHandle_->serdeParameters().end());
}

options->sessionTimezoneName = connectorQueryCtx_->sessionTimezone();
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();

updateWriterOptionsFromHiveConfig(
insertTableHandle_->storageFormat(),
hiveConfig_,
connectorSessionProperties,
options);

const auto& sessionTimeZoneName = connectorQueryCtx_->sessionTimezone();
if (!sessionTimeZoneName.empty()) {
options->sessionTimezone = tz::locateZone(sessionTimeZoneName);
}
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ struct WriterOptions {
std::function<std::unique_ptr<dwio::common::FlushPolicy>()>
flushPolicyFactory;

const tz::TimeZone* sessionTimezone{nullptr};
std::string sessionTimezoneName;
bool adjustTimestampToTimezone{false};

virtual ~WriterOptions() = default;
Expand Down
14 changes: 12 additions & 2 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ TEST_F(ParquetWriterTest, parquetWriteTimestampTimeZoneWithDefault) {
};

#ifdef VELOX_ENABLE_PARQUET
DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
DEBUG_ONLY_TEST_F(ParquetWriterTest, timestampUnitAndTimeZone) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
Expand All @@ -215,6 +215,14 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
})));

SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::Writer",
std::function<void(const ArrowOptions* options)>(
([&](const ArrowOptions* options) {
ASSERT_TRUE(options->timestampTimeZone.has_value());
ASSERT_EQ(options->timestampTimeZone.value(), "America/New_York");
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
const auto outputDirectory = TempDirectoryPath::create();
Expand All @@ -230,7 +238,9 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
{},
writerOptions)
.planNode();
AssertQueryBuilder(plan).copyResults(pool_.get());
AssertQueryBuilder(plan)
.config("session_timezone", "America/New_York")
.copyResults(pool_.get());
}
#endif

Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ Writer::Writer(
static_cast<TimestampUnit>(options.parquetWriteTimestampUnit.value_or(
TimestampPrecision::kNanoseconds));
options_.timestampTimeZone = options.parquetWriteTimestampTimeZone;
common::testutil::TestValue::adjust(
"facebook::velox::parquet::Writer::Writer", &options_);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down

0 comments on commit f0ac10b

Please sign in to comment.