diff --git a/internal/provider/provider.go b/internal/provider/provider.go index d628153..9714c22 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "os" + "strings" astrarestapi "github.com/datastax/astra-client-go/v2/astra-rest-api" astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming" @@ -180,6 +181,9 @@ func configure(providerVersion string, p *schema.Provider) func(context.Context, providerVersion: providerVersion, userAgent: userAgent, } + if strings.Contains(streamingAPIServerURL, "staging") { + clients.streamingTestMode = true + } return clients, nil } } @@ -223,6 +227,7 @@ type astraClients struct { stargateClientCache map[string]astrarestapi.Client providerVersion string userAgent string + streamingTestMode bool } // resourceDataOrDefaultString returns the value in the given ResourceData variable or a default value diff --git a/internal/provider/provider_framework.go b/internal/provider/provider_framework.go index 0d9575e..979def1 100644 --- a/internal/provider/provider_framework.go +++ b/internal/provider/provider_framework.go @@ -59,6 +59,7 @@ type astraClients2 struct { stargateClientCache map[string]astrarestapi.Client providerVersion string userAgent string + streamingTestMode bool } // Metadata returns the provider type name. @@ -196,6 +197,9 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq providerVersion: p.Version, userAgent: userAgent, } + if strings.Contains(streamingAPIServerURL, "staging") { + clients.streamingTestMode = true + } resp.ResourceData = clients resp.DataSourceData = clients } diff --git a/internal/provider/resource_cdc.go b/internal/provider/resource_cdc.go index 1868c03..dd35eb7 100644 --- a/internal/provider/resource_cdc.go +++ b/internal/provider/resource_cdc.go @@ -416,7 +416,7 @@ func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId cloudProvider := string(*db.Info.CloudProvider) fmt.Printf("%s", cloudProvider) - pulsarCluster := getPulsarCluster(cloudProvider, *db.Info.Region) + pulsarCluster := getPulsarCluster(cloudProvider, *db.Info.Region, false) pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName) return pulsarCluster, pulsarToken, err } diff --git a/internal/provider/resource_streaming_sink.go b/internal/provider/resource_streaming_sink.go index e3e3619..5a3c417 100644 --- a/internal/provider/resource_streaming_sink.go +++ b/internal/provider/resource_streaming_sink.go @@ -122,9 +122,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_sink") } - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) tenantName := resourceData.Get("tenant_name").(string) sinkName := resourceData.Get("sink_name").(string) @@ -134,9 +134,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou region := strings.ReplaceAll(rawRegion, "-", "") cloudProvider := resourceData.Get("cloud_provider").(string) - pulsarCluster := getPulsarCluster(cloudProvider, region) + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) - orgBody, _ := client.GetCurrentOrganization(ctx) + orgBody, _ := astraClient.GetCurrentOrganization(ctx) var org OrgId bodyBuffer, err := ioutil.ReadAll(orgBody.Body) @@ -149,7 +149,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou fmt.Println("Can't deserialize", orgBody) } - token := meta.(astraClients).token + token := astraClients.token pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName) if err != nil { return diag.FromErr(err) @@ -160,7 +160,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou Authorization: fmt.Sprintf("Bearer %s", pulsarToken), } - deleteSinkResponse, err := streamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams) + deleteSinkResponse, err := astraClients.astraStreamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams) if err != nil { diag.FromErr(err) } @@ -210,9 +210,9 @@ type SinkResponse struct { } func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) tenantName := resourceData.Get("tenant_name").(string) sinkName := resourceData.Get("sink_name").(string) @@ -223,9 +223,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc region := strings.ReplaceAll(rawRegion, "-", "") cloudProvider := resourceData.Get("cloud_provider").(string) - pulsarCluster := getPulsarCluster(cloudProvider, region) + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) - orgBody, _ := client.GetCurrentOrganization(ctx) + orgBody, _ := astraClient.GetCurrentOrganization(ctx) var org OrgId bodyBuffer, err := ioutil.ReadAll(orgBody.Body) @@ -235,7 +235,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc fmt.Println("Can't deserislize", orgBody) } - token := meta.(astraClients).token + token := astraClients.token pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName) if err != nil { diag.FromErr(err) @@ -246,7 +246,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc Authorization: fmt.Sprintf("Bearer %s", pulsarToken), } - getSinkResponse, err := streamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams) + getSinkResponse, err := astraClients.astraStreamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams) if err != nil { diag.FromErr(err) } @@ -263,9 +263,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc } func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + streamingClient := astraClients.astraStreamingClient.(*astrastreaming.ClientWithResponses) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) rawRegion := resourceData.Get("region").(string) region := strings.ReplaceAll(rawRegion, "-", "") @@ -281,7 +281,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou topic := resourceData.Get("topic").(string) autoAck := resourceData.Get("auto_ack").(bool) - orgBody, _ := client.GetCurrentOrganization(ctx) + orgBody, _ := astraClient.GetCurrentOrganization(ctx) var org OrgId bodyBuffer, err := ioutil.ReadAll(orgBody.Body) @@ -311,9 +311,9 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou } } - pulsarCluster := getPulsarCluster(cloudProvider, region) + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) - token := meta.(astraClients).token + token := astraClients.token pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenantName) if err != nil { diag.FromErr(err) @@ -331,7 +331,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou Authorization: pulsarToken, } - builtinSinksResponse, err := streamingClientv3.GetBuiltInSinks(ctx, &getBuiltinSinkParams) + builtinSinksResponse, err := astraClients.astraStreamingClientv3.GetBuiltInSinks(ctx, &getBuiltinSinkParams) if err != nil { diag.FromErr(err) } @@ -405,7 +405,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou TopicsPattern: nil, } - sinkCreationResponse, err := streamingClientv3.CreateSinkJSON(ctx, tenantName, namespace, sinkName, &createSinkParams, createSinkBody) + sinkCreationResponse, err := astraClients.astraStreamingClientv3.CreateSinkJSON(ctx, tenantName, namespace, sinkName, &createSinkParams, createSinkBody) if err != nil { diag.FromErr(err) } diff --git a/internal/provider/resource_streaming_topic.go b/internal/provider/resource_streaming_topic.go index 9cbe213..3e5cec9 100644 --- a/internal/provider/resource_streaming_topic.go +++ b/internal/provider/resource_streaming_topic.go @@ -84,9 +84,8 @@ func resourceStreamingTopicDelete(ctx context.Context, resourceData *schema.Reso if protectedFromDelete(resourceData) { return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_topic") } - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) namespace := resourceData.Get("namespace").(string) topic := resourceData.Get("topic").(string) @@ -95,37 +94,37 @@ func resourceStreamingTopicDelete(ctx context.Context, resourceData *schema.Reso cloudProvider := resourceData.Get("cloud_provider").(string) rawRegion := resourceData.Get("region").(string) - token := meta.(astraClients).token + token := astraClients.token region := strings.ReplaceAll(rawRegion, "-", "") - pulsarCluster := getPulsarCluster(cloudProvider, region) - orgBody, _ := client.GetCurrentOrganization(ctx) - + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) + orgResp, err := astraClient.GetCurrentOrganization(ctx) + if err != nil { + return diag.Errorf("Failed to read orgnization ID: %v", err) + } var org OrgId - bodyBuffer, err := ioutil.ReadAll(orgBody.Body) - - err = json.Unmarshal(bodyBuffer, &org) + err = json.NewDecoder(orgResp.Body).Decode(&org) if err != nil { - fmt.Println("Can't deserislize", orgBody) + return diag.Errorf("Failed to read orgnization ID: %v", err) } - pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant) + pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant) + if err != nil { + return diag.FromErr(err) + } deleteTopicParams := astrastreaming.DeleteTopicParams{ XDataStaxPulsarCluster: pulsarCluster, Authorization: fmt.Sprintf("Bearer %s", pulsarToken), } - deleteTopicResponse, err := streamingClientv3.DeleteTopic(ctx, tenant, namespace, topic, &deleteTopicParams) + deleteTopicResponse, err := astraClients.astraStreamingClientv3.DeleteTopic(ctx, tenant, namespace, topic, &deleteTopicParams) if err != nil { - diag.FromErr(err) - } - - if !strings.HasPrefix(deleteTopicResponse.Status, "2") { - bodyBuffer, err = ioutil.ReadAll(deleteTopicResponse.Body) + return diag.FromErr(err) + } else if deleteTopicResponse.StatusCode > 299 { + bodyBuffer, _ := ioutil.ReadAll(deleteTopicResponse.Body) return diag.Errorf("Error deleting topic %s", bodyBuffer) } - bodyBuffer, err = ioutil.ReadAll(deleteTopicResponse.Body) resourceData.SetId("") @@ -133,9 +132,8 @@ func resourceStreamingTopicDelete(ctx context.Context, resourceData *schema.Reso } func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) namespace := resourceData.Get("namespace").(string) topic := resourceData.Get("topic").(string) @@ -144,41 +142,37 @@ func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.Resour cloudProvider := resourceData.Get("cloud_provider").(string) rawRegion := resourceData.Get("region").(string) - token := meta.(astraClients).token + token := astraClients.token region := strings.ReplaceAll(rawRegion, "-", "") - pulsarCluster := getPulsarCluster(cloudProvider, region) - orgBody, _ := client.GetCurrentOrganization(ctx) - + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) + orgBody, err := astraClient.GetCurrentOrganization(ctx) + if err != nil { + return diag.Errorf("Failed to get organization ID: %v", err) + } var org OrgId - bodyBuffer, err := ioutil.ReadAll(orgBody.Body) + err = json.NewDecoder(orgBody.Body).Decode(&org) if err != nil { - return diag.FromErr(err) + return diag.Errorf("Failed to read organization ID: %v", err) } - err = json.Unmarshal(bodyBuffer, &org) + pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant) if err != nil { - fmt.Println("Can't deserislize", orgBody) + return diag.Errorf("Failed to get pulsar token: %v", err) } - pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant) - getTopicsParams := astrastreaming.GetTopicsParams{ XDataStaxPulsarCluster: &pulsarCluster, Authorization: fmt.Sprintf("Bearer %s", pulsarToken), } - createTopicResponse, err := streamingClientv3.GetTopics(ctx, tenant, namespace, &getTopicsParams) + readTopicResponse, err := astraClients.astraStreamingClientv3.GetTopics(ctx, tenant, namespace, &getTopicsParams) if err != nil { - return diag.FromErr(err) - } - - if !strings.HasPrefix(createTopicResponse.Status, "2") { - bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body) + return diag.Errorf("Failed to get topic list: %v", err) + } else if readTopicResponse.StatusCode > 299 { + bodyBuffer, _ := ioutil.ReadAll(readTopicResponse.Body) return diag.Errorf("Error reading topic %s", bodyBuffer) } - bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body) - //TODO: validate that our topic is there setStreamingTopicData(resourceData, tenant, topic) @@ -187,9 +181,8 @@ func resourceStreamingTopicRead(ctx context.Context, resourceData *schema.Resour } func resourceStreamingTopicCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics { - streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses) - client := meta.(astraClients).astraClient.(*astra.ClientWithResponses) - streamingClientv3 := meta.(astraClients).astraStreamingClientv3 + astraClients := meta.(astraClients) + astraClient := astraClients.astraClient.(*astra.ClientWithResponses) namespace := resourceData.Get("namespace").(string) topic := resourceData.Get("topic").(string) @@ -198,21 +191,25 @@ func resourceStreamingTopicCreate(ctx context.Context, resourceData *schema.Reso cloudProvider := resourceData.Get("cloud_provider").(string) rawRegion := resourceData.Get("region").(string) - token := meta.(astraClients).token + token := astraClients.token region := strings.ReplaceAll(rawRegion, "-", "") - pulsarCluster := getPulsarCluster(cloudProvider, region) - orgBody, _ := client.GetCurrentOrganization(ctx) - - var org OrgId - bodyBuffer, err := ioutil.ReadAll(orgBody.Body) + pulsarCluster := getPulsarCluster(cloudProvider, region, astraClients.streamingTestMode) - err = json.Unmarshal(bodyBuffer, &org) + orgResp, err := astraClient.GetCurrentOrganization(ctx) if err != nil { - fmt.Println("Can't deserislize", orgBody) + return diag.Errorf("Failed to get current organization: %v", err) + } + org := OrgId{} + err = json.NewDecoder(orgResp.Body).Decode(&org) + if err != nil { + return diag.Errorf("Failed to read organization: %v", err) } - pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, err, streamingClient, tenant) + pulsarToken, err := getLatestPulsarToken(ctx, astraClients.astraStreamingClientv3, token, org.ID, pulsarCluster, tenant) + if err != nil { + return diag.FromErr(err) + } createTopicParams := astrastreaming.CreateTopicParams{ XDataStaxCurrentOrg: &org.ID, @@ -220,16 +217,13 @@ func resourceStreamingTopicCreate(ctx context.Context, resourceData *schema.Reso Authorization: fmt.Sprintf("Bearer %s", pulsarToken), } - createTopicResponse, err := streamingClientv3.CreateTopic(ctx, tenant, namespace, topic, &createTopicParams) + createTopicResponse, err := astraClients.astraStreamingClientv3.CreateTopic(ctx, tenant, namespace, topic, &createTopicParams, setContentTypeHeader("application/json")) if err != nil { - diag.FromErr(err) - } - - if !strings.HasPrefix(createTopicResponse.Status, "2") { - bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body) + return diag.Errorf("Error creating topic: %v", err) + } else if createTopicResponse.StatusCode > 299 { + bodyBuffer, _ := ioutil.ReadAll(createTopicResponse.Body) return diag.Errorf("Error creating topic %s", bodyBuffer) } - bodyBuffer, err = ioutil.ReadAll(createTopicResponse.Body) setStreamingTopicData(resourceData, tenant, topic) diff --git a/internal/provider/resource_streaming_topic_test.go b/internal/provider/resource_streaming_topic_test.go index 16312cd..af125aa 100644 --- a/internal/provider/resource_streaming_topic_test.go +++ b/internal/provider/resource_streaming_topic_test.go @@ -8,8 +8,6 @@ import ( ) func TestStreamingTopic(t *testing.T) { - // Disable this test by default until test works with non-prod clusters - checkRequiredTestVars(t, "ASTRA_TEST_STREAMING_TOPIC_TEST_ENABLED") t.Parallel() tenantName := "terraform-test-" + randomString(5) @@ -43,6 +41,7 @@ resource "astra_streaming_topic" "streaming_topic-1" { region = "useast-4" cloud_provider = "gcp" namespace = "default" + deletion_protection = false } `, tenantName) diff --git a/internal/provider/util_streaming.go b/internal/provider/util_streaming.go index a15b129..cbd2c9f 100644 --- a/internal/provider/util_streaming.go +++ b/internal/provider/util_streaming.go @@ -39,6 +39,13 @@ func setPulsarClusterHeaders(pulsarToken, clusterName, organizationID string) fu } } +func setContentTypeHeader(contentType string) func(ctx context.Context, req *http.Request) error { + return func(ctx context.Context, req *http.Request) error { + req.Header.Set("Content-Type", contentType) + return nil + } +} + type StreamingToken struct { Iat int `json:"iat"` Iss string `json:"iss"` @@ -150,8 +157,12 @@ func getCurrentOrgID(ctx context.Context, astraClient *astra.ClientWithResponses } // getPulsarCluster TODO: this is unreliable because not all clusters might follow this format -func getPulsarCluster(cloudProvider string, rawRegion string) string { +func getPulsarCluster(cloudProvider string, rawRegion string, testMode bool) string { // In most astra APIs there are dashes in region names depending on the cloud provider, this seems not to be the case for streaming region := strings.ReplaceAll(rawRegion, "-", "") - return strings.ToLower(fmt.Sprintf("pulsar-%s-%s", cloudProvider, region)) + clusterName := strings.ToLower(fmt.Sprintf("pulsar-%s-%s", cloudProvider, region)) + if testMode { + clusterName += "-staging" + } + return clusterName }