Skip to content

Commit

Permalink
feat: add --format to describe
Browse files Browse the repository at this point in the history
I'm not happy with this and consider it partial but I wasn't able to finish it just yet. yaml and json output are working correctly and summary and config in text are on the new system but I wasn't able to rework the text portion of partition to use the same system.

Printer is a workaround to formatter's storing of a struct's type before outputting something.
  • Loading branch information
gene-redpanda committed Sep 21, 2024
1 parent 6aa2177 commit 396d29e
Show file tree
Hide file tree
Showing 2 changed files with 476 additions and 44 deletions.
301 changes: 257 additions & 44 deletions src/go/rpk/pkg/cli/topic/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -59,6 +60,7 @@ For example,

Args: cobra.MinimumNArgs(1),
Run: func(_ *cobra.Command, topicArg []string) {
f := p.Formatter
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

Expand All @@ -75,8 +77,6 @@ For example,
out.MaybeDie(err, "unable to filter topics by regex: %v", err)
}

// By default, if neither are specified, we opt in to
// the config section only.
if !summary && !configs && !partitions {
summary, configs = true, true
}
Expand Down Expand Up @@ -104,8 +104,19 @@ For example,
secConfigs = "configs"
secPart = "partitions"
)
pr := &printer{}
pr.Setup(&f, summary, configs, partitions)

for _, topic := range resp.Topics {
dall := describeAll{
Summary: buildDescribeSummary(topic),
Configs: buildDescribeConfig(prepDescribeConfig(topic, cl)),
Partitions: buildDescribePartitions(topic.Partitions, listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable)),
}
if pr.PrintJsonYaml(dall) {
continue
}

for i, topic := range resp.Topics {
sections := out.NewMaybeHeaderSections(
out.ConditionalSectionHeaders(map[string]bool{
secSummary: summary,
Expand All @@ -117,73 +128,44 @@ For example,
sections.Add(secSummary, func() {
tw := out.NewTabWriter()
defer tw.Flush()
tw.PrintColumn("NAME", *topic.Topic)
tw.PrintColumn("NAME", dall.Summary.Name)
if topic.IsInternal {
tw.PrintColumn("INTERNAL", topic.IsInternal)
tw.PrintColumn("INTERNAL", dall.Summary.Internal)
}
tw.PrintColumn("PARTITIONS", len(topic.Partitions))
if len(topic.Partitions) > 0 {
p0 := &topic.Partitions[0]
tw.PrintColumn("REPLICAS", len(p0.Replicas))
tw.PrintColumn("PARTITIONS", dall.Summary.Partitions)
if dall.Summary.Partitions > 0 {
tw.PrintColumn("REPLICAS", dall.Summary.Replicas)
}
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
tw.PrintColumn("ERROR", err)
if dall.Summary.isErr {
tw.PrintColumn("ERROR", dall.Summary.Error)
}
})

sections.Add(secConfigs, func() {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = *topic.Topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)

tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
types.Sort(resp)
for _, config := range resp.Resources[0].Configs {
var val string
if config.IsSensitive {
val = "(sensitive)"
} else if config.Value != nil {
val = *config.Value
}
tw.Print(config.Name, val, config.Source)
for _, c := range dall.Configs {
tw.Print(c.Key, c.Value, c.Source)
}
})

sections.Add(secPart, func() {
offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable)

tw := out.NewTable(describePartitionsHeaders(
topic.Partitions,
offsets,
listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable),
)...)
defer tw.Flush()
for _, row := range describePartitionsRows(
topic.Partitions,
offsets,
listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable),
) {
tw.Print(row...)
}
})

i++
if i < len(resp.Topics) {
fmt.Println()
}
}

Check failure on line 164 in src/go/rpk/pkg/cli/topic/describe.go

View workflow job for this annotation

GitHub Actions / Lint go files

unnecessary trailing newline (whitespace)
},
}

p.InstallFormatFlag(cmd)
cmd.Flags().IntVar(new(int), "page", -1, "deprecated")
cmd.Flags().IntVar(new(int), "page-size", 20, "deprecated")
cmd.Flags().BoolVar(new(bool), "watermarks", true, "deprecated")
Expand All @@ -204,6 +186,160 @@ For example,
return cmd
}

type describeAll struct {
Summary describeSummary `json:"summary" yaml:"summary"`
Configs []describeConfig `json:"configs" yaml:"configs"`
Partitions []describePartition `json:"partitions" yaml:"partitions"`
}

type describeSummaryConfig struct {
Summary describeSummary `json:"summary" yaml:"summary"`
Configs []describeConfig `json:"configs" yaml:"configs"`
}

type describeSummaryPartitions struct {
Summary describeSummary `json:"summary" yaml:"summary"`
Partitions []describePartition `json:"partitions" yaml:"partitions"`
}

type describeConfigPartitions struct {
Configs []describeConfig `json:"configs" yaml:"configs"`
Partitions []describePartition `json:"partitions" yaml:"partitions"`
}

type describeSummary struct {
Name string `json:"name" yaml:"name"`
Internal bool `json:"internal" yaml:"internal"`
Partitions int `json:"partitions" yaml:"partitions"`
Replicas int `json:"replicas" yaml:"replicas"`
Error string `json:"error" yaml:"error"`
isErr bool
}

func buildDescribeSummary(topic kmsg.MetadataResponseTopic) (resp describeSummary) {
resp = describeSummary{
Name: *topic.Topic,
Internal: topic.IsInternal,
Partitions: len(topic.Partitions),
}
if len(topic.Partitions) > 0 {
resp.Replicas = len(topic.Partitions[0].Replicas)
}
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
resp.isErr = true
resp.Error = err.Error()
}
return
}

type describeConfig struct {
Key string `json:"key" yaml:"key"`
Value any `json:"value" yaml:"value"`
Source string `json:"source" yaml:"source"`
}

func prepDescribeConfig(topic kmsg.MetadataResponseTopic, cl *kgo.Client) []kmsg.DescribeConfigsResponseResourceConfig {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = *topic.Topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)
types.Sort(resp)
return resp.Resources[0].Configs
}

func buildDescribeConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) (output []describeConfig) {
output = make([]describeConfig, 0, len(configs))
for _, cfg := range configs {
d := describeConfig{
Key: cfg.Name,
Source: cfg.Source.String(),
}
if cfg.IsSensitive {
d.Value = "(sensitive)"
} else if cfg.Value != nil {
d.Value = *cfg.Value
}
output = append(output, d)
}
return
}

type describePartition struct {
Partition int32 `json:"partition" yaml:"partition"`
Leader int32 `json:"leader" yaml:"leader"`
Epoch int32 `json:"epoch" yaml:"epoch"`
Replicas []int32 `json:"replicas" yaml:"replicas"`
isOffline bool
OfflineReplicas []int32 `json:"offline_replicas,omitempty" yaml:"offline_replicas,omitempty"`
isLoadErr bool
LoadError string `json:"load_error,omitempty" yaml:"load_error,omitempty"`
LogStartOffset any `json:"log_start_offset" yaml:"log_start_offset"`
isStable bool
LastStableOffset any `json:"last_stable_offset,omitempty" yaml:"last_stable_offset,omitempty"`
HighWatermark any `json:"high_watermark" yaml:"high_watermark"`
}

func buildDescribePartitions(partitions []kmsg.MetadataResponseTopicPartition, offsets []startStableEndOffset) (resp []describePartition) {
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].Partition < partitions[j].Partition
})
offline, useErr, stable := getDescribeUsed(partitions, offsets)
for _, p := range partitions {
row := describePartition{
Partition: p.Partition,
Leader: p.Leader,
Epoch: p.LeaderEpoch,
Replicas: int32s(p.Replicas).sort(),
}
if offline {
row.isOffline = true
row.OfflineReplicas = int32s(p.OfflineReplicas).sort()
}
if useErr {
if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
row.isLoadErr = true
row.LoadError = err.Error()
}
}
o := offsets[p.Partition]
if o.startErr == nil {
row.LogStartOffset = o.start
} else if errors.Is(o.startErr, errUnlisted) {
row.LogStartOffset = "-"
} else {
row.LogStartOffset = o.startErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message
}
if stable {
if o.stableErr == nil {
row.isStable = true
row.LastStableOffset = o.stable
} else if errors.Is(o.stableErr, errUnlisted) {
row.LastStableOffset = "-"
} else {
row.LastStableOffset = o.stableErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message
}
}
if o.endErr == nil {
row.HighWatermark = o.end
} else if errors.Is(o.endErr, errUnlisted) {
row.HighWatermark = "-"
} else {
row.HighWatermark = o.endErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message
}
resp = append(resp, row)
}
return resp
}

// We optionally include the following columns:
// - offline-replicas, if any are offline
// - load-error, if metadata indicates load errors any partitions
Expand Down Expand Up @@ -422,3 +558,80 @@ func (is int32s) sort() []int32 {
}
return is
}

type printer struct {
formatter *config.OutFormatter
flags int
}

func (p *printer) Setup(f *config.OutFormatter, summary, configs, partitions bool) {
p.flags = 0
if summary {
p.flags |= 1
}
if configs {
p.flags |= 2
}
if partitions {
p.flags |= 4
}
var help func() (string, bool)
switch p.flags {
case 7: // 111 - All flags set
help = func() (string, bool) { return f.Help([]describeAll{}) }
case 3: // 011 - Summary and configs
help = func() (string, bool) { return f.Help([]describeSummaryConfig{}) }
case 5: // 101 - Summary and partitions
help = func() (string, bool) { return f.Help([]describeSummaryPartitions{}) }
case 6: // 110 - Configs and partitions
help = func() (string, bool) { return f.Help([]describeConfigPartitions{}) }
case 1: // 001 - Only summary
help = func() (string, bool) { return f.Help([]describeSummary{}) }
case 2: // 010 - Only configs
help = func() (string, bool) { return f.Help([]describeConfig{}) }
case 4: // 100 - Only partitions
help = func() (string, bool) { return f.Help([]describePartition{}) }
}

if help != nil {
if h, ok := help(); ok {
out.Exit(h)
}
}
p.formatter = f
}

func (p *printer) PrintJsonYaml(d describeAll) bool {

Check failure on line 604 in src/go/rpk/pkg/cli/topic/describe.go

View workflow job for this annotation

GitHub Actions / Lint go files

var-naming: method PrintJsonYaml should be PrintJSONYaml (revive)
var toprint any
switch p.flags {
case 7: // 111 - All flags set
toprint = d
case 3: // 011 - Summary and configs
toprint = describeSummaryConfig{
Summary: d.Summary,
Configs: d.Configs,
}
case 5: // 101 - Summary and partitions
toprint = describeSummaryPartitions{
Summary: d.Summary,
Partitions: d.Partitions,
}
case 6: // 110 - Configs and partitions
toprint = describeConfigPartitions{
Configs: d.Configs,
Partitions: d.Partitions,
}
case 1: // 001 - Only summary
toprint = d.Summary
case 2: // 010 - Only configs
toprint = d.Configs
case 4: // 100 - Only partitions
toprint = d.Partitions
}
if isText, _, t, err := p.formatter.Format(toprint); !isText {
out.MaybeDie(err, "unable to print in the requested format %q: %v", p.formatter.Kind, err)
fmt.Fprintf(os.Stdout, "%s\n", t)
return true
}
return false
}
Loading

0 comments on commit 396d29e

Please sign in to comment.