Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-15810] Allow the user to adjust parquet import timezone #16304

Open
wants to merge 5 commits into
base: rel-3.46.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void gbm_example_flow() {
null,
(byte)'\\',
false,
false,
null).execute().body();
System.out.println("parseSetupBody: " + parseSetupBody);

Expand Down Expand Up @@ -140,6 +141,7 @@ public static void gbm_example_flow() {
null,
null,
parseSetupBody.escapechar,
false,
null).execute().body();
System.out.println("parseBody: " + parseBody);

Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/ParseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
new ParseWriter.ParseErr[0], parse.chunk_size,
parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
parse.escapechar, parse.force_col_types);
parse.escapechar, parse.force_col_types, parse.tz_adjustment);

if (parse.source_frames == null)
throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class ParseSetupV3 extends RequestSchemaV3<ParseSetup, ParseSetupV3> {
" will happen without setting this parameter. Defaults to false.", direction=API.Direction.INPUT)
public boolean force_col_types;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjustment;

@Override
public ParseSetup fillImpl(ParseSetup impl) {
ParseSetup parseSetup = fillImpl(impl, new String[] {"parse_type"});
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public class ParseV3 extends RequestSchemaV3<Iced, ParseV3> {

@API(help="One ASCII character used to escape other characters.", direction=API.Direction.INOUT)
public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjustment;
}
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ARFFParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, boolean singleQu
naStrings = addDefaultNAs(naStrings, ncols);

// Return the final setup
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar);
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar, false);
}

private static String[][] addDefaultNAs(String[][] naStrings, int nCols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public abstract class BinaryParserProvider extends ParserProvider {
@Deprecated
public final ParseSetup guessSetup(ByteVec v, byte[] bits, byte sep, int ncols, boolean singleQuotes, int checkHeader, String[] columnNames, byte[] columnTypes, String[][] domains, String[][] naStrings) {
ParseSetup ps = new ParseSetup(null, sep, singleQuotes, checkHeader,
ncols, columnNames, columnTypes, domains, naStrings, null);
ncols, columnNames, columnTypes, domains, naStrings, null, false);
return guessSetup(v, bits, ps);
}

Expand Down
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/parser/CsvParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ else if (ParseUUID.isUUID(str))
}
//FIXME should set warning message and let fall through
return new ParseSetup(CSV_INFO, GUESS_SEP, singleQuotes, checkHeader, 1, null, ctypes, domains, naStrings, data, new ParseWriter.ParseErr[0],FileVec.DFLT_CHUNK_SIZE,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);
}
}
data[0] = determineTokens(lines[0], sep, singleQuotes, escapechar);
Expand Down Expand Up @@ -791,7 +791,7 @@ else if (ParseUUID.isUUID(str))

// Assemble the setup understood so far
ParseSetup resSetup = new ParseSetup(CSV_INFO, sep, singleQuotes, checkHeader, ncols, labels, null, null /*domains*/, naStrings, data,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);

// now guess the types
if (columnTypes == null || ncols != columnTypes.length) {
Expand Down
52 changes: 33 additions & 19 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ParseSetup extends Iced {
int[] _parse_columns_indices; // store column indices to be parsed into the final file
byte[] _nonDataLineMarkers;
boolean _force_col_types = false; // at end of parsing, change column type to users specified ones
boolean _tz_adjustment = false;
String[] _orig_column_types; // copy over the original column type setup before translating to byte[]

String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame
Expand Down Expand Up @@ -73,35 +74,35 @@ public ParseSetup(ParseSetup ps) {
ps._separator, ps._single_quotes, ps._check_header, ps._number_columns,
ps._column_names, ps._column_types, ps._domains, ps._na_strings, ps._data,
new ParseWriter.ParseErr[0], ps._chunk_size, ps._decrypt_tool, ps._skipped_columns,
ps._nonDataLineMarkers, ps._escapechar);
ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjustment);
}

public static ParseSetup makeSVMLightSetup(){
return new ParseSetup(SVMLight_INFO, ParseSetup.GUESS_SEP,
false,ParseSetup.NO_HEADER,1,null,new byte[]{Vec.T_NUM},null,null,null, new ParseWriter.ParseErr[0],
null);
null, false);
}

// This method was called during guess setup, lot of things are null, like ctypes.
// when it is called again, it either contains the guess column types or it will have user defined column types
public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, null, null, nonDataLineMarkers, escapeChar);
chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustment);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false);
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustment);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers,
byte escapeChar, boolean force_col_types) {
byte escapeChar, boolean force_col_types, boolean tz_adjustment) {
_parse_type = parse_type;
_separator = sep;
_nonDataLineMarkers = nonDataLineMarkers;
Expand All @@ -119,6 +120,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
_skipped_columns = skipped_columns;
_escapechar = escapeChar;
_force_col_types = force_col_types;
_tz_adjustment = tz_adjustment;
setParseColumnIndices(ncols, _skipped_columns);
}

Expand Down Expand Up @@ -172,7 +174,7 @@ ps.column_names, strToColumnTypes(ps.column_types),
ps.chunk_size,
ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns,
ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null,
ps.escapechar, ps.force_col_types);
ps.escapechar, ps.force_col_types, ps.tz_adjustment);
this._force_col_types = ps.force_col_types;
this._orig_column_types = this._force_col_types ? (ps.column_types == null ? null : ps.column_types.clone()) : null;
}
Expand All @@ -185,9 +187,9 @@ ps.column_names, strToColumnTypes(ps.column_types),
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustment) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustment);
}

/**
Expand All @@ -198,30 +200,30 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustment) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustment);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data) {
String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustment) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR,tzAdjustment);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers) {
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustment) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustment);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand All @@ -230,7 +232,7 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
* Typically used by file type parsers for returning final invalid results
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand Down Expand Up @@ -258,6 +260,10 @@ public String[] getOrigColumnTypes() {
/**
 * Reports the force-column-types flag of this setup.
 *
 * @return the current value of {@code _force_col_types}; when {@code true}, column types
 *         are changed to the user-specified ones at the end of parsing
 */
public boolean getForceColTypes() {
  final boolean forceColTypes = this._force_col_types;
  return forceColTypes;
}

/**
 * Reports the timezone-adjustment flag of this setup.
 *
 * @return the current value of {@code _tz_adjustment}; {@code true} requests that imported
 *         time values be adjusted from GMT to the cluster timezone
 */
public boolean getTzAdjustment() {
  final boolean tzAdjustment = this._tz_adjustment;
  return tzAdjustment;
}

/**
 * Returns the column types stored in this setup.
 *
 * @return the {@code _column_types} array itself (not a copy); may be {@code null} if no
 *         types have been set
 */
public byte[] getColumnTypes() {
  return this._column_types;
}

Expand Down Expand Up @@ -558,6 +564,7 @@ public GuessSetupTsk(ParseSetup userSetup) {
}
if (_gblSetup==null)
throw new RuntimeException("This H2O node couldn't find the file(s) to parse. Please check files and/or working directories.");
_gblSetup.setTzAdjustment(_userSetup.getTzAdjustment());
_gblSetup.setFileName(FileUtils.keyToFileName(key));
}

Expand Down Expand Up @@ -587,6 +594,7 @@ public void reduce(GuessSetupTsk other) {
else
_gblSetup._na_strings = _userSetup._na_strings;
}
_gblSetup._tz_adjustment = _gblSetup._tz_adjustment || _userSetup._tz_adjustment;
// if(_gblSetup._errs != null)
for(ParseWriter.ParseErr err:_gblSetup._errs)
Log.warn("ParseSetup: " + err.toString());
Expand All @@ -600,6 +608,7 @@ private ParseSetup mergeSetups(ParseSetup setupA, ParseSetup setupB, String file
}
ParseSetup mergedSetup = setupA;

mergedSetup._tz_adjustment = setupA._tz_adjustment || setupB._tz_adjustment;
mergedSetup._check_header = unifyCheckHeader(setupA._check_header, setupB._check_header);

mergedSetup._separator = unifyColumnSeparators(setupA._separator, setupB._separator);
Expand Down Expand Up @@ -707,7 +716,7 @@ public static ParseSetup guessSetup(ByteVec bv, byte [] bits, ParseSetup userSet
*/
private ParseSetup toInitialSetup() {
return new ParseSetup(_parse_type, _separator, _single_quotes, _check_header, GUESS_COL_CNT, _column_names,
_column_types, null, null, null, _nonDataLineMarkers, _escapechar);
_column_types, null, null, null, _nonDataLineMarkers, _escapechar, false);
}

/**
Expand Down Expand Up @@ -878,6 +887,11 @@ public ParseSetup setForceColTypes(boolean force_col_types) {
return this;
}

/**
 * Sets the timezone-adjustment flag of this setup.
 *
 * @param tz_adjustment {@code true} to request that imported time values be adjusted from
 *                      GMT to the cluster timezone, {@code false} to leave them as read
 * @return this {@code ParseSetup}, so that setter calls can be chained
 */
public ParseSetup setTzAdjustment(boolean tz_adjustment) {
  _tz_adjustment = tz_adjustment;
  return this;
}

public ParseSetup setDomains(String[][] domains) {
this._domains = domains;
return this;
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ParserProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final ParseSetup guessSetup(ByteVec v, byte[] bits, ParseSetup userSetup)
*/
protected ParseSetup guessSetup_impl(ByteVec v, byte[] bits, ParseSetup userSetup) {
ParseSetup ps = guessInitSetup(v, bits, userSetup);
return guessFinalSetup(v, bits, ps);
return guessFinalSetup(v, bits, ps).setTzAdjustment(userSetup._tz_adjustment);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/SVMLightParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static ParseSetup guessSetup(byte [] bits) {
if(lastNewline > 0) bits = Arrays.copyOf(bits,lastNewline+1);
SVMLightParser p = new SVMLightParser(new ParseSetup(SVMLight_INFO,
ParseSetup.GUESS_SEP, false,ParseSetup.GUESS_HEADER,ParseSetup.GUESS_COL_CNT,
null,null,null,null,null), null);
null,null,null,null,null, false), null);
SVMLightInspectParseWriter dout = new SVMLightInspectParseWriter();
p.parseChunk(0,new ByteAryData(bits,0), dout);
if (dout._ncols > 0 && dout._nlines > 0 && dout._nlines > dout._invalidLines)
Expand Down
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/parser/XlsParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ private void readAtLeast(int lim) throws IOException{
/** Try to parse the bytes as XLS format */
public static ParseSetup guessSetup( byte[] bytes ) {
XlsParser p = new XlsParser(new ParseSetup(XLS_INFO, ParseSetup.GUESS_SEP, false,
ParseSetup.GUESS_HEADER, ParseSetup.GUESS_COL_CNT, null, null, null, null, null), null);
ParseSetup.GUESS_HEADER, ParseSetup.GUESS_COL_CNT, null, null, null, null, null, false), null);
p._buf = bytes; // No need to copy already-unpacked data; just use it directly
p._lim = bytes.length;
PreviewParseWriter dout = new PreviewParseWriter();
try{ p.streamParse(new ByteArrayInputStream(bytes), dout); } catch(IOException e) { throw new RuntimeException(e); }
if (dout._ncols > 0 && dout._nlines > 0 && dout._nlines > dout._invalidLines)
return new ParseSetup(XLS_INFO, ParseSetup.GUESS_SEP, false,
dout.colNames()==null?ParseSetup.NO_HEADER:ParseSetup.HAS_HEADER,dout._ncols,
dout.colNames(), dout.guessTypes(),null,null,dout._data);
dout.colNames(), dout.guessTypes(),null,null,dout._data, false);
else throw new ParseDataset.H2OParseException("Could not parse file as an XLS file.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public AvroParseSetup(int ncols,
String[][] data,
byte[] header,
long blockSize) {
super(AvroParserProvider.AVRO_INFO, (byte) '|', true, HAS_HEADER , ncols, columnNames, ctypes, domains, naStrings, data);
super(AvroParserProvider.AVRO_INFO, (byte) '|', true, HAS_HEADER , ncols, columnNames, ctypes, domains, naStrings, data, false);
this.header = header;
this.blockSize = blockSize;
this.setChunkSize((int) blockSize);
Expand Down
Loading
Loading