Skip to content

Commit

Permalink
Add kafka cluster create test (#988)
Browse files Browse the repository at this point in the history
* test(docs/install): added install e2e test

* test(docs/install): added localversion 4 Koperator

* feat(e2e): added uninstall koperator and dep

* fix: make ordered root container

* fix: typos

* refact 1

* refact 2

* add(beta): topic produce-consume test

* refactor 3

* refactor 4

* refactor 5

* add(beta) example for external produce consume

* Fix getK8sResources

* fix: checking all resource type

* refactor 6

* add: more log messages

* add(beta) zookeepercluster remove

* fix: remove warnings from output

* add: const.go

* fix: zookeeper import

* refactor 7

* refactor 8

* fix: remove David test

* fix: typo

* Initial simplekafkaclsuter test

* Updated zookeper address to zookeeper-server

* Updated tests to use Marton's wait function

* Various small fixes, added uninstall steps

* Various small fixes

* Random updates regarding reviews

* Update after rebase

* add: const.go

* added ssl kafkacluster

* Removed debugging stuff accidentally left there

* refactor 7

* refactor 8

* Initial simplekafkaclsuter test

* Updated tests to use Marton's wait function

* added ssl kafkacluster

* Updated to latest of the uninstall branch

* Fixes after messing up the rebase a bit

* fix: remove zookeeperclusterready fn

* remove: unnecessary tests

* add: externalListener go-template

* Updates requested by reviews

* Using time.Duration everywhere

* remove: ordered keyword from unnecessary places

* fix: uninstallHelmChartIfExists

* fix: typos

* refactor: based on Kuvesz review

* Fixed rest of review comments

* remove: external consumer-producer test (another PR)

* Remove unnecessary timeout check

* fix: requireUninstallingKoperator description

* Update zookeeper_cluster_test.go

* Update zookeeper_cluster_test.go

* Update tests/e2e/koperator_test.go

Co-authored-by: Darren Lau <[email protected]>

* Update kafka_cluster_test.go

* Update zookeeper_cluster_test.go

* Updated implementation to reflect changes on master

* fixed rebase messup

* Review updates, removed zookeeper renaming

* Updated configmap_test zookeeper name to original

* updates requested in reviews

* added debug error log

---------

Co-authored-by: Patrik Egyed <[email protected]>
Co-authored-by: marbarta <[email protected]>
Co-authored-by: Darren Lau <[email protected]>
Co-authored-by: Kuvesz <[email protected]>
  • Loading branch information
5 people authored Jun 28, 2023
1 parent 2d4c6d4 commit 111348f
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 9 deletions.
13 changes: 10 additions & 3 deletions tests/e2e/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,23 @@ const (
zookeeperClusterName = "zookeeper-server"
managedByHelmLabelTemplate = "app.kubernetes.io/managed-by=Helm,app.kubernetes.io/instance=%s"

cruiseControlPodReadinessTimeout = 50 * time.Second
kafkaClusterResourceReadinessTimeout = 60 * time.Second
defaultDeletionTimeout = 20 * time.Second
defaultPodReadinessWaitTime = 10 * time.Second
defaultTopicCreationWaitTime = 10 * time.Second
kafkaClusterResourceCleanupTimeout = 30 * time.Second
kafkaClusterCreateTimeout = 500 * time.Second
kafkaClusterResourceCleanupTimeout = 120 * time.Second
zookeeperClusterCreateTimeout = 4 * time.Minute
zookeeperClusterResourceCleanupTimeout = 60 * time.Second
externalConsumerTimeout = 5 * time.Second
externalProducerTimeout = 5 * time.Second

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
zookeeperClusterReplicaCount = 1

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl"

kubectlNotFoundErrorMsg = "NotFound"
)
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func listHelmReleases(kubectlOptions k8s.KubectlOptions) ([]*HelmRelease, error)
"list",
"--output", "json",
)

if err != nil {
return nil, errors.WrapIf(err, "listing Helm releases failed")
}
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/install_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"time"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/gruntwork-io/terratest/modules/k8s"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// requireCreatingKafkaCluster creates a KafkaCluster and
// checks the success of that operation.
func requireCreatingKafkaCluster(kubectlOptions k8s.KubectlOptions, manifestPath string) {
It("Deploying a KafkaCluster", func() {

By("Checking existing KafkaClusters")
found := isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)
if found {
By(fmt.Sprintf("KafkaCluster %s already exists\n", kafkaClusterName))
} else {
By("Deploying a KafkaCluster")
applyK8sResourceManifest(kubectlOptions, manifestPath)
}

By("Verifying the KafkaCluster state")
err := waitK8sResourceCondition(kubectlOptions, kafkaKind, fmt.Sprintf("jsonpath={.status.state}=%s", string(v1beta1.KafkaClusterRunning)), kafkaClusterCreateTimeout, "", kafkaClusterName)
Expect(err).NotTo(HaveOccurred())

By("Verifying the CruiseControl pod")
Eventually(context.Background(), func() error {
return waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", cruiseControlPodReadinessTimeout, v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+",app=cruisecontrol", "")
}, kafkaClusterResourceReadinessTimeout, 3*time.Second).ShouldNot(HaveOccurred())

By("Verifying all Kafka pods")
err = waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, v1beta1.KafkaCRLabelKey+"="+kafkaClusterName, "")
Expect(err).NotTo(HaveOccurred())
})
}

// requireCreatingZookeeperCluster creates a ZookeeperCluster and
// checks the success of that operation.
func requireCreatingZookeeperCluster(kubectlOptions k8s.KubectlOptions) {
It("Deploying a ZookeeperCluster", func() {

By("Checking existing ZookeeperClusters")
found := isExistingK8SResource(kubectlOptions, zookeeperKind, zookeeperClusterName)
if found {
By(fmt.Sprintf("ZookeeperCluster %s already exists\n", zookeeperClusterName))
} else {
By("Deploying the sample ZookeeperCluster")
err := applyK8sResourceFromTemplate(kubectlOptions,
zookeeperClusterTemplate,
map[string]interface{}{
"Name": zookeeperClusterName,
"Namespace": kubectlOptions.Namespace,
"Replicas": zookeeperClusterReplicaCount,
},
)
Expect(err).NotTo(HaveOccurred())
}

By("Verifying the ZookeeperCluster resource")
err := waitK8sResourceCondition(kubectlOptions, zookeeperKind, "jsonpath={.status.readyReplicas}=1", zookeeperClusterCreateTimeout, "", zookeeperClusterName)
Expect(err).NotTo(HaveOccurred())

By("Verifying the ZookeeperCluster's pods")
err = waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, "app="+zookeeperClusterName, "")
Expect(err).NotTo(HaveOccurred())
})
}
16 changes: 16 additions & 0 deletions tests/e2e/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath st
k8s.KubectlApply(GinkgoT(), &kubectlOptions, manifestPath)
}

// isExistingK8SResource queries a Resource by it's kind, namespace and name and
// returns true if it's found, false otherwise
func isExistingK8SResource(
kubectlOptions k8s.KubectlOptions,
resourceKind string,
resourceName string,
) bool {
By(fmt.Sprintf("Checking the existence of resource %s in namespace %s (kind: %s)", resourceName, kubectlOptions.Namespace, resourceKind))
err := k8s.RunKubectlE(GinkgoT(), &kubectlOptions, "get", resourceKind, resourceName)
if err != nil {
By(fmt.Sprintf("Received error when getting resource: %s", err))
return false
}
return true
}

// createOrReplaceK8sResourcesFromManifest creates non-existent Kubernetes
// resources or replaces existing ones from the specified manifest to the
// provided kubectl context and namespace.
Expand Down
14 changes: 8 additions & 6 deletions tests/e2e/koperator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ var _ = BeforeSuite(func() {
})

var _ = When("Testing e2e test altogether", Ordered, func() {
// testInstall()
testProduceConsumeExternal("kafka-controller")
// testProduceConsumeInternal()
// testUninstallZookeeperCluster()
// testUninstallKafkaCluster()
// testUninstall()
//testInstall()
testInstallZookeeperCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml")
testUninstallKafkaCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml")
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
//testUninstall()
})
9 changes: 9 additions & 0 deletions tests/e2e/templates/zookeeper_cluster.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
name: {{ .Name }}
namespace: {{ or .Namespace "zookeeper" }}
spec:
replicas: {{ or .Replicas 1 }}
persistence:
reclaimPolicy: Delete
51 changes: 51 additions & 0 deletions tests/e2e/test_install_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func testInstallZookeeperCluster() bool {
return When("Installing Zookeeper cluster", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = zookeeperOperatorHelmDescriptor.Namespace
requireCreatingZookeeperCluster(kubectlOptions)
})
}

func testInstallKafkaCluster(kafkaClusterManifestPath string) bool {
return When("Installing Kafka cluster", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace
requireCreatingKafkaCluster(kubectlOptions, kafkaClusterManifestPath)
})
}

0 comments on commit 111348f

Please sign in to comment.