Skip to content

Commit

Permalink
STREAM-152: fix bugs in streaming topic creation (#278)
Browse files Browse the repository at this point in the history
- send topic requests with application/json header
- enable topic tests in CI
- refactor to get tests working against dev environment
  • Loading branch information
pgier authored Jul 19, 2023
1 parent 002c4fe commit 106b591
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 86 deletions.
5 changes: 5 additions & 0 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"os"
"strings"

astrarestapi "github.com/datastax/astra-client-go/v2/astra-rest-api"
astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming"
Expand Down Expand Up @@ -180,6 +181,9 @@ func configure(providerVersion string, p *schema.Provider) func(context.Context,
providerVersion: providerVersion,
userAgent: userAgent,
}
if strings.Contains(streamingAPIServerURL, "staging") {
clients.streamingTestMode = true
}
return clients, nil
}
}
Expand Down Expand Up @@ -223,6 +227,7 @@ type astraClients struct {
stargateClientCache map[string]astrarestapi.Client
providerVersion string
userAgent string
streamingTestMode bool
}

// resourceDataOrDefaultString returns the value in the given ResourceData variable or a default value
Expand Down
4 changes: 4 additions & 0 deletions internal/provider/provider_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type astraClients2 struct {
stargateClientCache map[string]astrarestapi.Client
providerVersion string
userAgent string
streamingTestMode bool
}

// Metadata returns the provider type name.
Expand Down Expand Up @@ -196,6 +197,9 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
providerVersion: p.Version,
userAgent: userAgent,
}
if strings.Contains(streamingAPIServerURL, "staging") {
clients.streamingTestMode = true
}
resp.ResourceData = clients
resp.DataSourceData = clients
}
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/resource_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId
cloudProvider := string(*db.Info.CloudProvider)
fmt.Printf("%s", cloudProvider)

pulsarCluster := getPulsarCluster(cloudProvider, *db.Info.Region)
pulsarCluster := getPulsarCluster(cloudProvider, *db.Info.Region, false)
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName)
return pulsarCluster, pulsarToken, err
}
Expand Down
44 changes: 22 additions & 22 deletions internal/provider/resource_streaming_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_sink")
}

streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

tenantName := resourceData.Get("tenant_name").(string)
sinkName := resourceData.Get("sink_name").(string)
Expand All @@ -134,9 +134,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
region := strings.ReplaceAll(rawRegion, "-", "")
cloudProvider := resourceData.Get("cloud_provider").(string)

pulsarCluster := getPulsarCluster(cloudProvider, region)
pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)

orgBody, _ := client.GetCurrentOrganization(ctx)
orgBody, _ := astraClient.GetCurrentOrganization(ctx)

var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)
Expand All @@ -149,7 +149,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
fmt.Println("Can't deserialize", orgBody)
}

token := meta.(astraClients).token
token := astraClients.token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName)
if err != nil {
return diag.FromErr(err)
Expand All @@ -160,7 +160,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

deleteSinkResponse, err := streamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams)
deleteSinkResponse, err := astraClients.astraStreamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams)
if err != nil {
diag.FromErr(err)
}
Expand Down Expand Up @@ -210,9 +210,9 @@ type SinkResponse struct {
}

func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

tenantName := resourceData.Get("tenant_name").(string)
sinkName := resourceData.Get("sink_name").(string)
Expand All @@ -223,9 +223,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
region := strings.ReplaceAll(rawRegion, "-", "")
cloudProvider := resourceData.Get("cloud_provider").(string)

pulsarCluster := getPulsarCluster(cloudProvider, region)
pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)

orgBody, _ := client.GetCurrentOrganization(ctx)
orgBody, _ := astraClient.GetCurrentOrganization(ctx)

var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)
Expand All @@ -235,7 +235,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
fmt.Println("Can't deserislize", orgBody)
}

token := meta.(astraClients).token
token := astraClients.token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName)
if err != nil {
diag.FromErr(err)
Expand All @@ -246,7 +246,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

getSinkResponse, err := streamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams)
getSinkResponse, err := astraClients.astraStreamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams)
if err != nil {
diag.FromErr(err)
}
Expand All @@ -263,9 +263,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
}

func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

rawRegion := resourceData.Get("region").(string)
region := strings.ReplaceAll(rawRegion, "-", "")
Expand All @@ -281,7 +281,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
topic := resourceData.Get("topic").(string)
autoAck := resourceData.Get("auto_ack").(bool)

orgBody, _ := client.GetCurrentOrganization(ctx)
orgBody, _ := astraClient.GetCurrentOrganization(ctx)

var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)
Expand Down Expand Up @@ -311,9 +311,9 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
}
}

pulsarCluster := getPulsarCluster(cloudProvider, region)
pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)

token := meta.(astraClients).token
token := astraClients.token
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName)
if err != nil {
diag.FromErr(err)
Expand All @@ -331,7 +331,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
Authorization: pulsarToken,
}

builtinSinksResponse, err := streamingClientv3.GetBuiltInSinks(ctx, &getBuiltinSinkParams)
builtinSinksResponse, err := astraClients.astraStreamingClientv3.GetBuiltInSinks(ctx, &getBuiltinSinkParams)
if err != nil {
diag.FromErr(err)
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
TopicsPattern: nil,
}

sinkCreationResponse, err := streamingClientv3.CreateSinkJSON(ctx, tenantName, namespace, sinkName, &createSinkParams, createSinkBody)
sinkCreationResponse, err := astraClients.astraStreamingClientv3.CreateSinkJSON(ctx, tenantName, namespace, sinkName, &createSinkParams, createSinkBody)
if err != nil {
diag.FromErr(err)
}
Expand Down
112 changes: 53 additions & 59 deletions internal/provider/resource_streaming_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func resourceStreamingTopicDelete(ctx context.Context, resourceData *schema.Reso
if protectedFromDelete(resourceData) {
return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_topic")
}
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

namespace := resourceData.Get("namespace").(string)
topic := resourceData.Get("topic").(string)
Expand All @@ -95,47 +94,46 @@ func resourceStreamingTopicDelete(ctx context.Context, resourceData *schema.Reso
cloudProvider := resourceData.Get("cloud_provider").(string)
rawRegion := resourceData.Get("region").(string)

token := meta.(astraClients).token
token := astraClients.token

region := strings.ReplaceAll(rawRegion, "-", "")
pulsarCluster := getPulsarCluster(cloudProvider, region)
orgBody, _ := client.GetCurrentOrganization(ctx)

pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)
orgResp, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("Failed to read orgnization ID: %v", err)
}
var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)

err = json.Unmarshal(bodyBuffer, &org)
err = json.NewDecoder(orgResp.Body).Decode(&org)
if err != nil {
fmt.Println("Can't deserislize", orgBody)
return diag.Errorf("Failed to read orgnization ID: %v", err)
}

pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant)
pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant)
if err != nil {
return diag.FromErr(err)
}

deleteTopicParams := astrastreaming.DeleteTopicParams{
XDataStaxPulsarCluster: pulsarCluster,
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

deleteTopicResponse, err := streamingClientv3.DeleteTopic(ctx, tenant, namespace, topic, &deleteTopicParams)
deleteTopicResponse, err := astraClients.astraStreamingClientv3.DeleteTopic(ctx, tenant, namespace, topic, &deleteTopicParams)
if err != nil {
diag.FromErr(err)
}

if !strings.HasPrefix(deleteTopicResponse.Status, "2") {
bodyBuffer, err = ioutil.ReadAll(deleteTopicResponse.Body)
return diag.FromErr(err)
} else if deleteTopicResponse.StatusCode > 299 {
bodyBuffer, _ := ioutil.ReadAll(deleteTopicResponse.Body)
return diag.Errorf("Error deleting topic %s", bodyBuffer)
}
bodyBuffer, err = ioutil.ReadAll(deleteTopicResponse.Body)

resourceData.SetId("")

return nil
}

func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

namespace := resourceData.Get("namespace").(string)
topic := resourceData.Get("topic").(string)
Expand All @@ -144,41 +142,37 @@ func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.Resour
cloudProvider := resourceData.Get("cloud_provider").(string)
rawRegion := resourceData.Get("region").(string)

token := meta.(astraClients).token
token := astraClients.token

region := strings.ReplaceAll(rawRegion, "-", "")
pulsarCluster := getPulsarCluster(cloudProvider, region)
orgBody, _ := client.GetCurrentOrganization(ctx)

pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)
orgBody, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
return diag.Errorf("Failed to get organization ID: %v", err)
}
var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)
err = json.NewDecoder(orgBody.Body).Decode(&org)
if err != nil {
return diag.FromErr(err)
return diag.Errorf("Failed to read organization ID: %v", err)
}

err = json.Unmarshal(bodyBuffer, &org)
pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant)
if err != nil {
fmt.Println("Can't deserislize", orgBody)
return diag.Errorf("Failed to get pulsar token: %v", err)
}

pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant)

getTopicsParams := astrastreaming.GetTopicsParams{
XDataStaxPulsarCluster: &pulsarCluster,
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

createTopicResponse, err := streamingClientv3.GetTopics(ctx, tenant, namespace, &getTopicsParams)
readTopicResponse, err := astraClients.astraStreamingClientv3.GetTopics(ctx, tenant, namespace, &getTopicsParams)
if err != nil {
return diag.FromErr(err)
}

if !strings.HasPrefix(createTopicResponse.Status, "2") {
bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body)
return diag.Errorf("Failed to get topic list: %v", err)
} else if readTopicResponse.StatusCode > 299 {
bodyBuffer, _ := ioutil.ReadAll(readTopicResponse.Body)
return diag.Errorf("Error reading topic %s", bodyBuffer)
}
bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body)

//TODO: validate that our topic is there

setStreamingTopicData(resourceData, tenant, topic)
Expand All @@ -187,9 +181,8 @@ func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.Resour
}

func resourceStreamingTopicCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
astraClients := meta.(astraClients)
astraClient := astraClients.astraClient.(*astra.ClientWithResponses)

namespace := resourceData.Get("namespace").(string)
topic := resourceData.Get("topic").(string)
Expand All @@ -198,38 +191,39 @@ func resourceStreamingTopicCreate(ctx context.Context, resourceData *schema.Reso
cloudProvider := resourceData.Get("cloud_provider").(string)
rawRegion := resourceData.Get("region").(string)

token := meta.(astraClients).token
token := astraClients.token

region := strings.ReplaceAll(rawRegion, "-", "")
pulsarCluster := getPulsarCluster(cloudProvider, region)
orgBody, _ := client.GetCurrentOrganization(ctx)

var org OrgId
bodyBuffer, err := ioutil.ReadAll(orgBody.Body)
pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode)

err = json.Unmarshal(bodyBuffer, &org)
orgResp, err := astraClient.GetCurrentOrganization(ctx)
if err != nil {
fmt.Println("Can't deserislize", orgBody)
return diag.Errorf("Failed to get current organization: %v", err)
}
org := OrgId{}
err = json.NewDecoder(orgResp.Body).Decode(&org)
if err != nil {
return diag.Errorf("Failed to read organization: %v", err)
}

pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant)
pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant)
if err != nil {
return diag.FromErr(err)
}

createTopicParams := astrastreaming.CreateTopicParams{
XDataStaxCurrentOrg: &org.ID,
XDataStaxPulsarCluster: pulsarCluster,
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
}

createTopicResponse, err := streamingClientv3.CreateTopic(ctx, tenant, namespace, topic, &createTopicParams)
createTopicResponse, err := astraClients.astraStreamingClientv3.CreateTopic(ctx, tenant, namespace, topic, &createTopicParams, setContentTypeHeader("application/json"))
if err != nil {
diag.FromErr(err)
}

if !strings.HasPrefix(createTopicResponse.Status, "2") {
bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body)
return diag.Errorf("Error creating topic: %v", err)
} else if createTopicResponse.StatusCode > 299 {
bodyBuffer, _ := ioutil.ReadAll(createTopicResponse.Body)
return diag.Errorf("Error creating topic %s", bodyBuffer)
}
bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body)

setStreamingTopicData(resourceData, tenant, topic)

Expand Down
Loading

0 comments on commit 106b591

Please sign in to comment.