remove streams delete and extend unit tests (#2737)
FxKu authored Dec 16, 2024
1 parent 4929dd2 commit 301462c
Showing 2 changed files with 105 additions and 63 deletions.
10 changes: 1 addition & 9 deletions pkg/cluster/streams.go
@@ -453,15 +453,6 @@ func (c *Cluster) syncStream(appId string) error {
if stream.Spec.ApplicationId != appId {
continue
}
if streamExists {
c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId)
if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err)
} else {
c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId)
}
continue
}
streamExists = true
desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
@@ -484,6 +475,7 @@ func (c *Cluster) syncStream(appId string) error {
c.Streams[appId] = updatedStream
c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
}
break
}

if !streamExists {
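For orientation, here is a minimal, self-contained sketch of the control flow this hunk leaves behind (hypothetical types and names, not the operator's real API): after this change, syncStream handles only the first stream matching the applicationId and then breaks out of the loop, instead of deleting additional streams that carry the same applicationId.

// Sketch only: eventStream and syncStream are simplified stand-ins,
// not the operator's actual Cluster methods or CRD client.
package main

import "fmt"

type eventStream struct {
	applicationID string
	name          string
}

func syncStream(appID string, existing []eventStream) {
	streamExists := false
	for _, stream := range existing {
		if stream.applicationID != appID {
			continue
		}
		streamExists = true
		fmt.Printf("syncing event stream %q for applicationId %s\n", stream.name, appID)
		break // stop after the first match; no deletion of other matching streams here
	}
	if !streamExists {
		fmt.Printf("no event stream found for applicationId %s, creating one\n", appID)
	}
}

func main() {
	streams := []eventStream{{"app-a", "stream-1"}, {"app-a", "stream-2"}}
	syncStream("app-a", streams) // only "stream-1" is synced; "stream-2" is left untouched
	syncStream("app-b", streams) // no match, so a new stream would be created
}
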
158 changes: 104 additions & 54 deletions pkg/cluster/streams_test.go
@@ -90,7 +90,7 @@ var (
Namespace: namespace,
Labels: map[string]string{
"application": "spilo",
"cluster-name": fmt.Sprintf("%s-2", clusterName),
"cluster-name": clusterName,
"team": "acid",
},
OwnerReferences: []metav1.OwnerReference{
@@ -494,14 +494,13 @@ func TestSyncStreams(t *testing.T) {
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
EnableOwnerReferences: util.True(),
PodRoleLabel: "spilo-role",
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)
@@ -514,33 +513,17 @@
err = cluster.syncStream(appId)
assert.NoError(t, err)

// create a second stream with same spec but with different name
createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
// sync the stream again
err = cluster.syncStream(appId)
assert.NoError(t, err)
assert.Equal(t, createdStream.Spec.ApplicationId, appId)

// check that two streams exist
// check that only one stream remains after sync
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items))

// sync the stream which should remove the redundant stream
err = cluster.syncStream(appId)
assert.NoError(t, err)

// check that only one stream remains after sync
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))

// check owner references
if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
}
}

func TestSameStreams(t *testing.T) {
@@ -663,13 +646,14 @@ func TestUpdateStreams(t *testing.T) {
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
EnableOwnerReferences: util.True(),
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)
@@ -678,10 +662,31 @@
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)

// create the stream
// create stream with different owner reference
fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
fes.ObjectMeta.Labels["cluster-name"] = pg.Name
createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
assert.NoError(t, err)
assert.Equal(t, createdStream.Spec.ApplicationId, appId)

// sync the stream which should update the owner reference
err = cluster.syncStream(appId)
assert.NoError(t, err)

// check that only one stream exists after sync
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))

// compare owner references
if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
}

// change specs of streams and patch CRD
for i, stream := range pg.Spec.Streams {
if stream.ApplicationId == appId {
@@ -694,10 +699,7 @@
}

// compare stream returned from API with expected stream
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
result := cluster.generateFabricEventStream(appId)
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
@@ -716,9 +718,51 @@
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
}
}

mockClient := k8sutil.NewMockKubernetesClient()
cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
patchData, err := specPatch(pgSpec)
assert.NoError(t, err)

pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.syncStream(appId)
assert.NoError(t, err)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)

return streams
}

func TestDeleteStreams(t *testing.T) {
pg.Name = fmt.Sprintf("%s-4", pg.Name)
var cluster = New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)

_, err := cluster.KubeClient.Postgresqls(namespace).Create(
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)

// create the stream
err = cluster.syncStream(appId)
assert.NoError(t, err)

// remove streams from manifest
pg.Spec.Streams = nil
@@ -729,26 +773,32 @@ func TestUpdateStreams(t *testing.T) {
appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams)
cluster.cleanupRemovedStreams(appIds)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streams.Items) > 0 || err != nil {
t.Errorf("stream resource has not been removed or unexpected error %v", err)
// check that streams have been deleted
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
}

func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
patchData, err := specPatch(pgSpec)
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))

pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
// create stream to test deleteStreams code
fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
fes.ObjectMeta.Labels["cluster-name"] = pg.Name
_, err = cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
// sync it once to cluster struct
err = cluster.syncStream(appId)
assert.NoError(t, err)

// we need a mock client because deleteStreams checks for CRD existence
mockClient := k8sutil.NewMockKubernetesClient()
cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
cluster.deleteStreams()

// check that streams have been deleted
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)

return streams
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
}
