Skip to content

Commit

Permalink
add support for pipelines controller statefulset ordinals
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhelil committed Oct 1, 2024
1 parent 229dca3 commit fd9970c
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 4 deletions.
10 changes: 10 additions & 0 deletions docs/TektonPipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
threads-per-controller: 2
kube-api-qps: 5.0
kube-api-burst: 10
statefulset-ordinals: false
options:
disabled: false
configMaps: {}
Expand Down Expand Up @@ -263,6 +264,7 @@ spec:
threads-per-controller: 2
kube-api-qps: 5.0
kube-api-burst: 10
statefulset-ordinals: false
```
These fields are optional and there is no default values. If user passes them, operator will include most of fields into the deployment `tekton-pipelines-controller` under the container `tekton-pipelines-controller` as arguments(duplicate name? No, container and deployment has the same name), otherwise pipelines controller's default values will be considered. and `buckets` field is updated into `config-leader-election` config-map under the namespace `tekton-pipelines`.

Expand All @@ -275,6 +277,14 @@ A high level descriptions are given here. To get the detailed information please
* `threads-per-controller` - is the number of threads(aka worker) to use when processing the pipelines controller's workqueue, default value in pipelines controller is `2`
* `kube-api-qps` - QPS indicates the maximum QPS to the cluster master from the REST client, default value in pipeline controller is `5.0`
* `kube-api-burst` - maximum burst for throttle, default value in pipeline controller is `10`
* `statefulset-ordinals` - enables StatefulSet Ordinals mode for the Tekton Pipelines controller. This allows the Pipelines controller to be deployed as a StatefulSet with the correct configuration to ensure load balancing. The load is evenly distributed across replicas, and the number of buckets is enforced to match the number of replicas.

There are two mechanisms available for scaling Pipelines controllers horizontally:

- Using leader election, which allows for failover, but can result in hot-spotting.
- Using StatefulSet ordinals, which doesn't allow for failover, but guarantees keys are evenly spread across replicas.



> #### Note:
> * `kube-api-qps` and `kube-api-burst` will be multiplied by 2 in pipelines controller. To get the detailed information visit [Performance Configuration](https://tekton.dev/docs/pipelines/tekton-controller-performance-configuration/) guide
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ func (p *Pipeline) setDefaults() {
p.EnableGitResolver = ptr.Bool(true)
}

// Statefulset Ordinals
// if StatefulSet Ordinals mode, buckets should be equal to replicas
if p.Performance.StatefulsetOrdinals == nil {
p.Performance.StatefulsetOrdinals = ptr.Bool(false)
} else if *p.Performance.StatefulsetOrdinals {
if p.Performance.Replicas != nil && *p.Performance.Replicas > 1 {
replicas := uint(*p.Performance.Replicas)
p.Performance.Buckets = &replicas
}
}

// run platform specific defaulting
if IsOpenShiftPlatform() {
p.openshiftDefaulting()
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func Test_SetDefaults_PipelineProperties(t *testing.T) {
EnableGitResolver: ptr.Bool(true),
EnableClusterResolver: ptr.Bool(true),
},
Performance: PipelinePerformanceProperties{
PipelinePerformanceStatefulsetOrdinalsConfig: PipelinePerformanceStatefulsetOrdinalsConfig{StatefulsetOrdinals: ptr.Bool(false)},
},
}

tp.SetDefaults(context.TODO())
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ type PipelinePerformanceProperties struct {
// +optional
PipelinePerformanceLeaderElectionConfig `json:",inline"`
// +optional
PipelinePerformanceStatefulsetOrdinalsConfig `json:",inline"`
// +optional
PipelineDeploymentPerformanceArgs `json:",inline"`
// +optional
Replicas *int32 `json:"replicas,omitempty"`
Expand All @@ -201,6 +203,12 @@ type PipelinePerformanceLeaderElectionConfig struct {
Buckets *uint `json:"buckets,omitempty"`
}

// allow to configure pipelines controller ha mode to statefulset ordinals
type PipelinePerformanceStatefulsetOrdinalsConfig struct {
//if is true, enable StatefulsetOrdinals mode
StatefulsetOrdinals *bool `json:"statefulset-ordinals"`
}

// performance configurations to tune the performance of the pipeline controller
// these properties will be added/updated as arguments in pipeline controller deployment
// https://tekton.dev/docs/pipelines/tekton-controller-performance-configuration/
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,15 @@ func (prof *PipelinePerformanceProperties) validate(path string) *apis.FieldErro
}
}

// check for StatefulsetOrdinals and Replicas
if prof.StatefulsetOrdinals != nil && *prof.StatefulsetOrdinals {
if prof.Replicas != nil {
replicas := uint(*prof.Replicas)
if *prof.Buckets != replicas {
errs = errs.Also(apis.ErrInvalidValue(*prof.Replicas, fmt.Sprintf("%s.replicas", path), "replicas must equal buckets for statefulset ordinals"))
}
}
}

return errs
}
28 changes: 28 additions & 0 deletions pkg/apis/operator/v1alpha1/tektonpipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ func TestTektonPipelinePerformancePropertiesValidate(t *testing.T) {
return &value
}

// return pointer value for replicas
getReplicas := func(value int32) *int32 {
return &value
}

statefulsetOrdinals := true

// validate buckets minimum range
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
Expand Down Expand Up @@ -310,4 +317,25 @@ func TestTektonPipelinePerformancePropertiesValidate(t *testing.T) {
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(10)
errs = tp.Validate(context.TODO())
assert.Equal(t, "", errs.Error())

// validate buckets is equal to replicas when StatefulsetOrdinals is true
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
bucketValue := uint(5)
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(bucketValue)
replicaValue := int32(5)
tp.Spec.PipelineProperties.Performance.Replicas = getReplicas(replicaValue)
tp.Spec.PipelineProperties.Performance.StatefulsetOrdinals = &statefulsetOrdinals
errs = tp.Validate(context.TODO())
assert.Equal(t, "", errs.Error())

// validate error when buckets is not equal to replica
tp.Spec.PipelineProperties.Performance = PipelinePerformanceProperties{}
tp.Spec.PipelineProperties.Performance.DisableHA = false
tp.Spec.PipelineProperties.Performance.StatefulsetOrdinals = &statefulsetOrdinals
bucketValue = uint(5)
tp.Spec.PipelineProperties.Performance.Buckets = getBuckets(bucketValue)
tp.Spec.PipelineProperties.Performance.Replicas = getReplicas(3)
errs = tp.Validate(context.TODO())
assert.Equal(t, "invalid value 3: spec.performance.replicas must equal buckets", errs.Error())
}
90 changes: 90 additions & 0 deletions pkg/reconciler/common/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -58,6 +59,11 @@ const (

runAsNonRootValue = true
allowPrivilegedEscalationValue = false

tektonPipelinesControllerName = "tekton-pipelines-controller"
tektonPipelinesServiceName = "tekton-pipelines-controller"
tektonPipelinesControllerStatefulServiceName = "STATEFUL_SERVICE_NAME"
tektonPipelinesControllerStatefulControllerOrdinal = "STATEFUL_CONTROLLER_ORDINAL"
)

// transformers that are common to all components.
Expand Down Expand Up @@ -1013,3 +1019,87 @@ func AddSecretData(data map[string][]byte, annotations map[string]string) mf.Tra
return nil
}
}

// ConvertDeploymentToStatefulSet converts a Deployment tekton-pipelines-controller to a StatefulSet
func ConvertDeploymentToStatefulSet() mf.Transformer {
return func(u *unstructured.Unstructured) error {
if u.GetKind() != "Deployment" || u.GetName() != tektonPipelinesControllerName {
return nil
}

d := &appsv1.Deployment{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, d)
if err != nil {
return err
}

ss := &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version,
},
ObjectMeta: d.ObjectMeta,
Spec: appsv1.StatefulSetSpec{
Selector: d.Spec.Selector,
ServiceName: tektonPipelinesServiceName,
Template: d.Spec.Template,
Replicas: d.Spec.Replicas,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
},
},
}

unstrObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ss)
if err != nil {
return err
}

u.SetUnstructuredContent(unstrObj)

return nil
}
}

// AddStatefulEnvVars adds environment variables to tekton-pipelines-controller statefulset
func AddStatefulEnvVars() mf.Transformer {
return func(u *unstructured.Unstructured) error {
if u.GetKind() != "StatefulSet" || u.GetName() != tektonPipelinesControllerName {
return nil
}

ss := &appsv1.StatefulSet{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, ss)
if err != nil {
return err
}

newEnvVars := []corev1.EnvVar{
{
Name: tektonPipelinesControllerStatefulServiceName,
Value: tektonPipelinesServiceName,
},
{
Name: tektonPipelinesControllerStatefulControllerOrdinal,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
}

if len(ss.Spec.Template.Spec.Containers) > 0 {
ss.Spec.Template.Spec.Containers[0].Env = append(ss.Spec.Template.Spec.Containers[0].Env, newEnvVars...)
}

unstrObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ss)
if err != nil {
return err
}

u.SetUnstructuredContent(unstrObj)

return nil
}
}
4 changes: 3 additions & 1 deletion pkg/reconciler/kubernetes/tektoninstallerset/client/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func verifyMainInstallerSets(iSets []v1alpha1.TektonInstallerSet) error {
static = true
}
if strings.Contains(iSets[0].GetName(), InstallerSubTypeDeployment) ||
strings.Contains(iSets[1].GetName(), InstallerSubTypeDeployment) {
strings.Contains(iSets[1].GetName(), InstallerSubTypeDeployment) ||
strings.Contains(iSets[0].GetName(), InstallerSubTypeStatefulset) ||
strings.Contains(iSets[1].GetName(), InstallerSubTypeStatefulset) {
deployment = true
}
if !(static && deployment) {
Expand Down
33 changes: 32 additions & 1 deletion pkg/reconciler/kubernetes/tektoninstallerset/client/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (i *InstallerSetClient) CleanupMainSet(ctx context.Context) error {

// now delete all deployment installerSet
for _, is := range list.Items {
if strings.Contains(is.GetName(), InstallerSubTypeDeployment) {
if strings.Contains(is.GetName(), InstallerSubTypeDeployment) ||
strings.Contains(is.GetName(), InstallerSubTypeStatefulset) {
logger.Debugf("deleting main-deployment installer set: %s", is.GetName())
err = i.clientSet.Delete(ctx, is.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePropagationPolicy,
Expand Down Expand Up @@ -125,3 +126,33 @@ func (i *InstallerSetClient) cleanup(ctx context.Context, isType string) error {
}
return nil
}

func (i *InstallerSetClient) CleanupSubTypeDeployment(ctx context.Context) error {
return i.cleanupSubType(ctx, InstallerTypeMain, InstallerSubTypeDeployment)
}

func (i *InstallerSetClient) cleanupSubType(ctx context.Context, isType string, isSubType string) error {
logger := logging.FromContext(ctx).With("kind", i.resourceKind, "type", isType)

list, err := i.clientSet.List(ctx, metav1.ListOptions{LabelSelector: i.getSetLabels(isType)})
if err != nil {
return err
}

if len(list.Items) != 1 {
logger.Errorf("found more than 1 installerSet for %s something fishy, cleaning up all", isType)
}

for _, is := range list.Items {
if strings.Contains(is.GetName(), isSubType) {
logger.Debugf("deleting %s installer set: %s", isType, is.GetName())
err = i.clientSet.Delete(ctx, is.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePropagationPolicy,
})
if err != nil {
return fmt.Errorf("failed to delete %s set: %s", isType, is.GetName())
}
}
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/reconciler/kubernetes/tektoninstallerset/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
)

const (
InstallerSubTypeStatic = "static"
InstallerSubTypeDeployment = "deployment"
InstallerSubTypeStatic = "static"
InstallerSubTypeDeployment = "deployment"
InstallerSubTypeStatefulset = "statefulset"

InstallerTypeMain = "main"
InstallerTypePre = "pre"
Expand Down
21 changes: 21 additions & 0 deletions pkg/reconciler/kubernetes/tektoninstallerset/client/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (i *InstallerSetClient) create(ctx context.Context, comp v1alpha1.TektonCom
func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.TektonComponent, manifest *mf.Manifest) ([]v1alpha1.TektonInstallerSet, error) {
staticManifest := manifest.Filter(mf.Not(mf.ByKind("Deployment")), mf.Not(mf.ByKind("Service")))
deploymentManifest := manifest.Filter(mf.Any(mf.ByKind("Deployment"), mf.ByKind("Service")))
statefulSetManifest := manifest.Filter(mf.Any(mf.ByKind("StatefulSet"), mf.ByKind("Service")))

kind := strings.ToLower(strings.TrimPrefix(i.resourceKind, "Tekton"))
staticName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeStatic)
Expand All @@ -78,6 +79,7 @@ func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.Tek
}

deployName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeDeployment)

deploymentIS, err := i.makeInstallerSet(ctx, comp, &deploymentManifest, deployName, InstallerTypeMain, nil)
if err != nil {
return nil, err
Expand All @@ -87,6 +89,25 @@ func (i *InstallerSetClient) makeMainSets(ctx context.Context, comp v1alpha1.Tek
if err != nil {
return nil, err
}

statefulSet := false
if pipeline, ok := comp.(*v1alpha1.TektonPipeline); ok {
statefulSet = pipeline.Spec.Performance.StatefulsetOrdinals != nil && *pipeline.Spec.Performance.StatefulsetOrdinals
}
if statefulSet {
stsName := fmt.Sprintf("%s-%s-%s-", kind, InstallerTypeMain, InstallerSubTypeStatefulset)
stsIS, err := i.makeInstallerSet(ctx, comp, &statefulSetManifest, stsName, InstallerTypeMain, nil)
if err != nil {
return nil, err
}

stsIS, err = i.clientSet.Create(ctx, stsIS, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return []v1alpha1.TektonInstallerSet{*staticIS, *deploymentIS, *stsIS}, nil
}

return []v1alpha1.TektonInstallerSet{*staticIS, *deploymentIS}, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/kubernetes/tektonpipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewExtendedController(generator common.ExtensionGenerator) injection.Contro

c := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
operatorClientSet: operatorclient.Get(ctx),
extension: generator(ctx),
manifest: manifest,
pipelineVersion: pipelineVer,
Expand Down
Loading

0 comments on commit fd9970c

Please sign in to comment.