Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(influx_tools): Add export to parquet files #25297

Open
wants to merge 14 commits into
base: master-1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions cmd/influx_tools/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
"github.com/influxdata/influxdb/cmd/influx_tools/help"
"github.com/influxdata/influxdb/cmd/influx_tools/importer"
"github.com/influxdata/influxdb/cmd/influx_tools/parquet"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/cmd/influxd/run"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -55,36 +56,41 @@ func (m *Main) Run(args ...string) error {
switch name {
case "", "help":
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help failed: %s", err)
return fmt.Errorf("help failed: %w", err)
}
case "compact-shard":
c := compact.NewCommand()
if err := c.Run(args); err != nil {
return fmt.Errorf("compact-shard failed: %s", err)
return fmt.Errorf("compact-shard failed: %w", err)
}
case "export":
c := export.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %s", err)
return fmt.Errorf("export failed: %w", err)
}
case "export-parquet":
c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %w", err)
}
case "import":
c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("import failed: %s", err)
return fmt.Errorf("import failed: %w", err)
}
case "gen-init":
c := geninit.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("gen-init failed: %s", err)
return fmt.Errorf("gen-init failed: %w", err)
}
case "gen-exec":
deps := genexec.Dependencies{Server: &ossServer{logger: zap.NewNop()}}
c := genexec.NewCommand(deps)
if err := c.Run(args); err != nil {
return fmt.Errorf("gen-exec failed: %s", err)
return fmt.Errorf("gen-exec failed: %w", err)
}
default:
return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influx-tools help' for usage`+"\n\n", name)
return fmt.Errorf("unknown command %q\nRun 'influx-tools help' for usage", name)
}

return nil
Expand All @@ -106,7 +112,7 @@ func (s *ossServer) Open(path string) (err error) {

// Validate the configuration.
if err = s.config.Validate(); err != nil {
return fmt.Errorf("validate config: %s", err)
return fmt.Errorf("validate config: %w", err)
}

if s.noClient {
Expand Down
166 changes: 166 additions & 0 deletions cmd/influx_tools/parquet/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package parquet

import (
"context"
"fmt"
"sort"

"go.uber.org/zap"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
)

// row is a single exported data point: one timestamp together with the tag
// set of its series and the field values accumulated from all field cursors.
type row struct {
	timestamp int64                  // nanosecond UNIX timestamp of the point
	tags      map[string]string      // tag key -> tag value of the owning series
	fields    map[string]interface{} // output field name (after renaming) -> converted value
}

// batcher reads the data of a single measurement out of a shard and groups it
// into timestamp-ordered batches of rows via next().
type batcher struct {
	measurement []byte      // measurement to export
	shard       *tsdb.Shard // shard to read the data from

	typeResolutions map[string]influxql.DataType                      // per-field target types for conflicting fields
	converter       map[string]func(interface{}) (interface{}, error) // per-field conversion funcs, built in init() from typeResolutions
	nameResolutions map[string]string                                 // per-field output-name overrides

	series []seriesEntry // series (key, tags, fields) iterated by next()
	start  int64         // start timestamp (inclusive) of the next batch

	logger *zap.SugaredLogger
}

// init prepares the batcher for use: it wires up one conversion function per
// field that has a type resolution configured and rewinds the batch start to
// the minimum representable time. It returns an error for an unsupported
// resolution type.
func (b *batcher) init() error {
	// Lookup table of the supported target types and their converters.
	converters := map[influxql.DataType]func(interface{}) (interface{}, error){
		influxql.Float:    toFloat,
		influxql.Unsigned: toUint,
		influxql.Integer:  toInt,
		influxql.Boolean:  toBool,
		influxql.String:   toString,
	}

	// Setup the type converters for the conflicting fields
	b.converter = make(map[string]func(interface{}) (interface{}, error), len(b.typeResolutions))
	for field, ftype := range b.typeResolutions {
		c, found := converters[ftype]
		if !found {
			return fmt.Errorf("unknown converter %v for field %q", ftype, field)
		}
		b.converter[field] = c
	}

	b.start = models.MinNanoTime

	return nil
}

// reset rewinds the batcher so the next call to next() starts reading from
// the beginning of time again.
func (b *batcher) reset() {
	b.start = models.MinNanoTime
}

// next assembles and returns the next batch of rows for the batcher's
// measurement, ordered by timestamp. The data is read field-by-field through
// TSDB cursors starting at b.start, merged into one row per series/timestamp,
// and the configured name resolutions and type conversions are applied on the
// fly. The batch is cut off at the earliest timestamp at which any cursor
// stopped delivering data so no partially-filled rows are returned; b.start
// is advanced past that cut-off for the following call. A nil slice is
// returned when the batcher has no series.
func (b *batcher) next(ctx context.Context) ([]row, error) {
	// Iterate over the series and fields and accumulate the data row-wise
	iter, err := b.shard.CreateCursorIterator(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
	}

	// data maps series key -> timestamp -> accumulated row.
	data := make(map[string]map[int64]row, len(b.series))
	end := models.MaxNanoTime
	var rowCount int
	for _, s := range b.series {
		data[s.key] = make(map[int64]row, tsdb.DefaultMaxPointsPerBlock)
		tags := make(map[string]string, len(s.tags))
		for _, t := range s.tags {
			tags[string(t.Key)] = string(t.Value)
		}
		for field := range s.fields {
			cursor, err := iter.Next(ctx,
				&tsdb.CursorRequest{
					Name:      b.measurement,
					Tags:      s.tags,
					Field:     field,
					Ascending: true,
					StartTime: b.start,
					EndTime:   models.MaxNanoTime,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("getting cursor for %s-%s failed: %w", s.key, field, err)
			}
			if cursor == nil {
				// No data for this series/field combination in the window.
				continue
			}

			// Prepare mappings
			fname := field
			if n, found := b.nameResolutions[field]; found {
				fname = n // output name differs from the stored field name
			}
			converter := identity
			if c, found := b.converter[field]; found {
				converter = c // field has a configured type resolution
			}
			fieldEnd := models.MaxNanoTime

			c, err := newValueCursor(cursor)
			if err != nil {
				return nil, fmt.Errorf("creating value cursor failed: %w", err)
			}

			for {
				// Check if we do still have data
				timestamp, ok := c.peek()
				if !ok {
					break
				}

				timestamp, value := c.next()
				v, err := converter(value)
				if err != nil {
					// A failed conversion drops this single value but keeps going.
					b.logger.Errorf("converting %v of field %q failed: %v", value, field, err)
					continue
				}

				if _, found := data[s.key][timestamp]; !found {
					data[s.key][timestamp] = row{
						timestamp: timestamp,
						tags:      tags,
						fields:    make(map[string]interface{}),
					}
					rowCount++
				}

				data[s.key][timestamp].fields[fname] = v
				fieldEnd = timestamp // last timestamp delivered for this field
			}

			c.close()
			// The batch is only complete up to the earliest point at which
			// any field's cursor stopped delivering data.
			end = min(end, fieldEnd)
		}
	}
	if len(data) == 0 {
		return nil, nil
	}

	// Extract the rows ordered by timestamp
	rows := make([]row, 0, rowCount)
	for _, tmap := range data {
		for _, r := range tmap {
			rows = append(rows, r)
		}
	}
	sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp })

	// Only include rows that are before the end-timestamp to avoid duplicate
	// or incomplete entries due to not iterating through all data
	n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp > end })

	// Remember the earliest datum to use this for the next batch excluding the entry itself
	// NOTE(review): if no cursor delivered any data, end stays at
	// models.MaxNanoTime and end+1 overflows int64 — confirm callers stop
	// iterating on an empty batch before that can matter.
	b.start = end + 1

	return rows[:n], nil
}
109 changes: 109 additions & 0 deletions cmd/influx_tools/parquet/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package parquet

import (
"context"
"errors"
"flag"
"fmt"
"io"
"os"

"go.uber.org/zap"

"github.com/influxdata/influxdb/cmd/influx_tools/server"
internal_errors "github.com/influxdata/influxdb/pkg/errors"
)

// Command represents the program execution for the "export-parquet"
// sub-command of influx-tools.
type Command struct {
	// Standard input/output, overridden for testing.
	Stderr io.Writer
	Logger *zap.Logger // built in Run from a zap development config

	server server.Interface // abstraction over the OSS server, injected via NewCommand
}

// NewCommand returns a new instance of the export Command.
func NewCommand(server server.Interface) *Command {
	cmd := &Command{
		server: server,
		Stderr: os.Stderr,
	}

	return cmd
}

// Run executes the export-parquet command using the specified args. It parses
// the command line, opens the underlying server, prints the export plan to
// cmd.Stderr, and — unless -dry-run is given — exports the selected
// measurements to parquet files in the output directory.
func (cmd *Command) Run(args []string) (err error) {
	var (
		configPath      string
		database        string
		rp              string
		measurements    string
		typeResolutions string
		nameResolutions string
		output          string
		dryRun          bool
	)

	cwd, err := os.Getwd()
	if err != nil {
		return fmt.Errorf("getting current working directory failed: %w", err)
	}

	// The flag-set name must match the sub-command name registered in
	// main.go ("export-parquet") so usage and error output are correct.
	flags := flag.NewFlagSet("export-parquet", flag.ContinueOnError)
	flags.SetOutput(cmd.Stderr) // honor the Stderr override used in tests
	flags.StringVar(&configPath, "config", "", "Config file of the InfluxDB v1 instance")
	flags.StringVar(&database, "database", "", "Database to export")
	flags.StringVar(&rp, "rp", "", "Retention policy in the database to export")
	flags.StringVar(&measurements, "measurements", "*", "Comma-separated list of measurements to export")
	flags.StringVar(&typeResolutions, "resolve-types", "", "Comma-separated list of field type resolutions in the form <measurements>.<field>=<type>")
	flags.StringVar(&nameResolutions, "resolve-names", "", "Comma-separated list of field renamings in the form <measurements>.<field>=<new name>")
	flags.StringVar(&output, "output", cwd, "Output directory for exported parquet files")
	flags.BoolVar(&dryRun, "dry-run", false, "Print plan and exit")

	if err := flags.Parse(args); err != nil {
		if errors.Is(err, flag.ErrHelp) {
			// Usage was requested and has already been printed; not a failure.
			return nil
		}
		return fmt.Errorf("parsing flags failed: %w", err)
	}

	if database == "" {
		return errors.New("database is required")
	}

	// Use a terse development logger without stack traces and caller info.
	loggerCfg := zap.NewDevelopmentConfig()
	loggerCfg.DisableStacktrace = true
	loggerCfg.DisableCaller = true
	cmd.Logger, err = loggerCfg.Build()
	if err != nil {
		return fmt.Errorf("creating logger failed: %w", err)
	}

	if err := cmd.server.Open(configPath); err != nil {
		return fmt.Errorf("opening server failed: %w", err)
	}
	defer cmd.server.Close()

	cfg := &config{
		Database:        database,
		RP:              rp,
		Measurements:    measurements,
		TypeResolutions: typeResolutions,
		NameResolutions: nameResolutions,
		Output:          output,
	}
	exp, err := newExporter(cmd.server, cfg, cmd.Logger)
	if err != nil {
		return err
	}

	ctx := context.Background()
	if err := exp.open(ctx); err != nil {
		return fmt.Errorf("opening exporter failed: %w", err)
	}
	// Capture the close error into the named return value so it is not lost.
	defer internal_errors.Capture(&err, exp.close)()

	exp.printPlan(cmd.Stderr)

	if dryRun {
		return nil
	}

	return exp.export(ctx)
}
Loading