diff --git a/mmv1/third_party/terraform/services/container/node_config.go.tmpl b/mmv1/third_party/terraform/services/container/node_config.go.tmpl
index 6b54e3a49c95..1b38a8643bcb 100644
--- a/mmv1/third_party/terraform/services/container/node_config.go.tmpl
+++ b/mmv1/third_party/terraform/services/container/node_config.go.tmpl
@@ -2027,10 +2027,14 @@ func flattenHostMaintenancePolicy(c *container.HostMaintenancePolicy) []map[stri
 // node pool updates in `resource_container_cluster`
 func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Config, nodePoolInfo *NodePoolInformation, prefix, name string, timeout time.Duration) error {
-  // Nodepool write-lock will be acquired when update function is called.
+  // Cluster write-lock will be acquired when createOpF is called to make operation creations sequential within
+  // the cluster, and read-lock will be acquired when waitOpF is called to allow concurrent operations.
+  // This is to resolve the bottleneck of a large number of operations being created at the same time.
+  clusterLockKey := nodePoolInfo.clusterLockKey()
+  // Nodepool write-lock will be acquired when calling createOpF and waitOpF.
   npLockKey := nodePoolInfo.nodePoolLockKey(name)

-  userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
+  userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
   if err != nil {
     return err
   }
@@ -2049,17 +2053,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       },
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2067,8 +2069,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated logging_variant for node pool %s", name)
@@ -2078,24 +2080,22 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
   if d.HasChange(prefix + "node_config.0.containerd_config") {
     if _, ok := d.GetOk(prefix + "node_config.0.containerd_config"); ok {
       req := &container.UpdateNodePoolRequest{
-        Name: name,
+        Name:             name,
         ContainerdConfig: expandContainerdConfig(d.Get(prefix + "node_config.0.containerd_config")),
       }
       if req.ContainerdConfig == nil {
         req.ContainerdConfig = &container.ContainerdConfig{}
         req.ForceSendFields = []string{"ContainerdConfig"}
       }
-      updateF := func() error {
+      createOpF := func() (*container.Operation, error) {
         clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
         if config.UserProjectOverride {
           clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
         }
-        op, err := clusterNodePoolsUpdateCall.Do()
-        if err != nil {
-          return err
-        }
+        return clusterNodePoolsUpdateCall.Do()
+      }

-        // Wait until it's updated
+      waitOpF := func(op *container.Operation) error {
         return ContainerOperationWait(config, op,
           nodePoolInfo.project,
           nodePoolInfo.location,
@@ -2103,8 +2103,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
           timeout)
       }

-      if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-        return err
+      if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+        return err
       }

       log.Printf("[INFO] Updated containerd_config for node pool %s", name)
@@ -2132,17 +2132,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.StoragePools = storagePools
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2150,7 +2148,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
       return err
     }
     log.Printf("[INFO] Updated disk disk_size_gb/disk_type/machine_type/storage_pools for Node Pool %s", d.Id())
@@ -2188,17 +2186,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.Taints = ntaints
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2206,8 +2202,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated taints for Node Pool %s", d.Id())
   }
@@ -2240,17 +2236,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.Tags = ntags
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2258,8 +2252,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated tags for node pool %s", name)
   }
@@ -2282,17 +2276,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.ResourceManagerTags = rmTags
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2300,7 +2292,7 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
       return err
     }
     log.Printf("[INFO] Updated resource manager tags for node pool %s", name)
@@ -2318,17 +2310,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       }
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2337,8 +2327,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

     // Call update serially.
-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated resource labels for node pool %s", name)
@@ -2356,17 +2346,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       }
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2375,8 +2363,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

     // Call update serially.
-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated labels for node pool %s", name)
@@ -2390,25 +2378,23 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       },
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req)
       if config.UserProjectOverride {
         clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
   }
@@ -2423,18 +2409,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.WorkloadMetadataConfig = &container.WorkloadMetadataConfig{}
       req.ForceSendFields = []string{"WorkloadMetadataConfig"}
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2442,8 +2425,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
   }
@@ -2456,17 +2439,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         Enabled: gcfsEnabled,
       },
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2474,8 +2455,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated gcfs_config for node pool %s", name)
@@ -2491,17 +2472,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.KubeletConfig = &container.NodeKubeletConfig{}
       req.ForceSendFields = []string{"KubeletConfig"}
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2509,8 +2488,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated kubelet_config for node pool %s", name)
@@ -2525,17 +2504,15 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
       req.LinuxNodeConfig = &container.LinuxNodeConfig{}
       req.ForceSendFields = []string{"LinuxNodeConfig"}
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2543,8 +2520,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated linux_node_config for node pool %s", name)
@@ -2556,21 +2533,19 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
     }
     if v, ok := d.GetOk(prefix + "node_config.0.fast_socket"); ok {
       fastSocket := v.([]interface{})[0].(map[string]interface{})
-      req.FastSocket = &container.FastSocket{
+      req.FastSocket = &container.FastSocket{
         Enabled: fastSocket["enabled"].(bool),
       }
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
@@ -2578,8 +2553,8 @@ func nodePoolNodeConfigUpdate(d *schema.ResourceData, config *transport_tpg.Conf
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }

     log.Printf("[INFO] Updated fast_socket for node pool %s", name)
diff --git a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl
index ccda8717798e..dd2cd3a64db1 100644
--- a/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl
+++ b/mmv1/third_party/terraform/services/container/resource_container_node_pool.go.tmpl
@@ -568,11 +568,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
     return err
   }

-  // Acquire read-lock on cluster.
-  clusterLockKey := nodePoolInfo.clusterLockKey()
-  transport_tpg.MutexStore.RLock(clusterLockKey)
-  defer transport_tpg.MutexStore.RUnlock(clusterLockKey)
-
   // Acquire write-lock on nodepool.
   npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name)
   transport_tpg.MutexStore.Lock(npLockKey)
@@ -585,6 +580,9 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
   timeout := d.Timeout(schema.TimeoutCreate)
   startTime := time.Now()

+  clusterLockKey := nodePoolInfo.clusterLockKey()
+  transport_tpg.MutexStore.RLock(clusterLockKey)
+
   // we attempt to prefetch the node pool to make sure it doesn't exist before creation
   var id = fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s", nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, nodePool.Name)
   name := getNodePoolName(id)
@@ -599,11 +597,16 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
     // refreshed on the next call to apply.
     d.SetId(id)
   } else if err == nil {
+    transport_tpg.MutexStore.RUnlock(clusterLockKey)
     return fmt.Errorf("resource - %s - already exists", id)
   }
+  transport_tpg.MutexStore.RUnlock(clusterLockKey)

   var operation *container.Operation
   err = retry.Retry(timeout, func() *retry.RetryError {
+    transport_tpg.MutexStore.Lock(clusterLockKey)
+    defer transport_tpg.MutexStore.Unlock(clusterLockKey)
+
     clusterNodePoolsCreateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Create(nodePoolInfo.parent(), req)
     if config.UserProjectOverride {
       clusterNodePoolsCreateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
@@ -622,6 +625,8 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
     }
     return nil
   })
+  transport_tpg.MutexStore.RLock(clusterLockKey)
+  defer transport_tpg.MutexStore.RUnlock(clusterLockKey)
   if err != nil {
     return fmt.Errorf("error creating NodePool: %s", err)
   }
@@ -796,10 +801,7 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
     }
   }

-  // Acquire read-lock on cluster.
   clusterLockKey := nodePoolInfo.clusterLockKey()
-  transport_tpg.MutexStore.RLock(clusterLockKey)
-  defer transport_tpg.MutexStore.RUnlock(clusterLockKey)

   // Acquire write-lock on nodepool.
   npLockKey := nodePoolInfo.nodePoolLockKey(name)
@@ -811,6 +813,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
   var operation *container.Operation
   err = retry.Retry(timeout, func() *retry.RetryError {
+    transport_tpg.MutexStore.Lock(clusterLockKey)
+    defer transport_tpg.MutexStore.Unlock(clusterLockKey)
     clusterNodePoolsDeleteCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Delete(nodePoolInfo.fullyQualifiedName(name))
     if config.UserProjectOverride {
       clusterNodePoolsDeleteCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
@@ -830,6 +834,8 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
     return nil
   })
+  transport_tpg.MutexStore.RLock(clusterLockKey)
+  defer transport_tpg.MutexStore.RUnlock(clusterLockKey)

   if err != nil {
     return fmt.Errorf("Error deleting NodePool: %s", err)
@@ -1346,22 +1352,20 @@ func expandNodeNetworkConfig(v interface{}) *container.NodeNetworkConfig {
   return nnc
 }
-
 func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *NodePoolInformation, prefix string, timeout time.Duration) error {
   config := meta.(*transport_tpg.Config)

   name := d.Get(prefix + "name").(string)

-  userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
+  userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent)
   if err != nil {
     return err
   }

-  // Acquire read-lock on cluster.
+  // Cluster write-lock will be acquired when createOpF is called, and read-lock will be acquired when waitOpF is
+  // called.
   clusterLockKey := nodePoolInfo.clusterLockKey()
-  transport_tpg.MutexStore.RLock(clusterLockKey)
-  defer transport_tpg.MutexStore.RUnlock(clusterLockKey)

-  // Nodepool write-lock will be acquired when update function is called.
+  // Nodepool write-lock will be acquired when calling createOpF and waitOpF.
   npLockKey := nodePoolInfo.nodePoolLockKey(name)

   if d.HasChange(prefix + "autoscaling") {
@@ -1389,25 +1393,23 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
       Update: update,
     }

-    updateF := func() error {
+    createOpF := func() (*container.Operation, error) {
       clusterUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.Update(nodePoolInfo.parent(), req)
       if config.UserProjectOverride {
         clusterUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterUpdateCall.Do()
-      if err != nil {
-        return err
-      }
+      return clusterUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
   }
@@ -1421,25 +1423,22 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
     req := &container.SetNodePoolSizeRequest{
       NodeCount: newSize,
     }
-    updateF := func() error {
-      clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsSetSizeCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetSize(nodePoolInfo.fullyQualifiedName(name), req)
      if config.UserProjectOverride {
        clusterNodePoolsSetSizeCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
      }
-      op, err := clusterNodePoolsSetSizeCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsSetSizeCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool size",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
   }
@@ -1456,25 +1455,22 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
       Management: management,
     }

-    updateF := func() error {
-      clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsSetManagementCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.SetManagement(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsSetManagementCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsSetManagementCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsSetManagementCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool management",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated management in Node Pool %s", name)
   }
@@ -1484,24 +1480,21 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
       NodePoolId:  name,
       NodeVersion: d.Get(prefix + "version").(string),
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool version",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated version in Node Pool %s", name)
   }
@@ -1510,23 +1503,20 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
     req := &container.UpdateNodePoolRequest{
       Locations: tpgresource.ConvertStringSet(d.Get(prefix + "node_locations").(*schema.Set)),
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
       return ContainerOperationWait(config, op,
         nodePoolInfo.project,
         nodePoolInfo.location,
         "updating GKE node pool node locations",
         userAgent,
         timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated node locations in Node Pool %s", name)
   }
@@ -1574,7 +1564,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
     if v, ok := blueGreenSettingsConfig["standard_rollout_policy"]; ok && len(v.([]interface{})) > 0 {
       standardRolloutPolicy := &container.StandardRolloutPolicy{}
-      if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok {
+      if standardRolloutPolicyConfig, ok := v.([]interface{})[0].(map[string]interface{}); ok {
         standardRolloutPolicy.BatchSoakDuration = standardRolloutPolicyConfig["batch_soak_duration"].(string)
         if v, ok := standardRolloutPolicyConfig["batch_node_count"]; ok {
           standardRolloutPolicy.BatchNodeCount = int64(v.(int))
@@ -1591,44 +1581,38 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
     req := &container.UpdateNodePoolRequest{
       UpgradeSettings: upgradeSettings,
     }
-    updateF := func() error {
-      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+    createOpF := func() (*container.Operation, error) {
+      clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
       if config.UserProjectOverride {
         clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
       }
-      op, err := clusterNodePoolsUpdateCall.Do()
-
-      if err != nil {
-        return err
-      }
+      return clusterNodePoolsUpdateCall.Do()
+    }

-      // Wait until it's updated
+    waitOpF := func(op *container.Operation) error {
      return ContainerOperationWait(config, op,
        nodePoolInfo.project,
        nodePoolInfo.location,
        "updating GKE node pool upgrade settings",
        userAgent,
        timeout)
     }

-    if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-      return err
+    if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+      return err
     }
     log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
   }

   if d.HasChange(prefix + "network_config") {
-    if d.HasChange(prefix + "network_config.0.enable_private_nodes") || d.HasChange(prefix + "network_config.0.network_performance_config") {
+    if d.HasChange(prefix+"network_config.0.enable_private_nodes") || d.HasChange(prefix+"network_config.0.network_performance_config") {
       req := &container.UpdateNodePoolRequest{
-        NodePoolId: name,
+        NodePoolId:        name,
         NodeNetworkConfig: expandNodeNetworkConfig(d.Get(prefix + "network_config")),
       }
-      updateF := func() error {
-        clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name),req)
+      createOpF := func() (*container.Operation, error) {
+        clusterNodePoolsUpdateCall := config.NewContainerClient(userAgent).Projects.Locations.Clusters.NodePools.Update(nodePoolInfo.fullyQualifiedName(name), req)
        if config.UserProjectOverride {
          clusterNodePoolsUpdateCall.Header().Add("X-Goog-User-Project", nodePoolInfo.project)
        }
-        op, err := clusterNodePoolsUpdateCall.Do()
-
-        if err != nil {
-          return err
-        }
+        return clusterNodePoolsUpdateCall.Do()
+      }

-        // Wait until it's updated
+      waitOpF := func(op *container.Operation) error {
         return ContainerOperationWait(config, op,
           nodePoolInfo.project,
           nodePoolInfo.location,
@@ -1636,8 +1620,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
           timeout)
       }

-      if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
-        return err
+      if err := retryWhileIncompatibleOperation(timeout, npLockKey, clusterLockKey, createOpF, waitOpF); err != nil {
+        return err
       }

       log.Printf("[INFO] Updated network_config for node pool %s", name)
@@ -1688,15 +1672,27 @@ func containerNodePoolAwaitRestingState(config *transport_tpg.Config, name, proj
   return state, err
 }

-// Retries an operation while the canonical error code is FAILED_PRECONDTION
-// or RESOURCE_EXHAUSTED which indicates there is an incompatible operation
-// already running on the cluster or there are the number of allowed
-// concurrent operations running on the cluster. These errors can be safely
-// retried until the incompatible operation completes, and the newly
-// requested operation can begin.
-func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error {
+// Retries an operation while the canonical error code is FAILED_PRECONDITION or RESOURCE_EXHAUSTED, which indicates
+// that there is an incompatible operation already running on the cluster or that the number of allowed concurrent
+// operations on the cluster has been reached. These errors can be safely retried until the incompatible operation
+// completes, and the newly requested operation can begin.
+// The npLockKey is held throughout createOpFunc and waitOpFunc.
+// The clusterLockKey write-lock is held during createOpFunc to make operation creations sequential within the cluster,
+// and the clusterLockKey read-lock is held during waitOpFunc to allow concurrency on the cluster.
+func retryWhileIncompatibleOperation(timeout time.Duration, npLockKey string, clusterLockKey string, createOpFunc func() (*container.Operation, error), waitOpFunc func(*container.Operation) error) error {
+  f := func() error {
+    transport_tpg.MutexStore.Lock(clusterLockKey)
+    op, err := createOpFunc()
+    transport_tpg.MutexStore.Unlock(clusterLockKey)
+    if err != nil {
+      return err
+    }
+    transport_tpg.MutexStore.RLock(clusterLockKey)
+    defer transport_tpg.MutexStore.RUnlock(clusterLockKey)
+    return waitOpFunc(op)
+  }
   return retry.Retry(timeout, func() *retry.RetryError {
-    if err := transport_tpg.LockedCall(lockKey, f); err != nil {
+    if err := transport_tpg.LockedCall(npLockKey, f); err != nil {
       if tpgresource.IsFailedPreconditionError(err) || tpgresource.IsQuotaError(err) {
         return retry.RetryableError(err)
       }
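
For reviewers, the locking order introduced by this patch can be summarized outside the templates. The following is a minimal, illustrative sketch using plain sync primitives; clusterMu, poolMu, runNodePoolOp, createOp and waitOp are hypothetical stand-ins for the transport_tpg.MutexStore entries keyed by clusterLockKey and npLockKey and for createOpFunc/waitOpFunc, not code from this PR.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// clusterMu stands in for the per-cluster lock; each node pool gets its own mutex below.
var clusterMu sync.RWMutex

// runNodePoolOp mirrors the createOpF/waitOpF split: the per-pool lock is held for the whole
// create-then-wait sequence, the cluster write-lock only while the operation is created, and a
// cluster read-lock while waiting, so waits from different node pools can overlap.
func runNodePoolOp(poolMu *sync.Mutex, createOp func() (string, error), waitOp func(string) error) error {
	poolMu.Lock()
	defer poolMu.Unlock()

	clusterMu.Lock() // serialize operation creation within the cluster
	op, err := createOp()
	clusterMu.Unlock()
	if err != nil {
		return err
	}

	clusterMu.RLock() // waiting needs only a read-lock, so waits run concurrently
	defer clusterMu.RUnlock()
	return waitOp(op)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		poolMu := &sync.Mutex{} // one lock per node pool
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = runNodePoolOp(poolMu,
				func() (string, error) { return fmt.Sprintf("op-%d", i), nil },
				func(op string) error {
					time.Sleep(50 * time.Millisecond) // pretend to poll the operation
					fmt.Println("finished", op)
					return nil
				})
		}()
	}
	wg.Wait()
}
```

With sync.RWMutex semantics, only one goroutine can hold the write-lock at a time, so operation creation stays sequential per cluster, while any number of goroutines can hold the read-lock, so waiting on already-created operations does not serialize.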