From 2d1d97364c9fef86d06cc8380603dae365c702da Mon Sep 17 00:00:00 2001 From: Isteb4k Date: Tue, 10 Dec 2024 15:29:24 +0100 Subject: [PATCH] feat(vd): set tolerations for provisioners Signed-off-by: Isteb4k --- .../019-manage-provisioner-tolerations.patch | 220 ++++++++++++++++++ images/cdi-artifact/patches/README.md | 4 + .../pkg/common/annotations/annotations.go | 19 +- .../pkg/common/provisioner/node_placement.go | 87 +++++++ .../common/provisioner/node_placement_test.go | 133 +++++++++++ .../pkg/controller/conditions/getter.go | 29 +++ .../cvi/internal/source/interfaces.go | 4 +- .../controller/cvi/internal/source/mock.go | 28 ++- .../pkg/controller/importer/importer_pod.go | 70 ++---- .../controller/importer/importer_pod_test.go | 7 +- .../pkg/controller/importer/node_placement.go | 47 ---- .../pkg/controller/kvbuilder/dv.go | 30 +++ .../pkg/controller/service/condition.go | 10 - .../pkg/controller/service/disk_service.go | 37 ++- .../pkg/controller/service/errors.go | 11 +- .../controller/service/importer_service.go | 33 ++- .../pkg/controller/service/stat_service.go | 5 +- .../controller/service/uploader_service.go | 22 +- .../pkg/controller/uploader/uploader_pod.go | 29 ++- .../controller/uploader/uploader_pod_test.go | 4 +- .../controller/vd/internal/source/blank.go | 23 +- .../pkg/controller/vd/internal/source/http.go | 48 ++-- .../vd/internal/source/object_ref.go | 2 +- .../vd/internal/source/object_ref_cvi.go | 15 +- .../vd/internal/source/object_ref_vi_dvcr.go | 15 +- .../vd/internal/source/object_ref_vi_pvc.go | 24 +- .../controller/vd/internal/source/registry.go | 44 ++-- .../controller/vd/internal/source/sources.go | 163 ++++++++++++- .../controller/vd/internal/source/upload.go | 39 ++-- .../vd/internal/watcher/pod_watcher.go | 82 +++++++ .../pkg/controller/vd/vd_controller.go | 6 +- .../pkg/controller/vd/vd_reconciler.go | 23 +- .../vi/internal/source/interfaces.go | 4 +- .../pkg/controller/vi/internal/source/mock.go | 44 ++-- 34 files changed, 1117 insertions(+), 244 deletions(-) create mode 100644 images/cdi-artifact/patches/019-manage-provisioner-tolerations.patch create mode 100644 images/virtualization-artifact/pkg/common/provisioner/node_placement.go create mode 100644 images/virtualization-artifact/pkg/common/provisioner/node_placement_test.go create mode 100644 images/virtualization-artifact/pkg/controller/conditions/getter.go delete mode 100644 images/virtualization-artifact/pkg/controller/importer/node_placement.go create mode 100644 images/virtualization-artifact/pkg/controller/vd/internal/watcher/pod_watcher.go diff --git a/images/cdi-artifact/patches/019-manage-provisioner-tolerations.patch b/images/cdi-artifact/patches/019-manage-provisioner-tolerations.patch new file mode 100644 index 000000000..4d4c98d18 --- /dev/null +++ b/images/cdi-artifact/patches/019-manage-provisioner-tolerations.patch @@ -0,0 +1,220 @@ +diff --git a/pkg/controller/clone-controller.go b/pkg/controller/clone-controller.go +index 59ee5fd3f..046d1f916 100644 +--- a/pkg/controller/clone-controller.go ++++ b/pkg/controller/clone-controller.go +@@ -500,6 +500,11 @@ func (r *CloneReconciler) CreateCloneSourcePod(image, pullPolicy string, pvc *co + return nil, err + } + ++ workloadNodePlacement, err = cc.AdjustWorkloadNodePlacement(context.TODO(), r.client, workloadNodePlacement, pvc) ++ if err != nil { ++ return nil, fmt.Errorf("failed to adjust workload node placement: %w", err) ++ } ++ + sourcePvc, err := r.getCloneRequestSourcePVC(pvc) + if err != nil { + return nil, err +diff --git a/pkg/controller/clone/prep-claim.go b/pkg/controller/clone/prep-claim.go +index 9317b7429..68a249b77 100644 +--- a/pkg/controller/clone/prep-claim.go ++++ b/pkg/controller/clone/prep-claim.go +@@ -139,6 +139,11 @@ func (p *PrepClaimPhase) createPod(ctx context.Context, name string, pvc *corev1 + return err + } + ++ workloadNodePlacement, err = cc.AdjustWorkloadNodePlacement(context.TODO(), p.Client, workloadNodePlacement, pvc) ++ if err != nil { ++ return fmt.Errorf("failed to adjust workload node placement: %w", err) ++ } ++ + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, +diff --git a/pkg/controller/common/util.go b/pkg/controller/common/util.go +index 48c73628d..f2a751805 100644 +--- a/pkg/controller/common/util.go ++++ b/pkg/controller/common/util.go +@@ -21,6 +21,7 @@ import ( + "crypto/rand" + "crypto/rsa" + "crypto/tls" ++ "encoding/json" + "fmt" + "io" + "math" +@@ -95,6 +96,11 @@ const ( + // AnnExternalPopulation annotation marks a PVC as "externally populated", allowing the import-controller to skip it + AnnExternalPopulation = AnnAPIGroup + "/externalPopulation" + ++ // AnnProvisionerTolerations annotation specifies tolerations to use for provisioners. ++ AnnProvisionerTolerations = "virt.deckhouse.io/provisioner-tolerations" ++ // AnnProvisionerName provides a name of data volume provisioner. ++ AnnProvisionerName = "virt.deckhouse.io/provisioner-name" ++ + // AnnDeleteAfterCompletion is PVC annotation for deleting DV after completion + AnnDeleteAfterCompletion = AnnAPIGroup + "/storage.deleteAfterCompletion" + // AnnPodRetainAfterCompletion is PVC annotation for retaining transfer pods after completion +@@ -780,6 +786,50 @@ func GetWorkloadNodePlacement(ctx context.Context, c client.Client) (*sdkapi.Nod + return &cr.Spec.Workloads, nil + } + ++// AdjustWorkloadNodePlacement adds tolerations specified in prime pvc annotation. ++func AdjustWorkloadNodePlacement(ctx context.Context, c client.Client, nodePlacement *sdkapi.NodePlacement, primePVC *corev1.PersistentVolumeClaim) (*sdkapi.NodePlacement, error) { ++ targetPVCKey := types.NamespacedName{ ++ Namespace: primePVC.Namespace, ++ } ++ ++ for _, ref := range primePVC.OwnerReferences { ++ if ref.Kind == "PersistentVolumeClaim" { ++ targetPVCKey.Name = ref.Name ++ } ++ } ++ ++ var targetPVC corev1.PersistentVolumeClaim ++ err := c.Get(ctx, targetPVCKey, &targetPVC) ++ if err != nil { ++ return nil, fmt.Errorf("failed to get target pvc %s: %w", targetPVCKey, err) ++ } ++ ++ provisionerTolerations, err := ExtractProvisionerTolerations(&targetPVC) ++ if err != nil { ++ return nil, fmt.Errorf("failed to extract provisioner tolerations: %w", err) ++ } ++ ++ nodePlacement.Tolerations = append(nodePlacement.Tolerations, provisionerTolerations...) ++ ++ return nodePlacement, nil ++} ++ ++func ExtractProvisionerTolerations(obj client.Object) ([]corev1.Toleration, error) { ++ rawTolerations := obj.GetAnnotations()[AnnProvisionerTolerations] ++ ++ if rawTolerations == "" { ++ return nil, nil ++ } ++ ++ var tolerations []corev1.Toleration ++ err := json.Unmarshal([]byte(rawTolerations), &tolerations) ++ if err != nil { ++ return nil, fmt.Errorf("failed to unmarshal provisioner tolerations %s: %w", rawTolerations, err) ++ } ++ ++ return tolerations, nil ++} ++ + // GetActiveCDI returns the active CDI CR + func GetActiveCDI(ctx context.Context, c client.Client) (*cdiv1.CDI, error) { + crList := &cdiv1.CDIList{} +diff --git a/pkg/controller/datavolume/controller-base.go b/pkg/controller/datavolume/controller-base.go +index b8c9f893e..99f8501be 100644 +--- a/pkg/controller/datavolume/controller-base.go ++++ b/pkg/controller/datavolume/controller-base.go +@@ -1145,6 +1145,11 @@ func (r *ReconcilerBase) newPersistentVolumeClaim(dataVolume *cdiv1.DataVolume, + annotations[k] = v + } + annotations[cc.AnnPodRestarts] = "0" ++ ++ if dataVolume.Annotations[cc.AnnProvisionerTolerations] != "" { ++ annotations[cc.AnnProvisionerTolerations] = dataVolume.Annotations[cc.AnnProvisionerTolerations] ++ } ++ + annotations[cc.AnnContentType] = string(cc.GetContentType(dataVolume.Spec.ContentType)) + if dataVolume.Spec.PriorityClassName != "" { + annotations[cc.AnnPriorityClassName] = dataVolume.Spec.PriorityClassName +diff --git a/pkg/controller/datavolume/pvc-clone-controller.go b/pkg/controller/datavolume/pvc-clone-controller.go +index e9d18ef30..f879408fe 100644 +--- a/pkg/controller/datavolume/pvc-clone-controller.go ++++ b/pkg/controller/datavolume/pvc-clone-controller.go +@@ -42,6 +42,7 @@ import ( + "sigs.k8s.io/controller-runtime/pkg/source" + + cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" ++ + "kubevirt.io/containerized-data-importer/pkg/common" + cc "kubevirt.io/containerized-data-importer/pkg/controller/common" + featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates" +@@ -683,6 +684,12 @@ func (r *PvcCloneReconciler) makeSizeDetectionPodSpec( + if err != nil { + return nil + } ++ ++ workloadNodePlacement.Tolerations, err = cc.ExtractProvisionerTolerations(dv) ++ if err != nil { ++ return nil ++ } ++ + // Generate individual specs + objectMeta := makeSizeDetectionObjectMeta(sourcePvc) + volume := makeSizeDetectionVolumeSpec(sourcePvc.Name) +diff --git a/pkg/controller/import-controller.go b/pkg/controller/import-controller.go +index 49f1ff898..ba9fcb531 100644 +--- a/pkg/controller/import-controller.go ++++ b/pkg/controller/import-controller.go +@@ -859,6 +859,11 @@ func createImporterPod(ctx context.Context, log logr.Logger, client client.Clien + return nil, err + } + ++ args.workloadNodePlacement, err = cc.AdjustWorkloadNodePlacement(context.TODO(), client, args.workloadNodePlacement, args.pvc) ++ if err != nil { ++ return nil, fmt.Errorf("failed to adjust workload node placement: %w", err) ++ } ++ + if isRegistryNodeImport(args) { + args.importImage, err = getRegistryImportImage(args.pvc) + if err != nil { +diff --git a/pkg/controller/populators/populator-base.go b/pkg/controller/populators/populator-base.go +index 6c6fd8f8a..a69ce4f2a 100644 +--- a/pkg/controller/populators/populator-base.go ++++ b/pkg/controller/populators/populator-base.go +@@ -223,7 +223,7 @@ type updatePVCAnnotationsFunc func(pvc, pvcPrime *corev1.PersistentVolumeClaim) + + var desiredAnnotations = []string{cc.AnnPodPhase, cc.AnnPodReady, cc.AnnPodRestarts, + cc.AnnPreallocationRequested, cc.AnnPreallocationApplied, cc.AnnCurrentCheckpoint, cc.AnnMultiStageImportDone, +- cc.AnnRunningCondition, cc.AnnRunningConditionMessage, cc.AnnRunningConditionReason} ++ cc.AnnRunningCondition, cc.AnnRunningConditionMessage, cc.AnnRunningConditionReason, cc.AnnProvisionerName} + + func (r *ReconcilerBase) updatePVCWithPVCPrimeAnnotations(pvc, pvcPrime *corev1.PersistentVolumeClaim, updateFunc updatePVCAnnotationsFunc) (*corev1.PersistentVolumeClaim, error) { + pvcCopy := pvc.DeepCopy() +diff --git a/pkg/controller/upload-controller.go b/pkg/controller/upload-controller.go +index f251cae5d..ed4420fb9 100644 +--- a/pkg/controller/upload-controller.go ++++ b/pkg/controller/upload-controller.go +@@ -623,6 +623,11 @@ func (r *UploadReconciler) createUploadPod(args UploadPodArgs) (*corev1.Pod, err + return nil, err + } + ++ workloadNodePlacement, err = cc.AdjustWorkloadNodePlacement(context.TODO(), r.client, workloadNodePlacement, args.PVC) ++ if err != nil { ++ return nil, fmt.Errorf("failed to adjust workload node placement: %w", err) ++ } ++ + pod := r.makeUploadPodSpec(args, podResourceRequirements, imagePullSecrets, workloadNodePlacement) + util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller") + +diff --git a/pkg/controller/util.go b/pkg/controller/util.go +index 81e050464..367b5453b 100644 +--- a/pkg/controller/util.go ++++ b/pkg/controller/util.go +@@ -298,7 +298,21 @@ func podSucceededFromPVC(pvc *corev1.PersistentVolumeClaim) bool { + } + + func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *corev1.Pod, termMsg *common.TerminationMessage, prefix string) { +- if pod == nil || pod.Status.ContainerStatuses == nil { ++ if pod == nil { ++ return ++ } ++ ++ for _, cond := range pod.Status.Conditions { ++ if cond.Type != corev1.PodScheduled { ++ continue ++ } ++ ++ anno[cc.AnnProvisionerName] = pod.Name ++ ++ break ++ } ++ ++ if pod.Status.ContainerStatuses == nil { + return + } + annPodRestarts, _ := strconv.Atoi(anno[cc.AnnPodRestarts]) diff --git a/images/cdi-artifact/patches/README.md b/images/cdi-artifact/patches/README.md index 2a115ccb4..1ea32c62c 100644 --- a/images/cdi-artifact/patches/README.md +++ b/images/cdi-artifact/patches/README.md @@ -76,3 +76,7 @@ This is necessary for ensuring that the metrics can be accessed only by Promethe Currently covered metrics: - cdi-controller - cdi-deployment + +#### `019-add-provisioner-tolerations-anno.patch` + +Add annotation to manage provisioner tolerations to avoid unschedulable error. diff --git a/images/virtualization-artifact/pkg/common/annotations/annotations.go b/images/virtualization-artifact/pkg/common/annotations/annotations.go index a7221cd17..610cbe9a8 100644 --- a/images/virtualization-artifact/pkg/common/annotations/annotations.go +++ b/images/virtualization-artifact/pkg/common/annotations/annotations.go @@ -30,17 +30,26 @@ const ( VIShortName = "vi" // AnnAPIGroup is the APIGroup for virtualization-controller. - AnnAPIGroup = "virt.deckhouse.io" + AnnAPIGroup = "virt.deckhouse.io" + AnnAPIGroupCDI = "cdi.kubevirt.io" - // AnnCreatedBy is a pod annotation indicating if the pod was created by the PVC + // AnnCreatedBy is a pod annotation indicating if the pod was created by the PVC. AnnCreatedBy = AnnAPIGroup + "/storage.createdByController" // AnnPodRetainAfterCompletion is PVC annotation for retaining transfer pods after completion AnnPodRetainAfterCompletion = AnnAPIGroup + "/storage.pod.retainAfterCompletion" - // AnnUploadURL provides a const for CVMI/VMI/VMD uploadURL annotation + // AnnUploadURL provides a const for CVMI/VMI/VMD uploadURL annotation. AnnUploadURL = AnnAPIGroup + "/upload.url" + // AnnTolerationsHash provides a const for annotation with hash of applied tolerations. + AnnTolerationsHash = AnnAPIGroup + "/tolerations-hash" + + // AnnProvisionerTolerations provides a const for tolerations to use for provisioners. + AnnProvisionerTolerations = AnnAPIGroup + "/provisioner-tolerations" + // AnnProvisionerName provides a name of data volume provisioner. + AnnProvisionerName = AnnAPIGroup + "/provisioner-name" + // AnnDefaultStorageClass is the annotation indicating that a storage class is the default one. AnnDefaultStorageClass = "storageclass.kubernetes.io/is-default-class" @@ -70,6 +79,10 @@ const ( AppKubernetesManagedByLabel = "app.kubernetes.io/managed-by" // AppKubernetesComponentLabel is the Kubernetes recommended component label AppKubernetesComponentLabel = "app.kubernetes.io/component" + + AnnPodScheduledCondition = AnnAPIGroupCDI + "/pod.condition.scheduled" + AnnPodScheduledConditionMessage = AnnAPIGroupCDI + "/pod.condition.scheduled.message" + AnnPodScheduledConditionReason = AnnAPIGroupCDI + "/pod.condition.scheduled.reason" ) // AddAnnotation adds an annotation to an object diff --git a/images/virtualization-artifact/pkg/common/provisioner/node_placement.go b/images/virtualization-artifact/pkg/common/provisioner/node_placement.go new file mode 100644 index 000000000..14ecd1e1b --- /dev/null +++ b/images/virtualization-artifact/pkg/common/provisioner/node_placement.go @@ -0,0 +1,87 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provisioner + +import ( + "encoding/base64" + "encoding/json" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" +) + +type NodePlacement struct { + // tolerations is a list of tolerations applied to the relevant kind of pods + // See https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ for more info. + // These are additional tolerations other than default ones. + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` +} + +func IsNodePlacementChanged(nodePlacement *NodePlacement, obj client.Object) (bool, error) { + oldHash, exists := obj.GetAnnotations()[annotations.AnnTolerationsHash] + + if nodePlacement == nil && exists { + return true, nil + } + + if (nodePlacement == nil || len(nodePlacement.Tolerations) == 0) && !exists { + return false, nil + } + + JSON, err := json.Marshal(nodePlacement.Tolerations) + if err != nil { + return false, err + } + + newHash := base64.StdEncoding.EncodeToString(JSON) + + return oldHash != newHash, nil +} + +func KeepNodePlacementTolerations(nodePlacement *NodePlacement, obj client.Object) error { + anno := obj.GetAnnotations() + + if nodePlacement == nil || len(nodePlacement.Tolerations) == 0 { + _, ok := anno[annotations.AnnTolerationsHash] + if !ok { + return nil + } + + delete(anno, annotations.AnnTolerationsHash) + + obj.SetAnnotations(anno) + + return nil + } + + JSON, err := json.Marshal(nodePlacement.Tolerations) + if err != nil { + return err + } + + if anno == nil { + anno = make(map[string]string) + } + + anno[annotations.AnnTolerationsHash] = base64.StdEncoding.EncodeToString(JSON) + + obj.SetAnnotations(anno) + + return nil +} diff --git a/images/virtualization-artifact/pkg/common/provisioner/node_placement_test.go b/images/virtualization-artifact/pkg/common/provisioner/node_placement_test.go new file mode 100644 index 000000000..cf05c26c5 --- /dev/null +++ b/images/virtualization-artifact/pkg/common/provisioner/node_placement_test.go @@ -0,0 +1,133 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provisioner + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" +) + +func TestProvisioner(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Provisioner") +} + +var _ = Describe("KeepNodePlacementTolerations", func() { + var pod *corev1.Pod + + BeforeEach(func() { + pod = &corev1.Pod{} + }) + + Context("there is no node placement", func() { + It("doesnt set annotation", func() { + err := KeepNodePlacementTolerations(nil, pod) + Expect(err).To(BeNil()) + Expect(pod.Annotations).To(BeEmpty()) + }) + }) + + Context("there are no tolerations", func() { + It("doesnt set annotation", func() { + var nodePlacement NodePlacement + err := KeepNodePlacementTolerations(&nodePlacement, pod) + Expect(err).To(BeNil()) + Expect(pod.Annotations).To(BeEmpty()) + }) + }) + + Context("there are tolerations", func() { + It("set annotation", func() { + nodePlacement := NodePlacement{Tolerations: []corev1.Toleration{ + { + Key: "Foo", + Operator: "Exists", + Effect: "NoSchedule", + }, + }} + + err := KeepNodePlacementTolerations(&nodePlacement, pod) + Expect(err).To(BeNil()) + Expect(pod.Annotations).ToNot(BeEmpty()) + Expect(pod.Annotations[annotations.AnnTolerationsHash]).ToNot(BeEmpty()) + }) + }) +}) + +var _ = Describe("IsNodePlacementChanged", func() { + var pod *corev1.Pod + + Context("there is no toleration in obj", func() { + BeforeEach(func() { + pod = &corev1.Pod{} + }) + + It("is not changed with empty node placement", func() { + isChanged, err := IsNodePlacementChanged(nil, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeFalse()) + }) + + It("is not changed with empty tolerations", func() { + isChanged, err := IsNodePlacementChanged(&NodePlacement{}, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeFalse()) + }) + }) + + Context("there is toleration in obj", func() { + var nodePlacement *NodePlacement + + BeforeEach(func() { + pod = &corev1.Pod{} + + nodePlacement = &NodePlacement{Tolerations: []corev1.Toleration{{Key: "Foo"}}} + err := KeepNodePlacementTolerations(nodePlacement, pod) + Expect(err).To(BeNil()) + }) + + It("is not changed: with tolerations", func() { + isChanged, err := IsNodePlacementChanged(nodePlacement, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeFalse()) + }) + + It("is changed: no node placement", func() { + isChanged, err := IsNodePlacementChanged(nil, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeTrue()) + }) + + It("is changed: with empty tolerations", func() { + isChanged, err := IsNodePlacementChanged(&NodePlacement{}, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeTrue()) + }) + + It("is changed: with different tolerations", func() { + changedTolerations := &NodePlacement{Tolerations: []corev1.Toleration{{Key: "Bar"}}} + isChanged, err := IsNodePlacementChanged(changedTolerations, pod) + Expect(err).To(BeNil()) + Expect(isChanged).To(BeTrue()) + }) + }) +}) diff --git a/images/virtualization-artifact/pkg/controller/conditions/getter.go b/images/virtualization-artifact/pkg/controller/conditions/getter.go new file mode 100644 index 000000000..10693b6fa --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/conditions/getter.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conditions + +import corev1 "k8s.io/api/core/v1" + +func GetPodCondition(condType corev1.PodConditionType, conds []corev1.PodCondition) (corev1.PodCondition, bool) { + for _, cond := range conds { + if cond.Type == condType { + return cond, true + } + } + + return corev1.PodCondition{}, false +} diff --git a/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go b/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go index fbdc608f4..6201d46fc 100644 --- a/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go +++ b/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go @@ -35,7 +35,7 @@ import ( //go:generate moq -rm -out mock.go . Importer Uploader Stat type Importer interface { - Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error StartWithPodSetting(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error CleanUp(ctx context.Context, sup *supplements.Generator) (bool, error) CleanUpSupplements(ctx context.Context, sup *supplements.Generator) (bool, error) @@ -47,7 +47,7 @@ type Importer interface { } type Uploader interface { - Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error CleanUp(ctx context.Context, sup *supplements.Generator) (bool, error) GetPod(ctx context.Context, sup *supplements.Generator) (*corev1.Pod, error) GetIngress(ctx context.Context, sup *supplements.Generator) (*netv1.Ingress, error) diff --git a/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go b/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go index 19f0b6e27..fc4c87fbe 100644 --- a/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go +++ b/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go @@ -46,7 +46,7 @@ var _ Importer = &ImporterMock{} // ProtectFunc: func(ctx context.Context, pod *corev1.Pod) error { // panic("mock out the Protect method") // }, -// StartFunc: func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +// StartFunc: func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { // panic("mock out the Start method") // }, // StartWithPodSettingFunc: func(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error { @@ -81,7 +81,7 @@ type ImporterMock struct { ProtectFunc func(ctx context.Context, pod *corev1.Pod) error // StartFunc mocks the Start method. - StartFunc func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + StartFunc func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error // StartWithPodSettingFunc mocks the StartWithPodSetting method. StartWithPodSettingFunc func(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error @@ -151,6 +151,8 @@ type ImporterMock struct { Sup *supplements.Generator // CaBundle is the caBundle argument value. CaBundle *datasource.CABundle + // Opts is the opts argument value. + Opts []service.Option } // StartWithPodSetting holds details about calls to the StartWithPodSetting method. StartWithPodSetting []struct { @@ -413,7 +415,7 @@ func (mock *ImporterMock) ProtectCalls() []struct { } // Start calls StartFunc. -func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { if mock.StartFunc == nil { panic("ImporterMock.StartFunc: method is nil but Importer.Start was just called") } @@ -423,17 +425,19 @@ func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option }{ Ctx: ctx, Settings: settings, Obj: obj, Sup: sup, CaBundle: caBundle, + Opts: opts, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(ctx, settings, obj, sup, caBundle) + return mock.StartFunc(ctx, settings, obj, sup, caBundle, opts...) } // StartCalls gets all the calls that were made to Start. @@ -446,6 +450,7 @@ func (mock *ImporterMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } { var calls []struct { Ctx context.Context @@ -453,6 +458,7 @@ func (mock *ImporterMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } mock.lockStart.RLock() calls = mock.calls.Start @@ -575,7 +581,7 @@ var _ Uploader = &UploaderMock{} // ProtectFunc: func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error { // panic("mock out the Protect method") // }, -// StartFunc: func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +// StartFunc: func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { // panic("mock out the Start method") // }, // UnprotectFunc: func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error { @@ -610,7 +616,7 @@ type UploaderMock struct { ProtectFunc func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error // StartFunc mocks the Start method. - StartFunc func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + StartFunc func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error // UnprotectFunc mocks the Unprotect method. UnprotectFunc func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error @@ -682,6 +688,8 @@ type UploaderMock struct { Sup *supplements.Generator // CaBundle is the caBundle argument value. CaBundle *datasource.CABundle + // Opts is the opts argument value. + Opts []service.Option } // Unprotect holds details about calls to the Unprotect method. Unprotect []struct { @@ -967,7 +975,7 @@ func (mock *UploaderMock) ProtectCalls() []struct { } // Start calls StartFunc. -func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { if mock.StartFunc == nil { panic("UploaderMock.StartFunc: method is nil but Uploader.Start was just called") } @@ -977,17 +985,19 @@ func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option }{ Ctx: ctx, Settings: settings, Obj: obj, Sup: sup, CaBundle: caBundle, + Opts: opts, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(ctx, settings, obj, sup, caBundle) + return mock.StartFunc(ctx, settings, obj, sup, caBundle, opts...) } // StartCalls gets all the calls that were made to Start. @@ -1000,6 +1010,7 @@ func (mock *UploaderMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } { var calls []struct { Ctx context.Context @@ -1007,6 +1018,7 @@ func (mock *UploaderMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } mock.lockStart.RLock() calls = mock.calls.Start diff --git a/images/virtualization-artifact/pkg/controller/importer/importer_pod.go b/images/virtualization-artifact/pkg/controller/importer/importer_pod.go index 71f5b2d60..ce2ff03a4 100644 --- a/images/virtualization-artifact/pkg/controller/importer/importer_pod.go +++ b/images/virtualization-artifact/pkg/controller/importer/importer_pod.go @@ -31,6 +31,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/object" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" ) const ( @@ -89,51 +90,29 @@ type PodSettings struct { ImagePullSecrets []corev1.LocalObjectReference PriorityClassName string PVCName string - // workloadNodePlacement *sdkapi.NodePlacement + NodePlacement *provisioner.NodePlacement } // CreatePod creates and returns a pointer to a pod which is created based on the passed-in endpoint, secret // name, etc. A nil secret means the endpoint credentials are not passed to the // importer pod. func (imp *Importer) CreatePod(ctx context.Context, client client.Client) (*corev1.Pod, error) { - var err error - // args.ResourceRequirements, err = cc.GetDefaultPodResourceRequirements(client) - // if err != nil { - // return nil, err - // } - - // args.ImagePullSecrets, err = cc.GetImagePullSecrets(client) - // if err != nil { - // return nil, err - // } - - // args.workloadNodePlacement, err = cc.GetWorkloadNodePlacement(ctx, client) - // if err != nil { - // return nil, err - // } - - pod := imp.makeImporterPodSpec() - - if err = client.Create(ctx, pod); err != nil { + pod, err := imp.makeImporterPodSpec() + if err != nil { return nil, err } - return pod, nil -} - -// CleanupPod deletes importer Pod. -// No need to delete CABundle configmap and auth Secret. They have ownerRef and will be gc'ed. -func CleanupPod(ctx context.Context, client client.Client, pod *corev1.Pod) error { - if pod == nil { - return nil + err = client.Create(ctx, pod) + if err != nil { + return nil, err } - return object.CleanupObject(ctx, client, pod) + return pod, nil } // makeImporterPodSpec creates and return the importer pod spec based on the passed-in endpoint, secret and pvc. -func (imp *Importer) makeImporterPodSpec() *corev1.Pod { - pod := &corev1.Pod{ +func (imp *Importer) makeImporterPodSpec() (*corev1.Pod, error) { + pod := corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", @@ -144,36 +123,37 @@ func (imp *Importer) makeImporterPodSpec() *corev1.Pod { Annotations: map[string]string{ annotations.AnnCreatedBy: "yes", }, - // Labels: map[string]string{ - // common.CDILabelKey: common.CDILabelValue, - // common.CDIComponentLabel: common.ImporterPodNamePrefix, - // common.PrometheusLabelKey: common.PrometheusLabelValue, - // }, OwnerReferences: []metav1.OwnerReference{ imp.PodSettings.OwnerReference, }, }, Spec: corev1.PodSpec{ // Container and volumes will be added later. - Containers: []corev1.Container{}, - Volumes: []corev1.Volume{}, - RestartPolicy: corev1.RestartPolicyOnFailure, - // NodeSelector: args.workloadNodePlacement.NodeSelector, - // Tolerations: args.workloadNodePlacement.Tolerations, - // Affinity: args.workloadNodePlacement.Affinity, + Containers: []corev1.Container{}, + Volumes: []corev1.Volume{}, + RestartPolicy: corev1.RestartPolicyOnFailure, PriorityClassName: imp.PodSettings.PriorityClassName, ImagePullSecrets: imp.PodSettings.ImagePullSecrets, }, } - annotations.SetRecommendedLabels(pod, imp.PodSettings.InstallerLabels, imp.PodSettings.ControllerName) + if imp.PodSettings.NodePlacement != nil && len(imp.PodSettings.NodePlacement.Tolerations) > 0 { + pod.Spec.Tolerations = imp.PodSettings.NodePlacement.Tolerations + + err := provisioner.KeepNodePlacementTolerations(imp.PodSettings.NodePlacement, &pod) + if err != nil { + return nil, err + } + } + + annotations.SetRecommendedLabels(&pod, imp.PodSettings.InstallerLabels, imp.PodSettings.ControllerName) podutil.SetRestrictedSecurityContext(&pod.Spec) container := imp.makeImporterContainerSpec() - imp.addVolumes(pod, container) + imp.addVolumes(&pod, container) pod.Spec.Containers = append(pod.Spec.Containers, *container) - return pod + return &pod, nil } func (imp *Importer) makeImporterContainerSpec() *corev1.Container { diff --git a/images/virtualization-artifact/pkg/controller/importer/importer_pod_test.go b/images/virtualization-artifact/pkg/controller/importer/importer_pod_test.go index dfa4c4195..45d2c1713 100644 --- a/images/virtualization-artifact/pkg/controller/importer/importer_pod_test.go +++ b/images/virtualization-artifact/pkg/controller/importer/importer_pod_test.go @@ -19,6 +19,7 @@ package importer import ( "testing" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -53,7 +54,8 @@ func Test_MakePodSpec(t *testing.T) { imp := NewImporter(podSettings, settings) - pod := imp.makeImporterPodSpec() + pod, err := imp.makeImporterPodSpec() + require.NoError(t, err) if pod.Namespace == "" { t.Fatalf("pod.Namespace should not be empty!") @@ -90,7 +92,8 @@ func Test_MakePodSpec_CABundle(t *testing.T) { imp := NewImporter(podSettings, settings) - pod := imp.makeImporterPodSpec() + pod, err := imp.makeImporterPodSpec() + require.NoError(t, err) hasCAVol := false for _, vol := range pod.Spec.Volumes { diff --git a/images/virtualization-artifact/pkg/controller/importer/node_placement.go b/images/virtualization-artifact/pkg/controller/importer/node_placement.go deleted file mode 100644 index e9511d562..000000000 --- a/images/virtualization-artifact/pkg/controller/importer/node_placement.go +++ /dev/null @@ -1,47 +0,0 @@ -/* -Copyright 2024 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package importer - -import corev1 "k8s.io/api/core/v1" - -// NodePlacement describes node scheduling configuration. -// +k8s:openapi-gen=true -type NodePlacement struct { - // nodeSelector is the node selector applied to the relevant kind of pods - // It specifies a map of key-value pairs: for the pod to be eligible to run on a node, - // the node must have each of the indicated key-value pairs as labels - // (it can have additional labels as well). - // See https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector - // +kubebuilder:validation:Optional - // +optional - NodeSelector map[string]string `json:"nodeSelector,omitempty"` - - // affinity enables pod affinity/anti-affinity placement expanding the types of constraints - // that can be expressed with nodeSelector. - // affinity is going to be applied to the relevant kind of pods in parallel with nodeSelector - // See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity - // +kubebuilder:validation:Optional - // +optional - Affinity *corev1.Affinity `json:"affinity,omitempty"` - - // tolerations is a list of tolerations applied to the relevant kind of pods - // See https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/ for more info. - // These are additional tolerations other than default ones. - // +kubebuilder:validation:Optional - // +optional - Tolerations []corev1.Toleration `json:"tolerations,omitempty"` -} diff --git a/images/virtualization-artifact/pkg/controller/kvbuilder/dv.go b/images/virtualization-artifact/pkg/controller/kvbuilder/dv.go index ce1aac2b8..b8ec43d43 100644 --- a/images/virtualization-artifact/pkg/controller/kvbuilder/dv.go +++ b/images/virtualization-artifact/pkg/controller/kvbuilder/dv.go @@ -17,6 +17,9 @@ limitations under the License. package kvbuilder import ( + "encoding/json" + "fmt" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,6 +27,8 @@ import ( cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "github.com/deckhouse/virtualization-controller/pkg/common" + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/common/pvc" "github.com/deckhouse/virtualization-controller/pkg/common/resource_builder" ) @@ -76,6 +81,31 @@ func (b *DV) SetDataSource(source *cdiv1.DataVolumeSource) { b.Resource.Spec.Source = source } +func (b *DV) SetNodePlacement(nodePlacement *provisioner.NodePlacement) error { + if nodePlacement == nil || len(nodePlacement.Tolerations) == 0 { + return nil + } + + anno := b.Resource.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + } + + data, err := json.Marshal(nodePlacement.Tolerations) + if err != nil { + return fmt.Errorf("failed to marshal tolerations: %w", err) + } + + anno[annotations.AnnProvisionerTolerations] = string(data) + + err = provisioner.KeepNodePlacementTolerations(nodePlacement, b.Resource) + if err != nil { + return fmt.Errorf("failed to keep node placement: %w", err) + } + + return nil +} + func (b *DV) SetRegistryDataSource(imageName, authSecret, caBundleConfigMap string) { url := common.DockerRegistrySchemePrefix + imageName diff --git a/images/virtualization-artifact/pkg/controller/service/condition.go b/images/virtualization-artifact/pkg/controller/service/condition.go index d8b9ec424..6cc581e54 100644 --- a/images/virtualization-artifact/pkg/controller/service/condition.go +++ b/images/virtualization-artifact/pkg/controller/service/condition.go @@ -36,16 +36,6 @@ func CapitalizeFirstLetter(s string) string { return string(runes) } -func GetPodCondition(condType corev1.PodConditionType, conds []corev1.PodCondition) (corev1.PodCondition, bool) { - for _, cond := range conds { - if cond.Type == condType { - return cond, true - } - } - - return corev1.PodCondition{}, false -} - func GetDataVolumeCondition(conditionType cdiv1.DataVolumeConditionType, conditions []cdiv1.DataVolumeCondition) *cdiv1.DataVolumeCondition { for i, condition := range conditions { if condition.Type == conditionType { diff --git a/images/virtualization-artifact/pkg/controller/service/disk_service.go b/images/virtualization-artifact/pkg/controller/service/disk_service.go index 9c4388051..e68e0d9b5 100644 --- a/images/virtualization-artifact/pkg/controller/service/disk_service.go +++ b/images/virtualization-artifact/pkg/controller/service/disk_service.go @@ -41,6 +41,7 @@ import ( dvutil "github.com/deckhouse/virtualization-controller/pkg/common/datavolume" "github.com/deckhouse/virtualization-controller/pkg/common/object" "github.com/deckhouse/virtualization-controller/pkg/common/pointer" + "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/kvbuilder" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" "github.com/deckhouse/virtualization-controller/pkg/dvcr" @@ -72,11 +73,24 @@ func (s DiskService) Start( source *cdiv1.DataVolumeSource, obj ObjectKind, sup *supplements.Generator, + opts ...Option, ) error { dvBuilder := kvbuilder.NewDV(sup.DataVolume()) dvBuilder.SetDataSource(source) dvBuilder.SetOwnerRef(obj, obj.GroupVersionKind()) + for _, opt := range opts { + switch v := opt.(type) { + case *NodePlacementOption: + err := dvBuilder.SetNodePlacement(v.nodePlacement) + if err != nil { + return fmt.Errorf("set node placement: %w", err) + } + default: + return fmt.Errorf("unknown Start option") + } + } + sc, err := s.GetStorageClass(ctx, storageClass) if err != nil { return err @@ -151,6 +165,25 @@ func (s DiskService) StartImmediate( return supplements.EnsureForDataVolume(ctx, s.client, sup, dvBuilder.GetResource(), s.dvcrSettings) } +func (s DiskService) CheckProvisioning(ctx context.Context, pvc *corev1.PersistentVolumeClaim) error { + podName, ok := pvc.Annotations[annotations.AnnProvisionerName] + if !ok || podName == "" { + return nil + } + + pod, err := object.FetchObject(ctx, types.NamespacedName{Name: podName, Namespace: pvc.Namespace}, s.client, &corev1.Pod{}) + if err != nil { + return fmt.Errorf("failed to fetch data volume provisioner %s: %w", podName, err) + } + + scheduled, _ := conditions.GetPodCondition(corev1.PodScheduled, pod.Status.Conditions) + if scheduled.Status == corev1.ConditionFalse && scheduled.Reason == corev1.PodReasonUnschedulable { + return ErrDataVolumeProvisionerUnschedulable + } + + return nil +} + func (s DiskService) CreatePersistentVolumeClaim(ctx context.Context, pvc *corev1.PersistentVolumeClaim) error { err := s.client.Create(ctx, pvc) if err != nil && !k8serrors.IsAlreadyExists(err) { @@ -482,12 +515,12 @@ func (s DiskService) CheckImportProcess(ctx context.Context, dv *cdiv1.DataVolum } if cdiImporterPrime != nil { - podInitializedCond, ok := GetPodCondition(corev1.PodInitialized, cdiImporterPrime.Status.Conditions) + podInitializedCond, ok := conditions.GetPodCondition(corev1.PodInitialized, cdiImporterPrime.Status.Conditions) if ok && podInitializedCond.Status == corev1.ConditionFalse && strings.Contains(podInitializedCond.Reason, "Error") { return fmt.Errorf("%w; %s error %s: %s", ErrDataVolumeNotRunning, key.String(), podInitializedCond.Reason, podInitializedCond.Message) } - podScheduledCond, ok := GetPodCondition(corev1.PodScheduled, cdiImporterPrime.Status.Conditions) + podScheduledCond, ok := conditions.GetPodCondition(corev1.PodScheduled, cdiImporterPrime.Status.Conditions) if ok && podScheduledCond.Status == corev1.ConditionFalse && strings.Contains(podScheduledCond.Reason, "Error") { return fmt.Errorf("%w; %s error %s: %s", ErrDataVolumeNotRunning, key.String(), podScheduledCond.Reason, podScheduledCond.Message) } diff --git a/images/virtualization-artifact/pkg/controller/service/errors.go b/images/virtualization-artifact/pkg/controller/service/errors.go index 341c712a2..41128242d 100644 --- a/images/virtualization-artifact/pkg/controller/service/errors.go +++ b/images/virtualization-artifact/pkg/controller/service/errors.go @@ -19,11 +19,12 @@ package service import "errors" var ( - ErrStorageClassNotFound = errors.New("storage class not found") - ErrStorageProfileNotFound = errors.New("storage profile not found") - ErrDefaultStorageClassNotFound = errors.New("default storage class not found") - ErrStorageClassNotAllowed = errors.New("storage class not allowed") - ErrDataVolumeNotRunning = errors.New("pvc importer is not running") + ErrStorageClassNotFound = errors.New("storage class not found") + ErrStorageProfileNotFound = errors.New("storage profile not found") + ErrDefaultStorageClassNotFound = errors.New("default storage class not found") + ErrStorageClassNotAllowed = errors.New("storage class not allowed") + ErrDataVolumeNotRunning = errors.New("pvc importer is not running") + ErrDataVolumeProvisionerUnschedulable = errors.New("provisioner unschedulable") ) var ( diff --git a/images/virtualization-artifact/pkg/controller/service/importer_service.go b/images/virtualization-artifact/pkg/controller/service/importer_service.go index 6af668a1a..ccbbb1a44 100644 --- a/images/virtualization-artifact/pkg/controller/service/importer_service.go +++ b/images/virtualization-artifact/pkg/controller/service/importer_service.go @@ -27,6 +27,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/datasource" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/importer" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" "github.com/deckhouse/virtualization-controller/pkg/dvcr" @@ -65,11 +66,39 @@ func NewImporterService( } } -func (s ImporterService) Start(ctx context.Context, settings *importer.Settings, obj ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +type Option interface{} + +type NodePlacementOption struct { + nodePlacement *provisioner.NodePlacement +} + +func WithNodePlacement(nodePlacement *provisioner.NodePlacement) Option { + return &NodePlacementOption{nodePlacement: nodePlacement} +} + +func (s ImporterService) Start( + ctx context.Context, + settings *importer.Settings, + obj ObjectKind, + sup *supplements.Generator, + caBundle *datasource.CABundle, + opts ...Option, +) error { ownerRef := metav1.NewControllerRef(obj, obj.GroupVersionKind()) settings.Verbose = s.verbose - pod, err := importer.NewImporter(s.getPodSettings(ownerRef, sup), settings).CreatePod(ctx, s.client) + podSettings := s.getPodSettings(ownerRef, sup) + + for _, opt := range opts { + switch v := opt.(type) { + case *NodePlacementOption: + podSettings.NodePlacement = v.nodePlacement + default: + return fmt.Errorf("unknown Start option") + } + } + + pod, err := importer.NewImporter(podSettings, settings).CreatePod(ctx, s.client) if err != nil && !k8serrors.IsAlreadyExists(err) { return err } diff --git a/images/virtualization-artifact/pkg/controller/service/stat_service.go b/images/virtualization-artifact/pkg/controller/service/stat_service.go index ba5634193..ec01a841e 100644 --- a/images/virtualization-artifact/pkg/controller/service/stat_service.go +++ b/images/virtualization-artifact/pkg/controller/service/stat_service.go @@ -34,6 +34,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/percent" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/monitoring" virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" ) @@ -108,12 +109,12 @@ func (s StatService) CheckPod(pod *corev1.Pod) error { return errors.New("nil pod") } - podInitializedCond, ok := GetPodCondition(corev1.PodInitialized, pod.Status.Conditions) + podInitializedCond, ok := conditions.GetPodCondition(corev1.PodInitialized, pod.Status.Conditions) if ok && podInitializedCond.Status == corev1.ConditionFalse { return fmt.Errorf("provisioning Pod %s/%s is %w: %s", pod.Namespace, pod.Name, ErrNotInitialized, podInitializedCond.Message) } - podScheduledCond, ok := GetPodCondition(corev1.PodScheduled, pod.Status.Conditions) + podScheduledCond, ok := conditions.GetPodCondition(corev1.PodScheduled, pod.Status.Conditions) if ok && podScheduledCond.Status == corev1.ConditionFalse { return fmt.Errorf("provisioning Pod %s/%s is %w: %s", pod.Namespace, pod.Name, ErrNotScheduled, podScheduledCond.Message) } diff --git a/images/virtualization-artifact/pkg/controller/service/uploader_service.go b/images/virtualization-artifact/pkg/controller/service/uploader_service.go index d99a37911..5fd98229e 100644 --- a/images/virtualization-artifact/pkg/controller/service/uploader_service.go +++ b/images/virtualization-artifact/pkg/controller/service/uploader_service.go @@ -67,11 +67,29 @@ func NewUploaderService( } } -func (s UploaderService) Start(ctx context.Context, settings *uploader.Settings, obj ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +func (s UploaderService) Start( + ctx context.Context, + settings *uploader.Settings, + obj ObjectKind, + sup *supplements.Generator, + caBundle *datasource.CABundle, + opts ...Option, +) error { ownerRef := metav1.NewControllerRef(obj, obj.GroupVersionKind()) settings.Verbose = s.verbose - pod, err := uploader.NewPod(s.getPodSettings(ownerRef, sup), settings).Create(ctx, s.client) + podSettings := s.getPodSettings(ownerRef, sup) + + for _, opt := range opts { + switch v := opt.(type) { + case *NodePlacementOption: + podSettings.NodePlacement = v.nodePlacement + default: + return fmt.Errorf("unknown Start option") + } + } + + pod, err := uploader.NewPod(podSettings, settings).Create(ctx, s.client) if err != nil && !k8serrors.IsAlreadyExists(err) { return err } diff --git a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go index 056f42959..198517dab 100644 --- a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go +++ b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go @@ -30,6 +30,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/object" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" ) const ( @@ -64,22 +65,27 @@ type PodSettings struct { ImagePullSecrets []corev1.LocalObjectReference PriorityClassName string ServiceName string + NodePlacement *provisioner.NodePlacement } // Create creates and returns a pointer to a pod which is created based on the passed-in endpoint, secret // name, etc. A nil secret means the endpoint credentials are not passed to the uploader pod. func (p *Pod) Create(ctx context.Context, client client.Client) (*corev1.Pod, error) { - pod := p.makeSpec() + pod, err := p.makeSpec() + if err != nil { + return nil, err + } - if err := client.Create(ctx, pod); err != nil { + err = client.Create(ctx, pod) + if err != nil { return nil, err } return pod, nil } -func (p *Pod) makeSpec() *corev1.Pod { - pod := &corev1.Pod{ +func (p *Pod) makeSpec() (*corev1.Pod, error) { + pod := corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", @@ -107,14 +113,23 @@ func (p *Pod) makeSpec() *corev1.Pod { }, } - annotations.SetRecommendedLabels(pod, p.PodSettings.InstallerLabels, p.PodSettings.ControllerName) + if p.PodSettings.NodePlacement != nil && len(p.PodSettings.NodePlacement.Tolerations) > 0 { + pod.Spec.Tolerations = p.PodSettings.NodePlacement.Tolerations + + err := provisioner.KeepNodePlacementTolerations(p.PodSettings.NodePlacement, &pod) + if err != nil { + return nil, err + } + } + + annotations.SetRecommendedLabels(&pod, p.PodSettings.InstallerLabels, p.PodSettings.ControllerName) podutil.SetRestrictedSecurityContext(&pod.Spec) container := p.makeUploaderContainerSpec() - p.addVolumes(pod, container) + p.addVolumes(&pod, container) pod.Spec.Containers = append(pod.Spec.Containers, *container) - return pod + return &pod, nil } func (p *Pod) makeUploaderContainerSpec() *corev1.Container { diff --git a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod_test.go b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod_test.go index 1f11e4d3b..b0ab9c1db 100644 --- a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod_test.go +++ b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod_test.go @@ -19,6 +19,7 @@ package uploader import ( "testing" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -50,7 +51,8 @@ func Test_MakePodSpec(t *testing.T) { imp := NewPod(podSettings, settings) - pod := imp.makeSpec() + pod, err := imp.makeSpec() + require.NoError(t, err) if pod.Namespace == "" { t.Fatalf("pod.Namespace should not be empty!") diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/blank.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/blank.go index eef7fdfb2..82431df40 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/blank.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/blank.go @@ -19,14 +19,17 @@ package source import ( "context" "errors" + "fmt" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/object" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -41,17 +44,20 @@ type BlankDataSource struct { statService *service.StatService diskService *service.DiskService storageClassService *service.VirtualDiskStorageClassService + client client.Client } func NewBlankDataSource( statService *service.StatService, diskService *service.DiskService, storageClassService *service.VirtualDiskStorageClassService, + client client.Client, ) *BlankDataSource { return &BlankDataSource{ statService: statService, diskService: diskService, storageClassService: storageClassService, + client: client, } } @@ -80,7 +86,7 @@ func (ds BlankDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (rec switch { case isDiskProvisioningFinished(condition): - log.Debug("Disk provisioning finished: clean up") + log.Info("Disk provisioning finished: clean up") setPhaseConditionForFinishedDisk(pvc, cb, &vd.Status.Phase, supgen) @@ -113,7 +119,14 @@ func (ds BlankDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (rec source := ds.getSource() - err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err @@ -127,6 +140,7 @@ func (ds BlankDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (rec return reconcile.Result{Requeue: true}, nil case pvc == nil: + log.Info("PVC not found") vd.Status.Phase = virtv2.DiskProvisioning cb. Status(metav1.ConditionFalse). @@ -148,6 +162,11 @@ func (ds BlankDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (rec default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(0, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/http.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/http.go index a06ebfb97..1e5493c18 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/http.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/http.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/deckhouse/virtualization-controller/pkg/common" @@ -33,6 +34,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/importer" "github.com/deckhouse/virtualization-controller/pkg/controller/service" @@ -51,6 +53,7 @@ type HTTPDataSource struct { diskService *service.DiskService dvcrSettings *dvcr.Settings storageClassService *service.VirtualDiskStorageClassService + client client.Client } func NewHTTPDataSource( @@ -59,6 +62,7 @@ func NewHTTPDataSource( diskService *service.DiskService, dvcrSettings *dvcr.Settings, storageClassService *service.VirtualDiskStorageClassService, + client client.Client, ) *HTTPDataSource { return &HTTPDataSource{ statService: statService, @@ -66,6 +70,7 @@ func NewHTTPDataSource( diskService: diskService, dvcrSettings: dvcrSettings, storageClassService: storageClassService, + client: client, } } @@ -128,7 +133,15 @@ func (ds HTTPDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (reco vd.Status.Progress = "0%" envSettings := ds.getEnvSettings(vd, supgen) - err = ds.importerService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource)) + + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.importerService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource), service.WithNodePlacement(nodePlacement)) switch { case err == nil: // OK. @@ -151,24 +164,7 @@ func (ds HTTPDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (reco err = ds.statService.CheckPod(pod) if err != nil { - vd.Status.Phase = virtv2.DiskFailed - - switch { - case errors.Is(err, service.ErrNotInitialized), errors.Is(err, service.ErrNotScheduled): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningNotStarted). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - case errors.Is(err, service.ErrProvisioningFailed): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningFailed). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - default: - return reconcile.Result{}, err - } + return reconcile.Result{}, setPhaseConditionFromPodError(ctx, err, pod, vd, cb, ds.client) } err = ds.importerService.Protect(ctx, pod) @@ -225,7 +221,14 @@ func (ds HTTPDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (reco source := ds.getSource(supgen, ds.statService.GetDVCRImageName(pod)) - err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err } @@ -259,6 +262,11 @@ func (ds HTTPDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (reco default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(50, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref.go index f289c475f..5019c723c 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref.go @@ -49,7 +49,7 @@ func NewObjectRefDataSource( diskService: diskService, vdSnapshotSyncer: NewObjectRefVirtualDiskSnapshot(diskService), viDVCRSyncer: NewObjectRefVirtualImageDVCR(statService, diskService, storageClassService, client), - viPVCSyncer: NewObjectRefVirtualImagePVC(diskService, storageClassService), + viPVCSyncer: NewObjectRefVirtualImagePVC(diskService, storageClassService, client), cviSyncer: NewObjectRefClusterVirtualImage(statService, diskService, storageClassService, client), } } diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_cvi.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_cvi.go index ebc47910f..34fc50f13 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_cvi.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_cvi.go @@ -32,6 +32,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" "github.com/deckhouse/virtualization-controller/pkg/common/pointer" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -141,7 +142,14 @@ func (ds ObjectRefClusterVirtualImage) Sync(ctx context.Context, vd *virtv2.Virt source := ds.getSource(supgen, cvi) - err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err } @@ -175,6 +183,11 @@ func (ds ObjectRefClusterVirtualImage) Sync(ctx context.Context, vd *virtv2.Virt default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(0, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_dvcr.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_dvcr.go index 9d6417178..575c19faf 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_dvcr.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_dvcr.go @@ -32,6 +32,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" "github.com/deckhouse/virtualization-controller/pkg/common/pointer" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -140,7 +141,14 @@ func (ds ObjectRefVirtualImageDVCR) Sync(ctx context.Context, vd *virtv2.Virtual source := ds.getSource(supgen, vi) - err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err } @@ -174,6 +182,11 @@ func (ds ObjectRefVirtualImageDVCR) Sync(ctx context.Context, vd *virtv2.Virtual default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(0, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_pvc.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_pvc.go index 32d79b7e6..c679145cd 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_pvc.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/object_ref_vi_pvc.go @@ -24,12 +24,14 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" "github.com/deckhouse/virtualization-controller/pkg/common/pointer" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -41,12 +43,18 @@ import ( type ObjectRefVirtualImagePVC struct { diskService *service.DiskService storageClassService *service.VirtualDiskStorageClassService + client client.Client } -func NewObjectRefVirtualImagePVC(diskService *service.DiskService, storageClassService *service.VirtualDiskStorageClassService) *ObjectRefVirtualImagePVC { +func NewObjectRefVirtualImagePVC( + diskService *service.DiskService, + storageClassService *service.VirtualDiskStorageClassService, + client client.Client, +) *ObjectRefVirtualImagePVC { return &ObjectRefVirtualImagePVC{ diskService: diskService, storageClassService: storageClassService, + client: client, } } @@ -134,7 +142,14 @@ func (ds ObjectRefVirtualImagePVC) Sync(ctx context.Context, vd *virtv2.VirtualD }, } - err = ds.diskService.Start(ctx, size, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, size, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err } @@ -168,6 +183,11 @@ func (ds ObjectRefVirtualImagePVC) Sync(ctx context.Context, vd *virtv2.VirtualD default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(0, 100)) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/registry.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/registry.go index 66009ade8..9796fe9be 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/registry.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/registry.go @@ -35,6 +35,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/importer" "github.com/deckhouse/virtualization-controller/pkg/controller/service" @@ -133,7 +134,15 @@ func (ds RegistryDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) ( vd.Status.Progress = "0%" envSettings := ds.getEnvSettings(vd, supgen) - err = ds.importerService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource)) + + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.importerService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource), service.WithNodePlacement(nodePlacement)) switch { case err == nil: // OK. @@ -156,24 +165,7 @@ func (ds RegistryDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) ( err = ds.statService.CheckPod(pod) if err != nil { - vd.Status.Phase = virtv2.DiskFailed - - switch { - case errors.Is(err, service.ErrNotInitialized), errors.Is(err, service.ErrNotScheduled): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningNotStarted). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - case errors.Is(err, service.ErrProvisioningFailed): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningFailed). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - default: - return reconcile.Result{}, err - } + return reconcile.Result{}, setPhaseConditionFromPodError(ctx, err, pod, vd, cb, ds.client) } vd.Status.Phase = virtv2.DiskProvisioning @@ -228,7 +220,14 @@ func (ds RegistryDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) ( source := ds.getSource(supgen, ds.statService.GetDVCRImageName(pod)) - err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen) + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.diskService.Start(ctx, diskSize, sc, source, vd, supgen, service.WithNodePlacement(nodePlacement)) if updated, err := setPhaseConditionFromStorageError(err, vd, cb); err != nil || updated { return reconcile.Result{}, err } @@ -261,6 +260,11 @@ func (ds RegistryDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) ( default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(50, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/sources.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/sources.go index 3df01edaf..0e9b822af 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/sources.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/sources.go @@ -25,10 +25,14 @@ import ( corev1 "k8s.io/api/core/v1" storev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/common/object" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -87,11 +91,11 @@ func (s Sources) CleanUp(ctx context.Context, vd *virtv2.VirtualDisk) (bool, err return requeue, nil } -type Cleaner interface { +type SupplementsCleaner interface { CleanUpSupplements(ctx context.Context, vd *virtv2.VirtualDisk) (reconcile.Result, error) } -func CleanUpSupplements(ctx context.Context, vd *virtv2.VirtualDisk, c Cleaner) (reconcile.Result, error) { +func CleanUpSupplements(ctx context.Context, vd *virtv2.VirtualDisk, c SupplementsCleaner) (reconcile.Result, error) { if object.ShouldCleanupSubResources(vd) { return c.CleanUpSupplements(ctx, vd) } @@ -234,6 +238,161 @@ func setPhaseConditionForPVCProvisioningDisk( } } +func setPhaseConditionFromPodError( + ctx context.Context, + podErr error, + pod *corev1.Pod, + vd *virtv2.VirtualDisk, + cb *conditions.ConditionBuilder, + c client.Client, +) error { + switch { + case errors.Is(podErr, service.ErrNotInitialized): + vd.Status.Phase = virtv2.DiskFailed + cb. + Status(metav1.ConditionFalse). + Reason(vdcondition.ProvisioningNotStarted). + Message(service.CapitalizeFirstLetter(podErr.Error()) + ".") + return nil + case errors.Is(podErr, service.ErrNotScheduled): + vd.Status.Phase = virtv2.DiskPending + + nodePlacement, err := getNodePlacement(ctx, c, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + var isChanged bool + isChanged, err = provisioner.IsNodePlacementChanged(nodePlacement, pod) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return err + } + + if isChanged { + // log.Info("The node placement has changed for importer pod: recreate the provisioner") + err = c.Delete(ctx, pod) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return err + } + + cb. + Status(metav1.ConditionFalse). + Reason(vdcondition.ProvisioningNotStarted). + Message("Provisioner recreation due to a changes in the virtual machine tolerations.") + } else { + cb. + Status(metav1.ConditionFalse). + Reason(vdcondition.ProvisioningNotStarted). + Message(service.CapitalizeFirstLetter(podErr.Error()) + ".") + } + + return nil + case errors.Is(podErr, service.ErrProvisioningFailed): + setPhaseConditionToFailed(cb, &vd.Status.Phase, podErr) + return nil + default: + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", podErr)) + return podErr + } +} + +type Cleaner interface { + CleanUp(ctx context.Context, sup *supplements.Generator) (bool, error) +} + +func setPhaseConditionFromProvisioningError( + ctx context.Context, + provisioningErr error, + cb *conditions.ConditionBuilder, + vd *virtv2.VirtualDisk, + dv *cdiv1.DataVolume, + cleaner Cleaner, + c client.Client, +) error { + switch { + case errors.Is(provisioningErr, service.ErrDataVolumeProvisionerUnschedulable): + nodePlacement, err := getNodePlacement(ctx, c, vd) + if err != nil { + err = errors.Join(provisioningErr, err) + setPhaseConditionToFailed(cb, &vd.Status.Phase, err) + return err + } + + isChanged, err := provisioner.IsNodePlacementChanged(nodePlacement, dv) + if err != nil { + err = errors.Join(provisioningErr, err) + setPhaseConditionToFailed(cb, &vd.Status.Phase, err) + return err + } + + vd.Status.Phase = virtv2.DiskProvisioning + + if isChanged { + supgen := supplements.NewGenerator(annotations.VDShortName, vd.Name, vd.Namespace, vd.UID) + + _, err = cleaner.CleanUp(ctx, supgen) + if err != nil { + if err != nil { + err = errors.Join(provisioningErr, err) + setPhaseConditionToFailed(cb, &vd.Status.Phase, err) + return err + } + } + + cb. + Status(metav1.ConditionFalse). + Reason(vdcondition.Provisioning). + Message("PVC provisioner recreation due to a changes in the virtual machine tolerations.") + } else { + cb. + Status(metav1.ConditionFalse). + Reason(vdcondition.Provisioning). + Message("Trying to schedule the PVC provisioner.") + } + + return nil + default: + setPhaseConditionToFailed(cb, &vd.Status.Phase, provisioningErr) + return provisioningErr + } +} + +func getNodePlacement(ctx context.Context, c client.Client, vd *virtv2.VirtualDisk) (*provisioner.NodePlacement, error) { + if len(vd.Status.AttachedToVirtualMachines) != 1 { + return nil, nil + } + + vmKey := types.NamespacedName{Name: vd.Status.AttachedToVirtualMachines[0].Name, Namespace: vd.Namespace} + vm, err := object.FetchObject(ctx, vmKey, c, &virtv2.VirtualMachine{}) + if err != nil { + return nil, fmt.Errorf("unable to get the virtual machine %s: %w", vmKey, err) + } + + if vm == nil { + return nil, nil + } + + var nodePlacement provisioner.NodePlacement + nodePlacement.Tolerations = append(nodePlacement.Tolerations, vm.Spec.Tolerations...) + + vmClassKey := types.NamespacedName{Name: vm.Spec.VirtualMachineClassName} + vmClass, err := object.FetchObject(ctx, vmClassKey, c, &virtv2.VirtualMachineClass{}) + if err != nil { + return nil, fmt.Errorf("unable to get the virtual machine class %s: %w", vmClassKey, err) + } + + if vmClass == nil { + return &nodePlacement, nil + } + + nodePlacement.Tolerations = append(nodePlacement.Tolerations, vmClass.Spec.Tolerations...) + + return &nodePlacement, nil +} + const retryPeriod = 1 func setQuotaExceededPhaseCondition(cb *conditions.ConditionBuilder, phase *virtv2.DiskPhase, err error, creationTimestamp metav1.Time) reconcile.Result { diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go index 3b5eb91d3..d1c2181c5 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/deckhouse/virtualization-controller/pkg/common" @@ -33,6 +34,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/imageformat" "github.com/deckhouse/virtualization-controller/pkg/common/object" podutil "github.com/deckhouse/virtualization-controller/pkg/common/pod" + "github.com/deckhouse/virtualization-controller/pkg/common/provisioner" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/supplements" @@ -51,6 +53,7 @@ type UploadDataSource struct { diskService *service.DiskService dvcrSettings *dvcr.Settings storageClassService *service.VirtualDiskStorageClassService + client client.Client } func NewUploadDataSource( @@ -59,6 +62,7 @@ func NewUploadDataSource( diskService *service.DiskService, dvcrSettings *dvcr.Settings, storageClassService *service.VirtualDiskStorageClassService, + client client.Client, ) *UploadDataSource { return &UploadDataSource{ statService: statService, @@ -66,6 +70,7 @@ func NewUploadDataSource( diskService: diskService, dvcrSettings: dvcrSettings, storageClassService: storageClassService, + client: client, } } @@ -136,7 +141,15 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (re vd.Status.Progress = "0%" envSettings := ds.getEnvSettings(vd, supgen) - err = ds.uploaderService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource)) + + var nodePlacement *provisioner.NodePlacement + nodePlacement, err = getNodePlacement(ctx, ds.client, vd) + if err != nil { + setPhaseConditionToFailed(cb, &vd.Status.Phase, fmt.Errorf("unexpected error: %w", err)) + return reconcile.Result{}, fmt.Errorf("fauled to get importer tolerations: %w", err) + } + + err = ds.uploaderService.Start(ctx, envSettings, vd, supgen, datasource.NewCABundleForVMD(vd.GetNamespace(), vd.Spec.DataSource), service.WithNodePlacement(nodePlacement)) switch { case err == nil: // OK. @@ -157,24 +170,7 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (re case !podutil.IsPodComplete(pod): err = ds.statService.CheckPod(pod) if err != nil { - vd.Status.Phase = virtv2.DiskFailed - - switch { - case errors.Is(err, service.ErrNotInitialized), errors.Is(err, service.ErrNotScheduled): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningNotStarted). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - case errors.Is(err, service.ErrProvisioningFailed): - cb. - Status(metav1.ConditionFalse). - Reason(vdcondition.ProvisioningFailed). - Message(service.CapitalizeFirstLetter(err.Error() + ".")) - return reconcile.Result{}, nil - default: - return reconcile.Result{}, err - } + return reconcile.Result{}, setPhaseConditionFromPodError(ctx, err, pod, vd, cb, ds.client) } if !ds.statService.IsUploadStarted(vd.GetUID(), pod) { @@ -295,6 +291,11 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *virtv2.VirtualDisk) (re default: log.Info("Provisioning to PVC is in progress", "dvProgress", dv.Status.Progress, "dvPhase", dv.Status.Phase, "pvcPhase", pvc.Status.Phase) + err = ds.diskService.CheckProvisioning(ctx, pvc) + if err != nil { + return reconcile.Result{}, setPhaseConditionFromProvisioningError(ctx, err, cb, vd, dv, ds.diskService, ds.client) + } + vd.Status.Progress = ds.diskService.GetProgress(dv, vd.Status.Progress, service.NewScaleOption(50, 100)) vd.Status.Capacity = ds.diskService.GetCapacity(pvc) vd.Status.Target.PersistentVolumeClaim = dv.Status.ClaimName diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/watcher/pod_watcher.go b/images/virtualization-artifact/pkg/controller/vd/internal/watcher/pod_watcher.go new file mode 100644 index 000000000..9517b46da --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vd/internal/watcher/pod_watcher.go @@ -0,0 +1,82 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watcher + +import ( + "fmt" + log "log/slog" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" + virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +type PodWatcher struct { + logger *log.Logger + client client.Client +} + +func NewPodWatcher(client client.Client) *PodWatcher { + return &PodWatcher{ + logger: log.Default().With("watcher", "pod"), + client: client, + } +} + +func (w PodWatcher) Watch(mgr manager.Manager, ctr controller.Controller) error { + return ctr.Watch( + source.Kind(mgr.GetCache(), &corev1.Pod{}), + handler.EnqueueRequestForOwner( + mgr.GetScheme(), + mgr.GetRESTMapper(), + &virtv2.VirtualDisk{}, + handler.OnlyControllerOwner(), + ), + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: w.filterUpdateEvents, + }, + ) +} + +func (w PodWatcher) filterUpdateEvents(e event.UpdateEvent) bool { + oldPod, ok := e.ObjectOld.(*corev1.Pod) + if !ok { + w.logger.Error(fmt.Sprintf("expected an old Pod but got a %T", e.ObjectOld)) + return false + } + + newPod, ok := e.ObjectNew.(*corev1.Pod) + if !ok { + w.logger.Error(fmt.Sprintf("expected a new Pod but got a %T", e.ObjectNew)) + return false + } + + oldPodScheduled, _ := conditions.GetPodCondition(corev1.PodScheduled, oldPod.Status.Conditions) + newPodScheduled, _ := conditions.GetPodCondition(corev1.PodScheduled, newPod.Status.Conditions) + + return oldPodScheduled.LastTransitionTime != newPodScheduled.LastTransitionTime +} diff --git a/images/virtualization-artifact/pkg/controller/vd/vd_controller.go b/images/virtualization-artifact/pkg/controller/vd/vd_controller.go index ab1a093a4..1fd20e58c 100644 --- a/images/virtualization-artifact/pkg/controller/vd/vd_controller.go +++ b/images/virtualization-artifact/pkg/controller/vd/vd_controller.go @@ -66,13 +66,13 @@ func NewController( disk := service.NewDiskService(mgr.GetClient(), dvcr, protection) scService := service.NewVirtualDiskStorageClassService(storageClassSettings) - blank := source.NewBlankDataSource(stat, disk, scService) + blank := source.NewBlankDataSource(stat, disk, scService, mgr.GetClient()) sources := source.NewSources() - sources.Set(virtv2.DataSourceTypeHTTP, source.NewHTTPDataSource(stat, importer, disk, dvcr, scService)) + sources.Set(virtv2.DataSourceTypeHTTP, source.NewHTTPDataSource(stat, importer, disk, dvcr, scService, mgr.GetClient())) sources.Set(virtv2.DataSourceTypeContainerImage, source.NewRegistryDataSource(stat, importer, disk, dvcr, mgr.GetClient(), scService)) sources.Set(virtv2.DataSourceTypeObjectRef, source.NewObjectRefDataSource(stat, disk, mgr.GetClient(), scService)) - sources.Set(virtv2.DataSourceTypeUpload, source.NewUploadDataSource(stat, uploader, disk, dvcr, scService)) + sources.Set(virtv2.DataSourceTypeUpload, source.NewUploadDataSource(stat, uploader, disk, dvcr, scService, mgr.GetClient())) reconciler := NewReconciler( mgr.GetClient(), diff --git a/images/virtualization-artifact/pkg/controller/vd/vd_reconciler.go b/images/virtualization-artifact/pkg/controller/vd/vd_reconciler.go index b5b181bf4..ba30bf178 100644 --- a/images/virtualization-artifact/pkg/controller/vd/vd_reconciler.go +++ b/images/virtualization-artifact/pkg/controller/vd/vd_reconciler.go @@ -42,6 +42,10 @@ import ( virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" ) +type Watcher interface { + Watch(mgr manager.Manager, ctr controller.Controller) error +} + type Handler interface { Handle(ctx context.Context, vd *virtv2.VirtualDisk) (reconcile.Result, error) } @@ -140,7 +144,7 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr ), predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { return false }, - DeleteFunc: func(e event.DeleteEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, UpdateFunc: func(e event.UpdateEvent) bool { oldDV, ok := e.ObjectOld.(*cdiv1.DataVolume) if !ok { @@ -232,14 +236,15 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr return fmt.Errorf("error setting watch on CVIs: %w", err) } - w := watcher.NewVirtualDiskSnapshotWatcher(mgr.GetClient()) - if err := w.Watch(mgr, ctr); err != nil { - return fmt.Errorf("error setting watch on VDSnapshots: %w", err) - } - - storageClassReadyWatcher := watcher.NewStorageClassWatcher(mgr.GetClient()) - if err := storageClassReadyWatcher.Watch(mgr, ctr); err != nil { - return fmt.Errorf("error setting watch on StorageClass: %w", err) + for _, w := range []Watcher{ + watcher.NewVirtualDiskSnapshotWatcher(mgr.GetClient()), + watcher.NewStorageClassWatcher(mgr.GetClient()), + watcher.NewPodWatcher(mgr.GetClient()), + } { + err := w.Watch(mgr, ctr) + if err != nil { + return fmt.Errorf("error setting watcher: %w", err) + } } return nil diff --git a/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go b/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go index 046671c1a..fdea94294 100644 --- a/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go +++ b/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go @@ -35,7 +35,7 @@ import ( //go:generate moq -rm -out mock.go . Importer Uploader Stat Handler type Importer interface { - Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error StartWithPodSetting(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error CleanUp(ctx context.Context, sup *supplements.Generator) (bool, error) CleanUpSupplements(ctx context.Context, sup *supplements.Generator) (bool, error) @@ -46,7 +46,7 @@ type Importer interface { } type Uploader interface { - Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error CleanUp(ctx context.Context, sup *supplements.Generator) (bool, error) CleanUpSupplements(ctx context.Context, sup *supplements.Generator) (bool, error) GetPod(ctx context.Context, sup *supplements.Generator) (*corev1.Pod, error) diff --git a/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go b/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go index 3e5d84578..591e614c8 100644 --- a/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go +++ b/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go @@ -1,19 +1,3 @@ -/* -Copyright 2024 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - // Code generated by moq; DO NOT EDIT. // github.com/matryer/moq @@ -60,7 +44,7 @@ var _ Importer = &ImporterMock{} // ProtectFunc: func(ctx context.Context, pod *corev1.Pod) error { // panic("mock out the Protect method") // }, -// StartFunc: func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +// StartFunc: func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { // panic("mock out the Start method") // }, // StartWithPodSettingFunc: func(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error { @@ -92,7 +76,7 @@ type ImporterMock struct { ProtectFunc func(ctx context.Context, pod *corev1.Pod) error // StartFunc mocks the Start method. - StartFunc func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + StartFunc func(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error // StartWithPodSettingFunc mocks the StartWithPodSetting method. StartWithPodSettingFunc func(ctx context.Context, settings *importer.Settings, sup *supplements.Generator, caBundle *datasource.CABundle, podSettings *importer.PodSettings) error @@ -153,6 +137,8 @@ type ImporterMock struct { Sup *supplements.Generator // CaBundle is the caBundle argument value. CaBundle *datasource.CABundle + // Opts is the opts argument value. + Opts []service.Option } // StartWithPodSetting holds details about calls to the StartWithPodSetting method. StartWithPodSetting []struct { @@ -374,7 +360,7 @@ func (mock *ImporterMock) ProtectCalls() []struct { } // Start calls StartFunc. -func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { if mock.StartFunc == nil { panic("ImporterMock.StartFunc: method is nil but Importer.Start was just called") } @@ -384,17 +370,19 @@ func (mock *ImporterMock) Start(ctx context.Context, settings *importer.Settings Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option }{ Ctx: ctx, Settings: settings, Obj: obj, Sup: sup, CaBundle: caBundle, + Opts: opts, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(ctx, settings, obj, sup, caBundle) + return mock.StartFunc(ctx, settings, obj, sup, caBundle, opts...) } // StartCalls gets all the calls that were made to Start. @@ -407,6 +395,7 @@ func (mock *ImporterMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } { var calls []struct { Ctx context.Context @@ -414,6 +403,7 @@ func (mock *ImporterMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } mock.lockStart.RLock() calls = mock.calls.Start @@ -539,7 +529,7 @@ var _ Uploader = &UploaderMock{} // ProtectFunc: func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error { // panic("mock out the Protect method") // }, -// StartFunc: func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +// StartFunc: func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { // panic("mock out the Start method") // }, // UnprotectFunc: func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error { @@ -577,7 +567,7 @@ type UploaderMock struct { ProtectFunc func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error // StartFunc mocks the Start method. - StartFunc func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error + StartFunc func(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error // UnprotectFunc mocks the Unprotect method. UnprotectFunc func(ctx context.Context, pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) error @@ -656,6 +646,8 @@ type UploaderMock struct { Sup *supplements.Generator // CaBundle is the caBundle argument value. CaBundle *datasource.CABundle + // Opts is the opts argument value. + Opts []service.Option } // Unprotect holds details about calls to the Unprotect method. Unprotect []struct { @@ -978,7 +970,7 @@ func (mock *UploaderMock) ProtectCalls() []struct { } // Start calls StartFunc. -func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle) error { +func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings, obj service.ObjectKind, sup *supplements.Generator, caBundle *datasource.CABundle, opts ...service.Option) error { if mock.StartFunc == nil { panic("UploaderMock.StartFunc: method is nil but Uploader.Start was just called") } @@ -988,17 +980,19 @@ func (mock *UploaderMock) Start(ctx context.Context, settings *uploader.Settings Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option }{ Ctx: ctx, Settings: settings, Obj: obj, Sup: sup, CaBundle: caBundle, + Opts: opts, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(ctx, settings, obj, sup, caBundle) + return mock.StartFunc(ctx, settings, obj, sup, caBundle, opts...) } // StartCalls gets all the calls that were made to Start. @@ -1011,6 +1005,7 @@ func (mock *UploaderMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } { var calls []struct { Ctx context.Context @@ -1018,6 +1013,7 @@ func (mock *UploaderMock) StartCalls() []struct { Obj service.ObjectKind Sup *supplements.Generator CaBundle *datasource.CABundle + Opts []service.Option } mock.lockStart.RLock() calls = mock.calls.Start