diff --git a/analyse/service/shard_processor.go b/analyse/service/shard_processor.go index d49074c..2cf51b2 100644 --- a/analyse/service/shard_processor.go +++ b/analyse/service/shard_processor.go @@ -47,6 +47,7 @@ func (p *ShardProcessor) aggregateAll(ctx context.Context, consumerArn string, p resultsChan := make(chan *ProcessOutput) pendingEnumerations := len(parentShardIDs) aggregatedShards := make([]*ProcessOutput, 0) + aggregatedChildren := make(map[string]bool) for _, shardID := range parentShardIDs { go reader(ctx, resultsChan, shardID, consumerArn) @@ -60,8 +61,11 @@ func (p *ShardProcessor) aggregateAll(ctx context.Context, consumerArn string, p aggregatedShards = append(aggregatedShards, r) if children { for _, cs := range r.childShards { - pendingEnumerations++ - go reader(ctx, resultsChan, *cs.ShardId, consumerArn) + if _, ok := aggregatedChildren[*cs.ShardId]; !ok { + aggregatedChildren[*cs.ShardId] = true + pendingEnumerations++ + go reader(ctx, resultsChan, *cs.ShardId, consumerArn) + } } } } else { diff --git a/analyse/service/shard_processor_test.go b/analyse/service/shard_processor_test.go index 8c35329..f99ec91 100644 --- a/analyse/service/shard_processor_test.go +++ b/analyse/service/shard_processor_test.go @@ -27,8 +27,9 @@ func Test_aggregateAll(t *testing.T) { cases := []testCase{ {Name: "No shards with children and enumeration is required", ShardIDs: []string{"a", "b"}, EnumerateChildren: true, Expect: []string{"a", "b"}}, {Name: "No shards with children and enumeration is not required", ShardIDs: []string{"a", "b"}, EnumerateChildren: false, Expect: []string{"a", "b"}}, - {Name: "Shards with children and enumeration is required", ShardIDs: []string{"a", "b"}, EnumerateChildren: true, ChildShards: map[string][]types.ChildShard{"a": {{ShardId: aws.String("c")}, {ShardId: aws.String("d")}}}, Expect: []string{"a", "b", "c", "d"}}, - {Name: "Shards with children and enumeration is not required", ShardIDs: []string{"a", "b"}, EnumerateChildren: false, ChildShards: map[string][]types.ChildShard{"a": {{ShardId: aws.String("c")}, {ShardId: aws.String("d")}}}, Expect: []string{"a", "b"}}, + {Name: "Shards with children and enumeration is required", ShardIDs: []string{"a", "b"}, EnumerateChildren: true, ChildShards: map[string][]types.ChildShard{"a": {{ShardId: aws.String("c"), ParentShards: []string{"a"}}, {ShardId: aws.String("d"), ParentShards: []string{"a"}}}}, Expect: []string{"a", "b", "c", "d"}}, + {Name: "Shards with children and enumeration is not required", ShardIDs: []string{"a", "b"}, EnumerateChildren: false, ChildShards: map[string][]types.ChildShard{"a": {{ShardId: aws.String("c"), ParentShards: []string{"a"}}, {ShardId: aws.String("d"), ParentShards: []string{"a"}}}}, Expect: []string{"a", "b"}}, + {Name: "Shards with a merged child and enumeration is required", ShardIDs: []string{"a", "b"}, EnumerateChildren: true, ChildShards: map[string][]types.ChildShard{"a": {{ShardId: aws.String("c"), ParentShards: []string{"a", "b"}}}, "b": {{ShardId: aws.String("c"), ParentShards: []string{"a", "b"}}}}, Expect: []string{"a", "b", "c"}}, } for i, c := range cases {