From b88a3862212e9372a691d7464bf3f82a66114b8a Mon Sep 17 00:00:00 2001
From: Tim Schaub
Date: Mon, 25 Sep 2023 14:58:10 -0600
Subject: [PATCH] Optionally use stdin/stdout for validate, describe, and
 convert commands

---
 cmd/gpq/command/command.go           |  16 ++++
 cmd/gpq/command/command_test.go      |  65 ++++++++++++++
 cmd/gpq/{ => command}/convert.go     |  73 ++++++++++++----
 cmd/gpq/command/convert_test.go      | 122 +++++++++++++++++++++++++++
 cmd/gpq/{ => command}/describe.go    |  56 +++++++-----
 cmd/gpq/command/describe_test.go     | 101 ++++++++++++++++++++++
 cmd/gpq/{ => command}/validate.go    |  29 ++++++-
 cmd/gpq/{ => command}/version.go     |   2 +-
 cmd/gpq/main.go                      |  10 +--
 internal/geo/geo.go                  |  75 +++++++++-------
 internal/geoparquet/geoparquet.go    |   4 +-
 internal/test/test.go                |  30 ++-----
 internal/validator/validator.go      |  12 +--
 internal/validator/validator_test.go |   8 +-
 14 files changed, 486 insertions(+), 117 deletions(-)
 create mode 100644 cmd/gpq/command/command.go
 create mode 100644 cmd/gpq/command/command_test.go
 rename cmd/gpq/{ => command}/convert.go (66%)
 create mode 100644 cmd/gpq/command/convert_test.go
 rename cmd/gpq/{ => command}/describe.go (84%)
 create mode 100644 cmd/gpq/command/describe_test.go
 rename cmd/gpq/{ => command}/validate.go (80%)
 rename cmd/gpq/{ => command}/version.go (98%)

diff --git a/cmd/gpq/command/command.go b/cmd/gpq/command/command.go
new file mode 100644
index 0000000..1a2558f
--- /dev/null
+++ b/cmd/gpq/command/command.go
@@ -0,0 +1,16 @@
+package command
+
+import "io"
+
+var CLI struct {
+	Convert  ConvertCmd  `cmd:"" help:"Convert data from one format to another."`
+	Validate ValidateCmd `cmd:"" help:"Validate a GeoParquet file."`
+	Describe DescribeCmd `cmd:"" help:"Describe a GeoParquet file."`
+	Version  VersionCmd  `cmd:"" help:"Print the version of this program."`
+}
+
+type ReaderAtSeeker interface {
+	io.Reader
+	io.ReaderAt
+	io.Seeker
+}
diff --git a/cmd/gpq/command/command_test.go b/cmd/gpq/command/command_test.go
new file mode 100644
index 0000000..716c0aa
--- /dev/null
+++ b/cmd/gpq/command/command_test.go
@@ -0,0 +1,65 @@
+package command_test
+
+import (
+	"io"
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type Suite struct {
+	suite.Suite
+	originalStdin  *os.File
+	mockStdin      *os.File
+	originalStdout *os.File
+	mockStdout     *os.File
+}
+
+func (s *Suite) SetupTest() {
+	stdin, err := os.CreateTemp("", "stdin")
+	s.Require().NoError(err)
+	s.originalStdin = os.Stdin
+	s.mockStdin = stdin
+	os.Stdin = stdin
+
+	stdout, err := os.CreateTemp("", "stdout")
+	s.Require().NoError(err)
+	s.originalStdout = os.Stdout
+	s.mockStdout = stdout
+	os.Stdout = stdout
+}
+
+func (s *Suite) writeStdin(data []byte) {
+	_, writeErr := s.mockStdin.Write(data)
+	s.Require().NoError(writeErr)
+	_, seekErr := s.mockStdin.Seek(0, 0)
+	s.Require().NoError(seekErr)
+}
+
+func (s *Suite) readStdout() []byte {
+	if _, seekErr := s.mockStdout.Seek(0, 0); seekErr != nil {
+		// assume the file is closed
+		stdout, err := os.Open(s.mockStdout.Name())
+		s.Require().NoError(err)
+		s.mockStdout = stdout
+	}
+	data, err := io.ReadAll(s.mockStdout)
+	s.Require().NoError(err)
+	return data
+}
+
+func (s *Suite) TearDownTest() {
+	os.Stdout = s.originalStdout
+	os.Stdin = s.originalStdin
+
+	_ = s.mockStdin.Close()
+	s.NoError(os.Remove(s.mockStdin.Name()))
+
+	_ = s.mockStdout.Close()
+	s.NoError(os.Remove(s.mockStdout.Name()))
+}
+
+func TestSuite(t *testing.T) {
+	suite.Run(t, &Suite{})
+}
diff --git a/cmd/gpq/convert.go b/cmd/gpq/command/convert.go
similarity index 66%
rename from
cmd/gpq/convert.go rename to cmd/gpq/command/convert.go index a65e31f..5557282 100644 --- a/cmd/gpq/convert.go +++ b/cmd/gpq/command/convert.go @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package command import ( + "bytes" "errors" "fmt" + "io" "os" "strings" @@ -25,9 +27,9 @@ import ( ) type ConvertCmd struct { - Input string `arg:"" name:"input" help:"Input file." type:"existingfile"` + Input string `arg:"" optional:"" name:"input" help:"Input file. If not provided, input is read from stdin." type:"path"` From string `help:"Input file format. Possible values: ${enum}." enum:"auto, geojson, geoparquet, parquet" default:"auto"` - Output string `arg:"" name:"output" help:"Output file." type:"path"` + Output string `arg:"" optional:"" name:"output" help:"Output file. If not provided, output is written to stdout." type:"path"` To string `help:"Output file format. Possible values: ${enum}." enum:"auto, geojson, geoparquet" default:"auto"` Min int `help:"Minimum number of features to consider when building a schema." default:"10"` Max int `help:"Maximum number of features to consider when building a schema." default:"100"` @@ -53,6 +55,9 @@ var validTypes = map[FormatType]bool{ } func parseFormatType(format string) FormatType { + if format == "" { + return AutoType + } ft := FormatType(strings.ToLower(format)) if !validTypes[ft] { return UnknownType @@ -73,34 +78,72 @@ func getFormatType(filename string) FormatType { return UnknownType } +func hasStdin() bool { + stats, err := os.Stdin.Stat() + if err != nil { + return false + } + return stats.Size() > 0 +} + func (c *ConvertCmd) Run() error { + inputSource := c.Input + outputSource := c.Output + + if outputSource == "" && hasStdin() { + outputSource = inputSource + inputSource = "" + } + outputFormat := parseFormatType(c.To) if outputFormat == AutoType { - outputFormat = getFormatType(c.Output) + if outputSource == "" { + return fmt.Errorf("when writing to stdout, the --to option must be provided to determine the output format") + } + outputFormat = getFormatType(outputSource) } if outputFormat == UnknownType { - return fmt.Errorf("could not determine output format for %s", c.Output) + return fmt.Errorf("could not determine output format for %s", outputSource) } inputFormat := parseFormatType(c.From) if inputFormat == AutoType { - inputFormat = getFormatType(c.Input) + if inputSource == "" { + return fmt.Errorf("when reading from stdin, the --from option must be provided to determine the input format") + } + inputFormat = getFormatType(inputSource) } if inputFormat == UnknownType { - return fmt.Errorf("could not determine input format for %s", c.Input) + return fmt.Errorf("could not determine input format for %s", inputSource) } - input, readErr := os.Open(c.Input) - if readErr != nil { - return fmt.Errorf("failed to read from %q: %w", c.Input, readErr) + var input ReaderAtSeeker + if inputSource == "" { + data, err := io.ReadAll(os.Stdin) + if err != nil { + return fmt.Errorf("trouble reading from stdin: %w", err) + } + input = bytes.NewReader(data) + } else { + i, readErr := os.Open(inputSource) + if readErr != nil { + return fmt.Errorf("failed to read from %q: %w", inputSource, readErr) + } + defer i.Close() + input = i } - defer input.Close() - output, createErr := os.Create(c.Output) - if createErr != nil { - return fmt.Errorf("failed to open %q for writing: %w", c.Output, createErr) + var output *os.File + if outputSource == "" { + output = 
os.Stdout + } else { + o, createErr := os.Create(outputSource) + if createErr != nil { + return fmt.Errorf("failed to open %q for writing: %w", outputSource, createErr) + } + defer o.Close() + output = o } - defer output.Close() if inputFormat == GeoJSONType { if outputFormat != ParquetType && outputFormat != GeoParquetType { diff --git a/cmd/gpq/command/convert_test.go b/cmd/gpq/command/convert_test.go new file mode 100644 index 0000000..48b5d24 --- /dev/null +++ b/cmd/gpq/command/convert_test.go @@ -0,0 +1,122 @@ +package command_test + +import ( + "bytes" + "encoding/json" + + "github.com/apache/arrow/go/v14/parquet/file" + "github.com/planetlabs/gpq/cmd/gpq/command" + "github.com/planetlabs/gpq/internal/geo" + "github.com/planetlabs/gpq/internal/test" +) + +func (s *Suite) TestConvertGeoParquetToGeoJSONStdout() { + cmd := &command.ConvertCmd{ + From: "auto", + Input: "../../../internal/testdata/cases/example-v1.0.0.parquet", + To: "geojson", + } + + s.Require().NoError(cmd.Run()) + data := s.readStdout() + + collection := &geo.FeatureCollection{} + s.Require().NoError(json.Unmarshal(data, collection)) + s.Len(collection.Features, 5) +} + +func (s *Suite) TestConvertGeoJSONToGeoParquetStdout() { + cmd := &command.ConvertCmd{ + From: "auto", + Input: "../../../internal/geojson/testdata/example.geojson", + To: "parquet", + } + + s.Require().NoError(cmd.Run()) + data := s.readStdout() + + fileReader, err := file.NewParquetReader(bytes.NewReader(data)) + s.Require().NoError(err) + defer fileReader.Close() + + s.Equal(int64(5), fileReader.NumRows()) +} + +func (s *Suite) TestConvertGeoParquetToUnknownStdout() { + cmd := &command.ConvertCmd{ + From: "auto", + Input: "../../../internal/testdata/cases/example-v1.0.0.parquet", + } + + s.ErrorContains(cmd.Run(), "when writing to stdout, the --to option must be provided") +} + +func (s *Suite) TestConvertGeoJSONStdinToGeoParquetStdout() { + s.writeStdin([]byte(`{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "properties": { + "name": "Null Island" + }, + "geometry": { + "type": "Point", + "coordinates": [0, 0] + } + } + ] + }`)) + + cmd := &command.ConvertCmd{ + From: "geojson", + To: "geoparquet", + } + + s.Require().NoError(cmd.Run()) + data := s.readStdout() + + fileReader, err := file.NewParquetReader(bytes.NewReader(data)) + s.Require().NoError(err) + defer fileReader.Close() + + s.Equal(int64(1), fileReader.NumRows()) +} + +func (s *Suite) TestConvertGeoParquetStdinToGeoJSONStdout() { + s.writeStdin(test.GeoParquetFromJSON(s.T(), `{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "properties": { + "name": "Null Island" + }, + "geometry": { + "type": "Point", + "coordinates": [0, 0] + } + } + ] + }`)) + + cmd := &command.ConvertCmd{ + From: "geoparquet", + To: "geojson", + } + + s.Require().NoError(cmd.Run()) + data := s.readStdout() + + collection := &geo.FeatureCollection{} + s.Require().NoError(json.Unmarshal(data, collection)) + s.Len(collection.Features, 1) +} + +func (s *Suite) TestConvertUnknownStdinToGeoParquetStdout() { + cmd := &command.ConvertCmd{ + To: "geoparquet", + } + + s.ErrorContains(cmd.Run(), "when reading from stdin, the --from option must be provided") +} diff --git a/cmd/gpq/describe.go b/cmd/gpq/command/describe.go similarity index 84% rename from cmd/gpq/describe.go rename to cmd/gpq/command/describe.go index 603784f..3033535 100644 --- a/cmd/gpq/describe.go +++ b/cmd/gpq/command/describe.go @@ -12,12 +12,14 @@ // See the License for the specific language governing 
permissions and // limitations under the License. -package main +package command import ( + "bytes" "encoding/json" "errors" "fmt" + "io" "os" "strconv" "strings" @@ -32,7 +34,7 @@ import ( ) type DescribeCmd struct { - Input string `arg:"" name:"input" help:"Path to a GeoParquet file." type:"existingfile"` + Input string `arg:"" optional:"" name:"input" help:"Path to a GeoParquet file. If not provided, input is read from stdin." type:"existingfile"` Format string `help:"Report format. Possible values: ${enum}." enum:"text, json" default:"text"` Unpretty bool `help:"No newlines or indentation in the JSON output."` } @@ -50,11 +52,21 @@ const ( ) func (c *DescribeCmd) Run() error { - input, readErr := os.Open(c.Input) - if readErr != nil { - return fmt.Errorf("failed to read from %q: %w", c.Input, readErr) + var input ReaderAtSeeker + if c.Input == "" { + data, err := io.ReadAll(os.Stdin) + if err != nil { + return fmt.Errorf("trouble reading from stdin: %w", err) + } + input = bytes.NewReader(data) + } else { + i, readErr := os.Open(c.Input) + if readErr != nil { + return fmt.Errorf("failed to read from %q: %w", c.Input, readErr) + } + defer i.Close() + input = i } - defer input.Close() fileReader, fileErr := file.NewParquetReader(input) if fileErr != nil { @@ -69,7 +81,7 @@ func (c *DescribeCmd) Run() error { } } - info := &Info{ + info := &DescribeInfo{ Schema: buildSchema(fileReader, "", fileMetadata.Schema.Root()), Metadata: metadata, NumRows: fileMetadata.NumRows, @@ -81,7 +93,7 @@ func (c *DescribeCmd) Run() error { return c.formatText(info) } -func (c *DescribeCmd) formatText(info *Info) error { +func (c *DescribeCmd) formatText(info *DescribeInfo) error { metadata := info.Metadata header := table.Row{ColName, ColType, ColAnnotation, ColRepetition, ColCompression} @@ -180,7 +192,7 @@ func makeFooter(key string, value any, header table.Row) table.Row { return row } -func (c *DescribeCmd) formatJSON(info *Info) error { +func (c *DescribeCmd) formatJSON(info *DescribeInfo) error { encoder := json.NewEncoder(os.Stdout) if !c.Unpretty { encoder.SetIndent("", " ") @@ -193,20 +205,20 @@ func (c *DescribeCmd) formatJSON(info *Info) error { return nil } -type Info struct { - Schema *Schema `json:"schema"` +type DescribeInfo struct { + Schema *DescribeSchema `json:"schema"` Metadata *geoparquet.Metadata `json:"metadata"` NumRows int64 `json:"rows"` } -type Schema struct { - Name string `json:"name,omitempty"` - Optional bool `json:"optional,omitempty"` - Repeated bool `json:"repeated,omitempty"` - Type string `json:"type,omitempty"` - Annotation string `json:"annotation,omitempty"` - Compression string `json:"compression,omitempty"` - Fields []*Schema `json:"fields,omitempty"` +type DescribeSchema struct { + Name string `json:"name,omitempty"` + Optional bool `json:"optional,omitempty"` + Repeated bool `json:"repeated,omitempty"` + Type string `json:"type,omitempty"` + Annotation string `json:"annotation,omitempty"` + Compression string `json:"compression,omitempty"` + Fields []*DescribeSchema `json:"fields,omitempty"` } func getCompression(fileReader *file.Reader, node schema.Node) string { @@ -228,7 +240,7 @@ func getCompression(fileReader *file.Reader, node schema.Node) string { return strings.ToLower(col.Compression().String()) } -func buildSchema(fileReader *file.Reader, name string, node schema.Node) *Schema { +func buildSchema(fileReader *file.Reader, name string, node schema.Node) *DescribeSchema { annotation := "" logicalType := node.LogicalType() if !logicalType.IsNone() { @@ -246,7 
+258,7 @@ func buildSchema(fileReader *file.Reader, name string, node schema.Node) *Schema repeated = true } - field := &Schema{ + field := &DescribeSchema{ Name: name, Optional: optional, Repeated: repeated, @@ -280,7 +292,7 @@ func buildSchema(fileReader *file.Reader, name string, node schema.Node) *Schema if group, ok := node.(*schema.GroupNode); ok { count := group.NumFields() - field.Fields = make([]*Schema, count) + field.Fields = make([]*DescribeSchema, count) for i := 0; i < count; i += 1 { groupField := group.Field(i) field.Fields[i] = buildSchema(fileReader, groupField.Name(), groupField) diff --git a/cmd/gpq/command/describe_test.go b/cmd/gpq/command/describe_test.go new file mode 100644 index 0000000..091cccf --- /dev/null +++ b/cmd/gpq/command/describe_test.go @@ -0,0 +1,101 @@ +package command_test + +import ( + "encoding/json" + + "github.com/planetlabs/gpq/cmd/gpq/command" + "github.com/planetlabs/gpq/internal/test" +) + +func (s *Suite) TestDescribe() { + cmd := &command.DescribeCmd{ + Input: "../../../internal/testdata/cases/example-v1.0.0.parquet", + Format: "json", + } + + s.Require().NoError(cmd.Run()) + + output := s.readStdout() + info := &command.DescribeInfo{} + err := json.Unmarshal(output, info) + s.Require().NoError(err) + + s.Equal(int64(5), info.NumRows) + s.Require().Len(info.Schema.Fields, 6) + + s.Equal("geometry", info.Schema.Fields[0].Name) + s.Equal("binary", info.Schema.Fields[0].Type) + s.Equal("gzip", info.Schema.Fields[0].Compression) + s.True(info.Schema.Fields[0].Optional) + + s.Equal("pop_est", info.Schema.Fields[1].Name) + s.Equal("double", info.Schema.Fields[1].Type) + s.Equal("gzip", info.Schema.Fields[1].Compression) + s.True(info.Schema.Fields[1].Optional) + + s.Equal("continent", info.Schema.Fields[2].Name) + s.Equal("binary", info.Schema.Fields[2].Type) + s.Equal("string", info.Schema.Fields[2].Annotation) + s.Equal("gzip", info.Schema.Fields[2].Compression) + s.True(info.Schema.Fields[2].Optional) + + s.Equal("gdp_md_est", info.Schema.Fields[3].Name) + s.Equal("double", info.Schema.Fields[3].Type) + s.Equal("gzip", info.Schema.Fields[3].Compression) + s.True(info.Schema.Fields[3].Optional) + + s.Equal("iso_a3", info.Schema.Fields[4].Name) + s.Equal("binary", info.Schema.Fields[4].Type) + s.Equal("string", info.Schema.Fields[4].Annotation) + s.Equal("gzip", info.Schema.Fields[4].Compression) + s.True(info.Schema.Fields[4].Optional) + + s.Equal("name", info.Schema.Fields[5].Name) + s.Equal("binary", info.Schema.Fields[5].Type) + s.Equal("string", info.Schema.Fields[5].Annotation) + s.Equal("gzip", info.Schema.Fields[5].Compression) + s.True(info.Schema.Fields[5].Optional) +} + +func (s *Suite) TestDescribeFromStdin() { + s.writeStdin(test.GeoParquetFromJSON(s.T(), `{ + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "properties": { + "name": "Null Island" + }, + "geometry": { + "type": "Point", + "coordinates": [0, 0] + } + } + ] + }`)) + + cmd := &command.DescribeCmd{ + Format: "json", + } + + s.Require().NoError(cmd.Run()) + + output := s.readStdout() + info := &command.DescribeInfo{} + err := json.Unmarshal(output, info) + s.Require().NoError(err) + + s.Equal(int64(1), info.NumRows) + s.Require().Len(info.Schema.Fields, 2) + + s.Equal("geometry", info.Schema.Fields[0].Name) + s.Equal("binary", info.Schema.Fields[0].Type) + s.Equal("zstd", info.Schema.Fields[0].Compression) + s.True(info.Schema.Fields[0].Optional) + + s.Equal("name", info.Schema.Fields[1].Name) + s.Equal("binary", info.Schema.Fields[1].Type) + 
s.Equal("string", info.Schema.Fields[1].Annotation) + s.Equal("zstd", info.Schema.Fields[1].Compression) + s.True(info.Schema.Fields[1].Optional) +} diff --git a/cmd/gpq/validate.go b/cmd/gpq/command/validate.go similarity index 80% rename from cmd/gpq/validate.go rename to cmd/gpq/command/validate.go index acfc0ec..1ab4ed2 100644 --- a/cmd/gpq/validate.go +++ b/cmd/gpq/command/validate.go @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package command import ( + "bytes" "context" "encoding/json" "fmt" + "io" "os" "strings" @@ -27,15 +29,36 @@ import ( ) type ValidateCmd struct { - Input string `arg:"" name:"input" help:"Input file." type:"existingfile"` + Input string `arg:"" optional:"" name:"input" help:"Path to a GeoParquet file. If not provided, input is read from stdin." type:"existingfile"` MetadataOnly bool `help:"Only run rules that apply to file metadata and schema (no data will be scanned)."` Unpretty bool `help:"No colors in text output, no newlines and indentation in JSON output."` Format string `help:"Report format. Possible values: ${enum}." enum:"text, json" default:"text"` } func (c *ValidateCmd) Run(ctx *kong.Context) error { + inputName := c.Input + var input ReaderAtSeeker + if c.Input == "" { + if !hasStdin() { + return fmt.Errorf("input argument must be provided if there is no stdin data") + } + data, err := io.ReadAll(os.Stdin) + if err != nil { + return fmt.Errorf("trouble reading from stdin: %w", err) + } + input = bytes.NewReader(data) + inputName = "" + } else { + i, readErr := os.Open(c.Input) + if readErr != nil { + return fmt.Errorf("failed to read from %q: %w", c.Input, readErr) + } + defer i.Close() + input = i + } + v := validator.New(c.MetadataOnly) - report, err := v.Validate(context.Background(), c.Input) + report, err := v.Validate(context.Background(), input, inputName) if err != nil { return err } diff --git a/cmd/gpq/version.go b/cmd/gpq/command/version.go similarity index 98% rename from cmd/gpq/version.go rename to cmd/gpq/command/version.go index 1f6d846..a0fb6e0 100644 --- a/cmd/gpq/version.go +++ b/cmd/gpq/command/version.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package main +package command import "fmt" diff --git a/cmd/gpq/main.go b/cmd/gpq/main.go index 382b869..9476038 100644 --- a/cmd/gpq/main.go +++ b/cmd/gpq/main.go @@ -16,17 +16,11 @@ package main import ( "github.com/alecthomas/kong" + "github.com/planetlabs/gpq/cmd/gpq/command" ) -var CLI struct { - Convert ConvertCmd `cmd:"" help:"Convert data from one format to another."` - Validate ValidateCmd `cmd:"" help:"Validate a GeoParquet file."` - Describe DescribeCmd `cmd:"" help:"Describe a GeoParquet file."` - Version VersionCmd `cmd:"" help:"Print the version of this program."` -} - func main() { - ctx := kong.Parse(&CLI) + ctx := kong.Parse(&command.CLI) err := ctx.Run(ctx) ctx.FatalIfErrorf(err) } diff --git a/internal/geo/geo.go b/internal/geo/geo.go index 97e831c..ed899b8 100644 --- a/internal/geo/geo.go +++ b/internal/geo/geo.go @@ -12,6 +12,23 @@ import ( orbjson "github.com/paulmach/orb/geojson" ) +type FeatureCollection struct { + Type string `json:"type"` + Features []*Feature `json:"features"` +} + +var ( + _ json.Marshaler = (*FeatureCollection)(nil) +) + +func (c *FeatureCollection) MarshalJSON() ([]byte, error) { + m := map[string]any{ + "type": "FeatureCollection", + "features": c.Features, + } + return json.Marshal(m) +} + type Feature struct { Id any `json:"id,omitempty"` Type string `json:"type"` @@ -120,7 +137,7 @@ func DecodeGeometry(value any, encoding string) (*orbjson.Geometry, error) { return nil, fmt.Errorf("unsupported encoding: %s", encoding) } -type CollectionInfo struct { +type GeometryStats struct { mutex *sync.RWMutex minX float64 maxX float64 @@ -129,12 +146,12 @@ type CollectionInfo struct { types map[string]bool } -func NewCollectionInfo(concurrent bool) *CollectionInfo { +func NewGeometryStats(concurrent bool) *GeometryStats { var mutex *sync.RWMutex if concurrent { mutex = &sync.RWMutex{} } - return &CollectionInfo{ + return &GeometryStats{ mutex: mutex, types: map[string]bool{}, minX: math.MaxFloat64, @@ -144,35 +161,35 @@ func NewCollectionInfo(concurrent bool) *CollectionInfo { } } -func (i *CollectionInfo) writeLock() { +func (i *GeometryStats) writeLock() { if i.mutex == nil { return } i.mutex.Lock() } -func (i *CollectionInfo) writeUnlock() { +func (i *GeometryStats) writeUnlock() { if i.mutex == nil { return } i.mutex.Unlock() } -func (i *CollectionInfo) readLock() { +func (i *GeometryStats) readLock() { if i.mutex == nil { return } i.mutex.RLock() } -func (i *CollectionInfo) readUnlock() { +func (i *GeometryStats) readUnlock() { if i.mutex == nil { return } i.mutex.RUnlock() } -func (i *CollectionInfo) AddBounds(bounds *orb.Bound) { +func (i *GeometryStats) AddBounds(bounds *orb.Bound) { i.writeLock() minPoint := bounds.Min minX := minPoint[0] @@ -187,7 +204,7 @@ func (i *CollectionInfo) AddBounds(bounds *orb.Bound) { i.writeUnlock() } -func (i *CollectionInfo) Bounds() *orb.Bound { +func (i *GeometryStats) Bounds() *orb.Bound { i.readLock() bounds := &orb.Bound{ Min: orb.Point{i.minX, i.minY}, @@ -197,13 +214,13 @@ func (i *CollectionInfo) Bounds() *orb.Bound { return bounds } -func (i *CollectionInfo) AddType(typ string) { +func (i *GeometryStats) AddType(typ string) { i.writeLock() i.types[typ] = true i.writeUnlock() } -func (i *CollectionInfo) AddTypes(types []string) { +func (i *GeometryStats) AddTypes(types []string) { i.writeLock() for _, typ := range types { i.types[typ] = true @@ -211,7 +228,7 @@ func (i *CollectionInfo) AddTypes(types []string) { i.writeUnlock() } -func (i *CollectionInfo) Types() []string { +func (i *GeometryStats) 
Types() []string { i.readLock() types := []string{} for typ, ok := range i.types { @@ -223,92 +240,92 @@ func (i *CollectionInfo) Types() []string { return types } -type DatasetInfo struct { +type DatasetStats struct { mutex *sync.RWMutex - collections map[string]*CollectionInfo + collections map[string]*GeometryStats } -func NewDatasetInfo(concurrent bool) *DatasetInfo { +func NewDatasetStats(concurrent bool) *DatasetStats { var mutex *sync.RWMutex if concurrent { mutex = &sync.RWMutex{} } - return &DatasetInfo{ + return &DatasetStats{ mutex: mutex, - collections: map[string]*CollectionInfo{}, + collections: map[string]*GeometryStats{}, } } -func (i *DatasetInfo) writeLock() { +func (i *DatasetStats) writeLock() { if i.mutex == nil { return } i.mutex.Lock() } -func (i *DatasetInfo) writeUnlock() { +func (i *DatasetStats) writeUnlock() { if i.mutex == nil { return } i.mutex.Unlock() } -func (i *DatasetInfo) readLock() { +func (i *DatasetStats) readLock() { if i.mutex == nil { return } i.mutex.RLock() } -func (i *DatasetInfo) readUnlock() { +func (i *DatasetStats) readUnlock() { if i.mutex == nil { return } i.mutex.RUnlock() } -func (i *DatasetInfo) NumCollections() int { +func (i *DatasetStats) NumCollections() int { i.readLock() num := len(i.collections) i.readUnlock() return num } -func (i *DatasetInfo) AddCollection(name string) { +func (i *DatasetStats) AddCollection(name string) { i.writeLock() - i.collections[name] = NewCollectionInfo(i.mutex != nil) + i.collections[name] = NewGeometryStats(i.mutex != nil) i.writeUnlock() } -func (i *DatasetInfo) HasCollection(name string) bool { +func (i *DatasetStats) HasCollection(name string) bool { i.readLock() _, has := i.collections[name] i.readUnlock() return has } -func (i *DatasetInfo) AddBounds(name string, bounds *orb.Bound) { +func (i *DatasetStats) AddBounds(name string, bounds *orb.Bound) { i.readLock() collection := i.collections[name] i.readUnlock() collection.AddBounds(bounds) } -func (i *DatasetInfo) Bounds(name string) *orb.Bound { +func (i *DatasetStats) Bounds(name string) *orb.Bound { i.readLock() collection := i.collections[name] i.readUnlock() return collection.Bounds() } -func (i *DatasetInfo) AddTypes(name string, types []string) { +func (i *DatasetStats) AddTypes(name string, types []string) { i.readLock() collection := i.collections[name] i.readUnlock() collection.AddTypes(types) } -func (i *DatasetInfo) Types(name string) []string { +func (i *DatasetStats) Types(name string) []string { i.readLock() collection := i.collections[name] i.readUnlock() diff --git a/internal/geoparquet/geoparquet.go b/internal/geoparquet/geoparquet.go index 80f37de..6bc9605 100644 --- a/internal/geoparquet/geoparquet.go +++ b/internal/geoparquet/geoparquet.go @@ -57,7 +57,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions compression = &c } - datasetInfo := geo.NewDatasetInfo(true) + datasetInfo := geo.NewDatasetStats(true) transformSchema := func(fileReader *file.Reader) (*schema.Schema, error) { inputSchema := fileReader.MetaData().Schema metadata := getMetadata(fileReader, convertOptions) @@ -109,7 +109,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions builder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary) defer builder.Release() - collectionInfo := geo.NewCollectionInfo(false) + collectionInfo := geo.NewGeometryStats(false) for i, arr := range chunks { stringArray, ok := arr.(*array.String) if !ok { diff --git a/internal/test/test.go 
b/internal/test/test.go index 0921162..467187a 100644 --- a/internal/test/test.go +++ b/internal/test/test.go @@ -15,7 +15,7 @@ import ( "github.com/apache/arrow/go/v14/parquet/file" "github.com/apache/arrow/go/v14/parquet/pqarrow" "github.com/apache/arrow/go/v14/parquet/schema" - "github.com/planetlabs/gpq/internal/geoparquet" + "github.com/planetlabs/gpq/internal/geojson" "github.com/planetlabs/gpq/internal/pqutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -89,31 +89,11 @@ func ParquetToJSON(t *testing.T, input parquet.ReaderAtSeeker) string { return string(data) } -func GeoParquetFromStructs[T any](t *testing.T, rows []T, metadata *geoparquet.Metadata) parquet.ReaderAtSeeker { - parquetSchema, err := schema.NewSchemaFromStruct(rows[0]) - require.NoError(t, err) - - arrowSchema, err := pqarrow.FromParquet(parquetSchema, nil, nil) - require.NoError(t, err) - +func GeoParquetFromJSON(t *testing.T, data string) []byte { + input := strings.NewReader(data) output := &bytes.Buffer{} - recordWriter, err := geoparquet.NewRecordWriter(&geoparquet.WriterConfig{ - Writer: output, - Metadata: metadata, - ArrowSchema: arrowSchema, - }) - require.NoError(t, err) - - data, err := json.Marshal(rows) - require.NoError(t, err) - - rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, arrowSchema, strings.NewReader(string(data))) - require.NoError(t, err) - - require.NoError(t, recordWriter.Write(rec)) - require.NoError(t, recordWriter.Close()) - - return bytes.NewReader(output.Bytes()) + require.NoError(t, geojson.ToParquet(input, output, nil)) + return output.Bytes() } func ParquetFromStructs[T any](t *testing.T, rows []T) parquet.ReaderAtSeeker { diff --git a/internal/validator/validator.go b/internal/validator/validator.go index 8787e55..d42ce3f 100644 --- a/internal/validator/validator.go +++ b/internal/validator/validator.go @@ -20,9 +20,9 @@ import ( "errors" "fmt" "io" - "os" "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/parquet" "github.com/apache/arrow/go/v14/parquet/file" "github.com/paulmach/orb" "github.com/planetlabs/gpq/internal/geo" @@ -93,16 +93,10 @@ type Check struct { } // Validate opens and validates a GeoParquet file. 
-func (v *Validator) Validate(ctx context.Context, resource string) (*Report, error) { - input, inputErr := os.Open(resource) - if inputErr != nil { - return nil, fmt.Errorf("failed to read from %q: %w", resource, inputErr) - } - defer input.Close() - +func (v *Validator) Validate(ctx context.Context, input parquet.ReaderAtSeeker, name string) (*Report, error) { reader, readerErr := file.NewParquetReader(input) if readerErr != nil { - return nil, fmt.Errorf("failed to create parquet reader from %q: %w", resource, readerErr) + return nil, fmt.Errorf("failed to create parquet reader from %q: %w", name, readerErr) } defer reader.Close() diff --git a/internal/validator/validator_test.go b/internal/validator/validator_test.go index 7781116..bc12586 100644 --- a/internal/validator/validator_test.go +++ b/internal/validator/validator_test.go @@ -142,13 +142,15 @@ func (s *Suite) TestValidCases() { for _, c := range cases { s.Run(c, func() { - resourcePath := path.Join("../testdata/cases", c) + filePath := path.Join("../testdata/cases", c) + data, err := os.ReadFile(filePath) + s.Require().NoError(err) - allReport, allErr := validatorAll.Validate(ctx, resourcePath) + allReport, allErr := validatorAll.Validate(ctx, bytes.NewReader(data), filePath) s.Require().NoError(allErr) s.assertExpectedReport("all-pass", allReport) - metaReport, metaErr := validatorMeta.Validate(ctx, resourcePath) + metaReport, metaErr := validatorMeta.Validate(ctx, bytes.NewReader(data), filePath) s.Require().NoError(metaErr) s.assertExpectedReport("all-pass-meta", metaReport) })
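
A few standalone sketches follow, restating outside the diff the patterns this patch introduces; any file, command, or helper name that does not appear above is illustrative only. The first concerns the CLI wiring: the command structs move into a new cmd/gpq/command package, main.go shrinks to kong.Parse(&command.CLI), and the positional arguments on convert become optional so that stdin and stdout can stand in for them. Below is a minimal kong program with the same optional-argument shape; EchoCmd and its fields are hypothetical, while the struct tags mirror the ones in the diff.

package main

import (
	"fmt"

	"github.com/alecthomas/kong"
)

// EchoCmd has the same argument shape the patch gives ConvertCmd: both
// positionals are optional so that stdin/stdout can be used instead.
type EchoCmd struct {
	Input  string `arg:"" optional:"" help:"Input file. If not provided, input is read from stdin." type:"path"`
	Output string `arg:"" optional:"" help:"Output file. If not provided, output is written to stdout." type:"path"`
}

func (c *EchoCmd) Run() error {
	// A real command would open the named files or fall back to the
	// standard streams here; this sketch only reports what was parsed.
	fmt.Printf("input=%q output=%q\n", c.Input, c.Output)
	return nil
}

var cli struct {
	Echo EchoCmd `cmd:"" help:"Echo the resolved arguments."`
}

func main() {
	ctx := kong.Parse(&cli)
	ctx.FatalIfErrorf(ctx.Run())
}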
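
Whether piped data is present is decided by the new hasStdin helper, which stats os.Stdin and treats a positive size as input; convert also uses it to reinterpret a single positional argument as the output when data arrives on stdin, so as I read the patch an invocation like gpq convert --from=geojson out.parquet < in.geojson should sniff the output format from the file name while taking the input format from the flag. Here is a minimal sketch of the size check, with the character-device test often used for pipes shown alongside it as an alternative (the patch itself only checks the size):

package main

import (
	"fmt"
	"os"
)

// hasStdinData mirrors the patch's check: stat os.Stdin and treat a
// positive size as piped input. This also works for the temporary files
// the tests substitute for os.Stdin.
func hasStdinData() bool {
	stats, err := os.Stdin.Stat()
	if err != nil {
		return false
	}
	return stats.Size() > 0
}

// notATerminal is a common alternative (not what the patch does): an
// interactive terminal is a character device, so anything else suggests
// redirected or piped input.
func notATerminal() bool {
	stats, err := os.Stdin.Stat()
	if err != nil {
		return false
	}
	return stats.Mode()&os.ModeCharDevice == 0
}

func main() {
	fmt.Println("stdin has data:", hasStdinData())
	fmt.Println("stdin is not a terminal:", notATerminal())
}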
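
Parquet readers need random access, which a pipe cannot provide, so when no input path is given the commands read all of stdin into memory and wrap it in a bytes.Reader; that is what the new ReaderAtSeeker interface in command.go captures, and on the output side the commands simply fall back to os.Stdout. The sketch below pulls that pattern out into two helpers; openInput and openOutput are my names, not the patch's, and the error wording copies the diff.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// ReaderAtSeeker mirrors the interface added in command.go: parquet needs
// ReadAt and Seek, which a pipe cannot provide.
type ReaderAtSeeker interface {
	io.Reader
	io.ReaderAt
	io.Seeker
}

// openInput (hypothetical helper) returns a random-access reader for a file
// path, or buffers all of stdin in memory when no path is given.
func openInput(path string) (ReaderAtSeeker, func() error, error) {
	if path == "" {
		data, err := io.ReadAll(os.Stdin)
		if err != nil {
			return nil, nil, fmt.Errorf("trouble reading from stdin: %w", err)
		}
		return bytes.NewReader(data), func() error { return nil }, nil
	}
	file, err := os.Open(path)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to read from %q: %w", path, err)
	}
	return file, file.Close, nil
}

// openOutput (hypothetical helper) falls back to stdout when no path is given.
func openOutput(path string) (*os.File, func() error, error) {
	if path == "" {
		return os.Stdout, func() error { return nil }, nil
	}
	file, err := os.Create(path)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to open %q for writing: %w", path, err)
	}
	return file, file.Close, nil
}

func main() {
	input, closeInput, err := openInput("") // empty path means stdin
	if err != nil {
		panic(err)
	}
	defer closeInput()

	output, closeOutput, err := openOutput("") // empty path means stdout
	if err != nil {
		panic(err)
	}
	defer closeOutput()

	// A real command hands input and output to the converter; this sketch
	// just copies the bytes through unchanged.
	if _, err := io.Copy(output, input); err != nil {
		panic(err)
	}
}

With both ends piped, neither format can be sniffed from a file name, which is why the patch requires the flags in that case, for example: gpq convert --from=geojson --to=geoparquet < features.geojson > features.parquet.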
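
The new command_test.go suite makes the stdin/stdout plumbing testable by pointing os.Stdin and os.Stdout at temporary files in SetupTest and restoring them in TearDownTest; readStdout reopens the file by name if a command has already closed it. The same trick in a plain test function, outside the testify suite (the package and test names are mine):

package capture

import (
	"fmt"
	"io"
	"os"
	"testing"
)

// TestCaptureStdout shows the substitution used by the command tests: point
// os.Stdout at a temp file, run the code under test, then seek back to the
// start and read what it wrote.
func TestCaptureStdout(t *testing.T) {
	tmp, err := os.CreateTemp("", "stdout")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(tmp.Name())

	original := os.Stdout
	os.Stdout = tmp
	defer func() { os.Stdout = original }()

	// Stands in for the command under test; fmt resolves os.Stdout at call
	// time, so this lands in the temp file.
	fmt.Println("hello")

	if _, err := tmp.Seek(0, io.SeekStart); err != nil {
		t.Fatal(err)
	}
	data, err := io.ReadAll(tmp)
	if err != nil {
		t.Fatal(err)
	}
	if string(data) != "hello\n" {
		t.Fatalf("unexpected output: %q", data)
	}
}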
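
Finally, Validator.Validate no longer opens files itself: it accepts any parquet.ReaderAtSeeker plus a name that is only used to label errors, which is what lets the validate command hand it either an opened file or stdin buffered into a bytes.Reader. A sketch of the new call shape from elsewhere in the module (the internal package is not importable outside it); the file name is hypothetical and the JSON marshaling merely stands in for the command's report formatting:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/planetlabs/gpq/internal/validator"
)

func main() {
	path := "example.parquet" // hypothetical input file

	input, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	defer input.Close()

	// false runs every rule; true would restrict the run to the rules that
	// only look at file metadata and the schema.
	v := validator.New(false)

	// The reader is passed directly; the final argument only labels error
	// messages, so a stdin-based caller can pass an empty string.
	report, err := v.Validate(context.Background(), input, path)
	if err != nil {
		panic(err)
	}

	encoded, err := json.Marshal(report)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded))
}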